3

I'm looking for a way to map a function f :: a -> IO(b) over a 2-dimensional array, in parallel, while retaining sensible memory consumption.
I would also appreciate being able to also supply the array index as an argument of the function, i.e. mapping g :: Int -> Int -> a -> IO(b), like imap from Data.Vector, or mapWithKey from Data.Map.
Current attempts (see below) either have terrible memory consumption, or throw an error at runtime.

Note that, in actual fact, the type of the function I am interested in is h :: Int -> Int -> a -> Random b, where Random denotes some Random monad from Control.Monad.Random; I move it over to the IO monad using evalRandIO.


Attempted solutions:

Say I want to map the function foo :: Int -> Int -> a -> IO(b) over a 2D array of elements of type a. (Here a and b are specific types; no implicit universal quantification.)
So far, I have tried the following approaches:

  1. Plain lists with Control.Concurrent.Async

    import Control.Concurrent.Async(mapConcurrently)
    
    indexedArray :: [[(Int,Int,a)]]
    indexedArray = -- ...
    mappedArray = mapConcurrently (traverse (\(x,y,a) -> foo x y a)) indexedArray
    

The problem with this approach is that the memory consumption is off the charts (say 4GB for reference).
As noted in the answers, with this approach I'm only evaluating the rows in parallel instead of all elements, but that doesn't make much difference to me in practice.

  1. Repa

    import qualified Data.Array.Repa as R
    import Data.Array.Repa(Z(..), (:.)(..), U, DIM2)
    
    array :: R.Array U DIM2 a
    array = -- ...
    mappedArray = R.traverse array id (\i (Z :. x :. y) -> unsafePerformIO $ foo x y (i (Z :. x :. y)))
    result = R.computeP mappedArray
    

Note that R.traverse is not Data.Traversable(traverse). As Repa arrays do not support Data.Traversable(traverse), I cannot sequence the IO actions in any way, so I have to use unsafePerformIO to be able to use the inbuilt "traverse" functionality.
This approach has good performance and excellent memory consumption (around 50MB for reference).
There is a problem however, as I consistently get the following runtime error: thread blocked indefinitely in an MVar operation.

3a. Data.Vector and Control.Parallel

Essentially the same approach as with Repa leads to the same error, thread blocked indefinitely in an MVar operation.
I again resort to using unsafePerformIO as Data.Vector vectors do not have a traversable instance.

    import qualified Data.Vector as V
    import Control.Parallel.Strategies(using)
    import Data.Vector.Strategies(parVector)

    array :: V.Vector (V.Vector a)
    array = -- ...
    mappedArray = V.imap (\ y row -> V.imap (\x a -> unsafePerformIO $ foo x y a ) row ) `using` (parVector 1)

The memory consumption and performance are slightly worse compared to Repa (around 100MB for reference), but remain comparable.

3b. Data.Vector and Control.Concurrent.Async

