
I want to use parallel arrays for a task, and before I start coding, I'd like to know whether this small snippet is thread-safe:

import collection.mutable._

val listBuffer = ListBuffer[String]("one","two","three","four","five","six","seven","eight","nine")
val jSyncList  = java.util.Collections.synchronizedList(new java.util.ArrayList[String]())
listBuffer.par.foreach { e =>
    println("processed :"+e)
    // using sleep here to simulate a random delay
    Thread.sleep((scala.math.random * 1000).toLong)
    jSyncList.add(e)
}
jSyncList.toArray.foreach(println)

Are there better ways of processing something with parallel collections and accumulating the results elsewhere?

Geo

5 Answers


The code you posted is perfectly safe; I'm not sure about the premise though: why do you need to accumulate the results of a parallel collection in a non-parallel one? One of the whole points of the parallel collections is that they look like other collections.

Parallel collections also provide a seq method to switch back to a sequential collection, so you should probably use that!
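A minimal sketch of that suggestion, assuming Scala 2.13+ with the separate scala-parallel-collections module on the classpath (on 2.9 to 2.12, `.par` is built in and the import is unnecessary): do the work with a parallel `map`, then call `.seq` to get an ordinary sequential collection back instead of filling a synchronized list by hand. The word list is just illustrative data.

```scala
// Assumes Scala 2.13+ with the scala-parallel-collections module;
// drop this import on Scala 2.12 and earlier, where .par is built in.
import scala.collection.parallel.CollectionConverters._

val words = Vector("one", "two", "three", "four", "five")

// The transformation runs in parallel...
val processed = words.par.map(w => "processed: " + w)

// ...and .seq turns the ParVector back into an ordinary sequential Vector,
// with the original element order preserved.
val sequential = processed.seq

sequential.foreach(println)
```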

oxbow_lakes
  • Where can I find other examples of accumulating into a parallel collection, besides using map? – Geo May 07 '11 at 12:49
  • Well, that's difficult to say, given that 2.9 (and hence parallel collections) is *not released yet*. As I said, however, the parallel collections are pretty much identical to sequential ones, from the perspective of an interface, so you would use them in the same manner. I guess it is important to understand which of the operations are going to be parallelized, though. For example, `foldLeft` is **not** parallelizable. – oxbow_lakes May 07 '11 at 14:13
  • Hmm, but it (or something similar) is when applying an associative operation. Is there anything like this in 2.9? – ziggystar May 07 '11 at 14:33
  • How can we find out which operations are not parallelizable? – Geo May 07 '11 at 14:40
  • @ziggystar - this is not true unless the element *folded in* is the identity for the associative operation (i.e. that the fold is in fact a `sum` over a monoid - a la Scalaz). Otherwise, when you have split the collection up, what gets folded into the left hand side of all of the pieces? – oxbow_lakes May 08 '11 at 14:44
  • @lakes Yes. I tried to soften my statement with the parenthetical. It's not exactly the reduce operation, but the special version of it you described. Nevertheless, this sounds like a useful operation (think of sum, multiply, min, max, whatever), and it can be parallelized. Which brings us back to my question. – ziggystar May 08 '11 at 18:15
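To illustrate the distinction discussed in these comments, here is a small sketch (assuming Scala 2.13+ with the scala-parallel-collections module; on 2.12 and earlier drop the import). `foldLeft` processes elements strictly left to right, so it runs sequentially, whereas `fold` and `reduce` can be split across threads when the operator is associative and, for `fold`, the start value is its identity (0 for `+`). The numbers are illustrative only.

```scala
// Assumes Scala 2.13+ with scala-parallel-collections (drop on 2.12 and earlier).
import scala.collection.parallel.CollectionConverters._

val nums = (1 to 1000).toVector.par

// Associative operator plus its identity element: safe to split across threads.
val total = nums.fold(0)(_ + _)

// By contrast, nums.fold(1)(_ + _) may add the 1 once per chunk, so its
// result could depend on how the collection was split.

// reduce needs no start value; max is associative, so it parallelizes too.
val largest = nums.reduce((a, b) => math.max(a, b))

// foldLeft is inherently sequential: the accumulator type (String) differs
// from the element type (Int), so partial results cannot be merged.
val firstDigits = nums.foldLeft("")((acc, n) => if (n <= 3) acc + n else acc)
```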

For this pattern to be safe:

listBuffer.par.foreach { e => f(e) }

f has to be able to run concurrently in a safe way. I think the same rules as for any multi-threaded code apply: access to shared state needs to be thread-safe, the order of the f calls for different e won't be deterministic, and you may run into deadlocks as you start synchronizing statements inside f.
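As a sketch of what thread-safe access to shared state means in practice (assuming Scala 2.13+ with scala-parallel-collections; drop the import on 2.12 and earlier): a plain `var` counter incremented from a parallel `foreach` is a data race, while `java.util.concurrent.atomic.AtomicInteger` makes each increment atomic. The even-number counting task is made up for illustration.

```scala
import java.util.concurrent.atomic.AtomicInteger
// Assumes Scala 2.13+ with scala-parallel-collections (drop on 2.12 and earlier).
import scala.collection.parallel.CollectionConverters._

val items = (1 to 10000).toVector

// Unsafe: `unsafeCount += 1` is a read-modify-write, so concurrent calls
// can overwrite each other's updates and lose increments.
// var unsafeCount = 0
// items.par.foreach { _ => unsafeCount += 1 }

// Safe: incrementAndGet is atomic, so no update is lost.
val evenCount = new AtomicInteger(0)
items.par.foreach { n =>
  if (n % 2 == 0) evenCount.incrementAndGet()
}

println(evenCount.get)
```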

Additionally, I'm not clear what guarantees the parallel collections give you about the underlying collection being modified while it is being processed, so a mutable list buffer, which can have elements added or removed, is possibly a poor choice. You never know when the next coder will call something like foo(listBuffer) before your foreach and pass that reference to another thread, which may mutate the list while it's being processed.

Other than that, I think for any f that will take a long time, can be called concurrently and where e can be processed out of order, this is a fine pattern.

immutCol.par.foreach { e => threadSafeOutOfOrderProcessingOf(e) }
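One way to follow that advice when the data starts out in a mutable buffer, sketched under the assumption of Scala 2.13+ with scala-parallel-collections (drop that import on 2.12 and earlier): copy the buffer into an immutable `Vector` first, so no other code holding the buffer can change what the parallel `foreach` iterates over, and collect results in a `ConcurrentLinkedQueue`, which tolerates out-of-order inserts. Upper-casing stands in for the real processing.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable.ListBuffer
// Assumes Scala 2.13+ with scala-parallel-collections (drop on 2.12 and earlier).
import scala.collection.parallel.CollectionConverters._

val listBuffer = ListBuffer("one", "two", "three", "four", "five")

// Immutable snapshot: later changes to listBuffer don't affect it.
val snapshot = listBuffer.toVector

// Thread-safe, lock-free queue; insertion order will be arbitrary.
val results = new ConcurrentLinkedQueue[String]()
snapshot.par.foreach(e => results.add(e.toUpperCase))

// Mutating the buffer now cannot affect the snapshot that was processed.
listBuffer += "six"

println(results)
```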

Disclaimer: I have not tried parallel collections myself yet, but I'm looking forward to SO questions and answers showing us what works well.

huynhjl

The synchronisedList should be safe, though the println may give unexpected results - you have no guarantees of the order that items will be printed, or even that your printlns won't be interleaved mid-character.

A synchronised list is also unlikely to be the fastest way to do this. A safer solution is to map over an immutable collection (Vector is probably your best bet here), then print all the lines (in order) afterwards:

val input = Vector("one","two","three","four","five","six","seven","eight","nine")
val output  = input.par.map { e =>
  val msg = "processed :" + e
  // using sleep here to simulate a random delay
  Thread.sleep((math.random * 1000).toLong)
  msg
}
println(output mkString "\n")

You'll also note that this code has about as much practical usefulness as your example :)

Kevin Wright

This code is plain weird -- why add stuff in parallel to something that needs to be synchronized? You'll add contention and gain absolutely nothing in return.

The principle of the thing, accumulating results from parallel processing, is better achieved with operations like fold, reduce or aggregate.
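A sketch of accumulating without any shared mutable state, assuming Scala 2.13+ with scala-parallel-collections, where `aggregate` is available on parallel collections (drop the import on 2.12 and earlier). Each chunk builds its own partial `Set`: the first function adds one element to a partial result, the second merges two partial results. `Set.empty` is the identity for `++`, so it is safe to use as the start value for every chunk. Collecting distinct word lengths is just an example task.

```scala
// Assumes Scala 2.13+ with scala-parallel-collections (drop on 2.12 and earlier).
import scala.collection.parallel.CollectionConverters._

val words = Vector("one", "two", "three", "four", "five", "six", "seven", "eight", "nine")

// Collect the distinct word lengths without a synchronized buffer.
val lengths: Set[Int] = words.par.aggregate(Set.empty[Int])(
  (acc, w) => acc + w.length, // seqop: fold one element into a partial result
  (a, b) => a ++ b            // combop: merge partial results from different chunks
)

println(lengths)
```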

Daniel C. Sobral
  • At the moment the only way I can think of to accumulate is a Collections synchronized list, because I don't know another way of making the above code thread-safe. – Geo May 07 '11 at 17:54
  • @Geo The thing is, you are not transforming the collection. You are simply inserting the elements of the parallel collection into a synchronized list. If you showed _what_ you want to do, then we could give advice on how to do it. Right now, your code doesn't do anything. – Daniel C. Sobral May 08 '11 at 02:31

The code you've posted is safe - there will be no errors due to inconsistent state of your array list, because access to it is synchronized.

However, parallel collections process items concurrently (at the same time) AND out of order. Out of order means that the 54th element may be processed before the 2nd one, so your synchronized array list will contain the items in no predefined order.

In general it's better to use map, filter and other functional combinators to transform a collection into another collection - these will ensure that the ordering guarantees are preserved if a collection has some (like Seqs do). For example:

import scala.collection.parallel.mutable.ParArray
ParArray(1, 2, 3, 4).map(_ + 1)

always returns ParArray(2, 3, 4, 5).
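A quick way to see that difference, sketched under the assumption of Scala 2.13+ with scala-parallel-collections (drop that import on 2.12 and earlier): `map` on a parallel sequence returns results in the original order, while a `foreach` that appends to a synchronized list records elements in whatever order the threads happen to finish.

```scala
import java.util.Collections
// Assumes Scala 2.13+ with scala-parallel-collections (drop on 2.12 and earlier).
import scala.collection.parallel.CollectionConverters._

val input = (1 to 100).toVector

// Order-preserving: element i of the result comes from element i of the input.
val mapped = input.par.map(_ + 1).seq

// Thread-safe but unordered: elements arrive in completion order.
val sink = Collections.synchronizedList(new java.util.ArrayList[Int]())
input.par.foreach(n => sink.add(n + 1))

println(mapped.take(5))     // always Vector(2, 3, 4, 5, 6)
println(sink.subList(0, 5)) // order may vary from run to run
```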

However, if you need a specific thread-safe collection type such as a ConcurrentSkipListMap or a synchronized collection to be passed to some method in some API, modifying it from a parallel foreach is safe.
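For example, here is a sketch (assuming Scala 2.13+ with scala-parallel-collections, drop that import on 2.12 and earlier; the word-to-length map is only an illustration) that fills a `java.util.concurrent.ConcurrentSkipListMap` from a parallel `foreach`. The map handles concurrent `put` calls itself and keeps its keys sorted, so the final contents are deterministic even though the insertion order isn't.

```scala
import java.util.concurrent.ConcurrentSkipListMap
// Assumes Scala 2.13+ with scala-parallel-collections (drop on 2.12 and earlier).
import scala.collection.parallel.CollectionConverters._

val words = Vector("one", "two", "three", "four", "five", "six")

// Concurrent, sorted map: safe to modify from many threads at once.
val lengthsByWord = new ConcurrentSkipListMap[String, Int]()
words.par.foreach(w => lengthsByWord.put(w, w.length))

// Iteration is in key order regardless of which thread inserted first.
println(lengthsByWord)
```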

Finally, a note: parallel collections provide parallel bulk operations on data. Mutable parallel collections are not thread-safe in the sense that you can add elements to them from different threads; mutable operations like inserting into a map or appending to a buffer still have to be synchronized.
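A sketch of that last point, assuming Scala 2.13+ with scala-parallel-collections (drop that import on 2.12 and earlier): `ArrayBuffer` is not thread-safe, so appending to it from a parallel `foreach` has to be guarded, here with the buffer's own monitor via `synchronized`. When the result doesn't have to be a mutable buffer, a parallel `map` avoids the lock altogether.

```scala
import scala.collection.mutable.ArrayBuffer
// Assumes Scala 2.13+ with scala-parallel-collections (drop on 2.12 and earlier).
import scala.collection.parallel.CollectionConverters._

val buffer = ArrayBuffer.empty[Int]

// Unsynchronized `buffer += n` from several threads can corrupt the buffer
// or lose elements, so every append takes the buffer's lock.
(1 to 1000).toVector.par.foreach { n =>
  buffer.synchronized { buffer += n }
}

println(buffer.size)
```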

axel22