I'm looking for a way to map a function f :: a -> IO b over a 2-dimensional array, in parallel, while keeping memory consumption sensible.
I would also appreciate being able to supply the array indices as arguments to the function, i.e. to map g :: Int -> Int -> a -> IO b, like imap from Data.Vector or mapWithKey from Data.Map.
Current attempts (see below) either have terrible memory consumption, or throw an error at runtime.
Note that, in actual fact, the function I am interested in has type h :: Int -> Int -> a -> Random b, where Random denotes some random monad from Control.Monad.Random; I move it over to the IO monad using evalRandIO.
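For concreteness, here is roughly the shape of the function and the conversion (the body of h below is just an illustrative placeholder, not my actual computation):

```haskell
import Control.Monad.Random (Rand, StdGen, getRandomR, evalRandIO)

-- Illustrative placeholder for the real computation; the actual h may
-- draw an unpredictable number of random values.
h :: Int -> Int -> Double -> Rand StdGen Double
h x y a = do
  noise <- getRandomR (0, 1)
  pure (a + noise * fromIntegral (x + y))

-- Lift the random computation into IO with a fresh generator.
foo :: Int -> Int -> Double -> IO Double
foo x y a = evalRandIO (h x y a)
```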
Attempted solutions:
Say I want to map the function foo :: Int -> Int -> a -> IO b over a 2D array of elements of type a. (Here a and b are specific types; no implicit universal quantification.)
So far, I have tried the following approaches:
1. Plain lists with Control.Concurrent.Async

import Control.Concurrent.Async (mapConcurrently)

indexedArray :: [[(Int, Int, a)]]
indexedArray = -- ...

mappedArray = mapConcurrently (traverse (\(x, y, a) -> foo x y a)) indexedArray
The problem with this approach is that memory consumption is off the charts (around 4 GB, for reference).
As noted in the answers, with this approach I'm only evaluating the rows in parallel instead of all elements, but that doesn't make much difference to me in practice.
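A per-element variant would presumably flatten the nesting before handing it to mapConcurrently; something like the following sketch (with a placeholder foo standing in for the real function):

```haskell
import Control.Concurrent.Async (mapConcurrently)

-- Placeholder standing in for the real foo.
foo :: Int -> Int -> Double -> IO Double
foo x y a = pure (a + fromIntegral (x + y))

-- Run foo on every element concurrently by flattening the rows first;
-- the row structure can be rebuilt from the known width afterwards.
mapElemsConcurrently :: [[(Int, Int, Double)]] -> IO [Double]
mapElemsConcurrently xss =
  mapConcurrently (\(x, y, a) -> foo x y a) (concat xss)
```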
2. Repa

import qualified Data.Array.Repa as R
import Data.Array.Repa (Z(..), (:.)(..), U, DIM2)

array :: R.Array U DIM2 a
array = -- ...

mappedArray = R.traverse array id
  (\i (Z :. x :. y) -> unsafePerformIO $ foo x y (i (Z :. x :. y)))

result = R.computeP mappedArray
Note that R.traverse is not Data.Traversable's traverse. Since Repa arrays do not have a Traversable instance, I cannot sequence the IO actions in any way, so I have to use unsafePerformIO in order to use the built-in "traverse" functionality.
This approach has good performance and excellent memory consumption (around 50MB for reference).
There is a problem, however: I consistently get the following runtime error: thread blocked indefinitely in an MVar operation.
3a. Data.Vector and Control.Parallel
Essentially the same approach as with Repa leads to the same error, thread blocked indefinitely in an MVar operation.
I again resort to using unsafePerformIO, since traversing the vector would sequence the IO actions rather than run them in parallel.
import qualified Data.Vector as V
import Control.Parallel.Strategies(using)
import Data.Vector.Strategies(parVector)
array :: V.Vector (V.Vector a)
array = -- ...
mappedArray = V.imap (\y row -> V.imap (\x a -> unsafePerformIO $ foo x y a) row) array
                `using` parVector 1
The memory consumption and performance are slightly worse compared to Repa (around 100MB for reference), but remain comparable.
3b. Data.Vector and Control.Concurrent.Async
As suggested by sheyll, but using a flat vector instead of nested vectors.
import qualified Data.Vector.Unboxed as V
import qualified Data.Vector.Unboxed.Mutable as M
import Control.Concurrent.Async(forConcurrently_)
mappedFlattenedArray = do
  flattenedMArray <- V.unsafeThaw $ -- ...
  forConcurrently_ [0 .. w*h - 1] $ \i -> do
    v <- M.unsafeRead flattenedMArray i
    let (y, x) = quotRem i w
    v' <- foo x y v
    M.unsafeWrite flattenedMArray i v'
  V.unsafeFreeze flattenedMArray
Unfortunately the memory consumption is very high with this approach (~3 GB). I think it's because forConcurrently_ creates many thunks? I'm not sure how to avoid this problem.
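One thing I wonder about: forConcurrently_ keeps one Async alive per list element, so with w*h elements there are w*h threads and pending results at once. Chunking the index list would bound that; a sketch (chunksOf comes from the split package, and forChunksConcurrently_ is a name I made up):

```haskell
import Control.Concurrent.Async (forConcurrently_)
import Data.List.Split (chunksOf)

-- Spawn one worker per chunk of indices instead of one per index,
-- so the number of live threads stays bounded.
forChunksConcurrently_ :: Int -> Int -> (Int -> IO ()) -> IO ()
forChunksConcurrently_ chunkSize n body =
  forConcurrently_ (chunksOf chunkSize [0 .. n - 1]) (mapM_ body)
```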
4. Data.Array and Control.Concurrent.Async
Using the traversable instance of Data.Array arrays, as suggested by Alec:
import qualified Data.Array.Unboxed as A
import Control.Concurrent.Async(mapConcurrently)
indexedArray :: A.Array (Int,Int) ((Int,Int),a)
indexedArray = -- ...
mappedArray = mapConcurrently (\((x,y),a) -> foo x y a) indexedArray
Once again, the memory consumption is very high (~3GB), even using unboxed arrays; the problem is probably the same as in approaches 1 and 3b, with a buildup of thunks consuming a lot of memory. I'm not sure how to tackle it.
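If thunk buildup really is the problem, forcing every result to normal form inside the worker might help; a sketch (mapConcurrentlyForced is my own name; force is from the deepseq package, evaluate from Control.Exception) — I am not sure this is the right fix:

```haskell
import Control.Concurrent.Async (mapConcurrently)
import Control.DeepSeq (NFData, force)
import Control.Exception (evaluate)

-- Like mapConcurrently, but each worker evaluates its result to
-- normal form before returning, so no thunks accumulate.
mapConcurrentlyForced
  :: (Traversable t, NFData b) => (a -> IO b) -> t a -> IO (t b)
mapConcurrentlyForced f = mapConcurrently (\a -> evaluate . force =<< f a)
```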
The overall performance and memory consumption seems to be better with Repa than any of the other approaches, and I also appreciate the inbuilt functionality for dealing with 2-dimensional arrays and being able to map a function that uses indices. Unfortunately, most of the time I get the aforementioned runtime error (but not always!).
I remarked earlier that the only reason the return type of foo is IO b is non-determinism. So I would have thought I could change the return type to some Random monad and, instead of using unsafePerformIO, simply perform a runRandom with a given seed. Unfortunately this did not solve the problem: I kept getting the error thread blocked indefinitely in an MVar operation anyway.
Is there any way I can salvage method 2 (Repa) to circumvent this error? Or are there other applicable methods? I understand that in general, IO necessarily breaks parallelism as there are no guarantees that the IO actions don't conflict, but at least for this use-case I believe a solution should be possible. (See: Why is there no mapM for repa arrays?)
See also the following question: Parallel mapM on Repa arrays. Note, however, that I do not know in advance how many random numbers my function foo is going to need.
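A thought on that last point: split from System.Random produces independent generators, so every cell could be handed its own generator up front and draw as many numbers as it likes, making the whole map pure and safely parallel. A sketch under those assumptions (hCell is a placeholder for the real per-cell computation):

```haskell
import Control.Parallel.Strategies (parMap, rdeepseq)
import System.Random (StdGen, mkStdGen, randomR, split)

-- An infinite supply of independent generators.
splits :: StdGen -> [StdGen]
splits g = let (g1, g2) = split g in g1 : splits g2

-- Placeholder for the real per-cell computation; it may draw
-- any number of values from its private generator.
hCell :: StdGen -> Int -> Double
hCell g x = let (n, _) = randomR (0, 9 :: Int) g in fromIntegral (n + x)

-- Pure, parallel map: each element gets its own generator.
mapWithGens :: StdGen -> [Int] -> [Double]
mapWithGens g xs = parMap rdeepseq (uncurry hCell) (zip (splits g) xs)
```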