As suggested by sheyll, but using a flat vector instead of nested vectors.

    import qualified Data.Vector.Unboxed as V
    import qualified Data.Vector.Unboxed.Mutable as M
    import Control.Concurrent.Async(forConcurrently_)

    mappedFlattenedArray = do
      flattenedMArray <- V.unsafeThaw $ -- ...
      forConcurrently_ [0..w*h] (\i -> do v <- M.unsafeRead flattenedMArray i
                                          let (y,x) = quotRem i w
                                          v' <- foo x y v
                                          M.unsafeWrite flattenedMArray i v' )
      V.unsafeFreeze flattenedMArray

Unfortunately the memory consumption is very high with this approach (~3GB). I think it's because forConcurrently_ creates many thunks? I'm not sure how to avoid this problem.

  1. Data.Array and Control.Concurrent.Async

Using the traversable instance of Data.Array arrays, as suggested by Alec:

    import qualified Data.Array.Unboxed as A
    import Control.Concurrent.Async(mapConcurrently)

    indexedArray :: A.Array (Int,Int) ((Int,Int),a)
    indexedArray = -- ...
    mappedArray = mapConcurrently (\((x,y),a) -> foo x y a) indexedArray

Once again, the memory consumption is very high (~3GB), even using unboxed arrays; the problem is probably the same as in approaches 1 and 3b, with a buildup of thunks consuming a lot of memory. I'm not sure how to tackle it.


The overall performance and memory consumption seems to be better with Repa than any of the other approaches, and I also appreciate the inbuilt functionality for dealing with 2-dimensional arrays and being able to map a function that uses indices. Unfortunately, most of the time I get the aforementioned runtime error (but not always!).

I remarked earlier that the only reason the return type of foo is IO(b) is because of non-determinism. So I would have thought I could change the output type to some Random monad, and instead of doing unsafePerformIO I could simply perform a runRandom with a given seed. Unfortunately this did not solve the problem, as I kept getting the error thread blocked indefinitely in an MVar operation anyway.

Is there any way I can salvage method 2 (Repa) to circumvent this error? Or are there other applicable methods? I understand that in general, IO necessarily breaks parallelism as there are no guarantees that the IO actions don't conflict, but at least for this use-case I believe a solution should be possible. (See: Why is there no mapM for repa arrays?)

See also the following question: Parallel mapM on Repa arrays. Note however that I do not know in advance how many random numbers my function foo is going to need.

Community
  • 1
  • 1
Will
  • 321
  • 2
  • 9
  • Have you read the documentation for `unsafePerformIO`? Can you show us `foo`? – jberryman Feb 18 '17 at 00:52
  • 1
    This question seems to be based on a misunderstanding of the role of `IO` in the type system. If you have a function `A -> IO B` which you 'know' has no side effects, then you should be able to rewrite it to have type `IO (A -> B)`. If you cannot do this, then your function really does have side effects. If randomness is your only side effect, then maybe [this](https://hackage.haskell.org/package/MonadRandom-0.5.1/docs/Control-Monad-Random-Class.html#t:MonadInterleave) would be helpful (in particular, docs say "This can be used, for example, to allow random computations to run in parallel") – user2407038 Feb 18 '17 at 06:28

3 Answers3

3

Your first approach is probably what you want, but not with a linked list. Note the type mapConcurrently :: Traversable t => (a -> IO b) -> t a -> IO (t b) allows you to do what amounts to a parallel traverse over anything that is Traversable, including Array (I propose Array over Vector here just because it lends itself better to multiple dimensions).

import Control.Concurrent.Async (mapConcurrently)
import Data.Array

indexedArray :: Array (Int,Int) (Int,Int,a)
indexedArray = ...

mappedArray = mapConcurrently (\(x,y,a) -> foo x y a) indexedArray

Also, note that your previous approach with nested lists only parallelized the traverse of each sublist - it didn't paralellize the whole thing.

Alec
  • 31,829
  • 7
  • 67
  • 114
  • Thanks, I didn't realise that Data.Array arrays were traversable, as both Data.Vector vectors and Repa arrays are unfortunately not. – Will Feb 18 '17 at 09:42
  • @will Data.vector is traversable too - just not the unboxed version – Alec Feb 18 '17 at 20:03
  • Oh ok, I didn't realise. Do you have any advice to alleviate the issue of (what I presume is) thunk buildup? All the versions using Async seem to use at least 20x the amount of memory, as I explained in the (updated) question. – Will Feb 18 '17 at 21:06
  • Using `Traversable` instance of boxed `Array`/`Vector` and `mapConcurrently` from `async` package is terribly inefficient approach. First of all array values are boxed (that approach is impossible with unboxed arrays) so for each array cell you have to follow an extra pointer. Another problem is `mapConcurrently` will create something like 3 threads per array element and that has some significant overhead as well, especially if you consider arrays with millions of elements in them (1000x1000 is not such a big array) – lehins Jun 04 '19 at 20:38
2

A couple years late to the party, but there is a library that can do exactly what you need right out of the box: massiv. There is a function imapIO, which has a type signature (when restricting m to IO):

imapIO :: (Source r' ix a, Mutable r ix b)
  => (ix -> a -> IO b) -- ^ Index aware mapping action
  -> Array r' ix a -- ^ Source array
  -> IO (Array r ix b)

Depending on how the source array was constructed, this imapIO can be automatically parallelized or run sequentially. In the example below, because of usage of Par, randomR will be parallelized:

λ> arr = makeArrayR D Par (Sz (5 :. 11)) $ \ (i :. j) -> (i, j)
λ> mapIO randomR arr :: IO (Array P Ix2 Int)
Array P Par (Sz (5 :. 11))
  [ [ 0, 1, 2, 0, 4, 4, 1, 7, 2, 8, 9 ]
  , [ 0, 1, 1, 1, 1, 4, 6, 2, 1, 8, 4 ]
  , [ 2, 1, 2, 3, 4, 5, 5, 3, 4, 9, 4 ]
  , [ 2, 2, 2, 3, 4, 5, 6, 7, 3, 8, 8 ]
  , [ 2, 4, 2, 3, 4, 4, 4, 4, 8, 8, 9 ]
  ]

That being said, this is pretty bad and very slow way of generating array of random numbers. For a few reasons:

  • using randomR (same thing applies to evalRandIO) essentially uses one global random number generator that is stored in an IORef. This approach is thread safe, but because it is in critical section and multiple threads will be trying to use it at the same time, parallelization is not gonna be as effective.
  • It uses random package underneath, which is ridiculously slow, I mean a factor of ~x250 times slower than say splitmix, or any other random library.

There are two better ways of generating random numbers:

  • In case when generator is pure and splittable, we can deterministically split the initial generator as many times as the number of threads we want to use for generation of a random array and than use those generators independently to generate portions of the array. There is a special function massiv that does just that randomArray. Here is an example that uses random package:
λ> import Data.Massiv.Array
λ> import System.Random as System
λ> gen <- System.newStdGen
λ> compute $ randomArray gen System.split System.random Par (Sz2 2 3) :: Array P Ix2 Double
Array P Par (Sz (2 :. 3))
  [ [ 0.8867416334370383, 0.6217394261977418, 0.4536893479057291 ]
  , [ 0.6566602646092554, 0.6988432454700997, 0.14116451452794965 ]
  ]
  • On the other side of spectrum there are stateful, non-splittable generators, such as mwc-random. For those there are separate functions called randomArrayWC, and imapWS which can also very efficiently generate arrays with random values while using a separate random number generator per thread. See my answer to the SO question: Parallel mapM on Repa arrays which was also linked in this question.
lehins
  • 9,642
  • 2
  • 35
  • 49
1

To get max performance and a tight memory layout, without any unnessary copying of arrays, I would suggest to use Data.Vector.Storable.Mutable.

One can thaw/unsafeThaw any immutable vector (e.g. Data.Vector.Storable) to get back a mutable vector, which supports the operations defined in Data.Vector.Storable.Mutable, like read and write, and which are monadic actions with the PrimMonad constraint, a PrimMonad is a basic monad like IO or ST.

For example, the signature of write is:

(PrimMonad m, Storable a) => MVector (PrimState m) a -> Int -> a -> m () 

Just take a look at the documentation for convertion to/from a mutable vector.

Which seems daunting, but it is actually pretty simple: MVector (PrimState m) a is what you get from thaw, m could be ST or IO and PrimState m is the s if m is ST s or ReadWorld if m is IO, the Int parameter is just the element-index and a is the new value. This function returns an action with the sideeffect of inplace/destructively updating the vector at the given position.

When finished mutating a vector, you can freeze/unsafeFreeze it, to get an immutable vector back, freeze and unsafeFreeze are the opposite of thaw and unsafeThaw, e.g. unsafeFreeze has the type signature:

unsafeFreeze :: (Storable a, PrimMonad m) => MVector (PrimState m) a -> m (Vector a) 

As you can see, the function also returns a monadic action with the PrimMonad constraint, see the documentation of the primitive package for more details.

Now, to achieve your goal, as I understand it, you would unsafeThaw the outter vector and then concurrently (from async) unsafeThaw, read, apply foo, write each element and finally unsafeFreeze each inner vector, and then unsafeFreeze the outter mutable vector.

Please note, that this can also be done with unboxed mutable IO arrays, in a similar manner.

Also note, that I assumed from your question, that the parallelism should be restricted to the outter vector, i.e. all rows should be done in parallel not all elements in all rows.

sheyll
  • 135
  • 1
  • 5
  • Thanks, I'll give this a go. I was only doing parallelism row-by-row for convenience, but I don't mind either way. – Will Feb 18 '17 at 09:59
  • I use a similar approach in an application where I need to use a native C-library; the special feature of `Storable` is that any vector of `Storable`s can be used as a C-Array directly, see [Foreign.Storable](https://www.stackage.org/haddock/lts-8.0/base-4.9.1.0/Foreign-Storable.html). – sheyll Feb 19 '17 at 00:08
  • This approach is sensible, but not optimal. First of all this describes ragged array, where it is a boxed vector of storable vectors (i.e `Stoable a => V.Vector (S.Vector a))`, which means the memory layout is not optimal. Secondly using `async` will not give optimal parallelization either, because a work stealing scheduler with just a handful of threads will perform much better. See my answer for more info. – lehins Jun 04 '19 at 20:48