
I would like to process a number of large datasets in parallel. Unfortunately the speedup I am getting from using Threads.@threads is very sublinear, as the following simplified example shows.

(I'm very new to Julia, so apologies if I missed something obvious)

Let's create some dummy input data - 8 dataframes with 2 integer columns each and 10 million rows:

using DataFrames

n = 8
dfs = Vector{DataFrame}(undef, n)
for i = 1:n
    dfs[i] = DataFrame(Dict("x1" => rand(1:Int64(1e7), Int64(1e7)), "x2" => rand(1:Int64(1e7), Int64(1e7))))
end

Now do some processing on each dataframe (group by x1 and sum x2)

function process(df::DataFrame)::DataFrame
    combine([:x2] => sum, groupby(df, :x1))
end

Finally, compare the speed of doing the processing on a single dataframe with doing it on all 8 dataframes in parallel. The machine I'm running this on has 50 cores and Julia was started with 50 threads, so ideally there should not be much of a time difference.

julia> dfs_res = Vector{DataFrame}(undef, n)

julia> @time for i = 1:1
           dfs_res[i] = process(dfs[i])
       end
  3.041048 seconds (57.24 M allocations: 1.979 GiB, 4.20% gc time)

julia> Threads.nthreads()
50

julia> @time Threads.@threads for i = 1:n
           dfs_res[i] = process(dfs[i])
       end
  5.603539 seconds (455.14 M allocations: 15.700 GiB, 39.11% gc time)

So the parallel run takes almost twice as long per dataset (this gets worse with more datasets). I have a feeling this has something to do with inefficient memory management. GC time is pretty high for the second run. And I assume the preallocation with undef isn't efficient for DataFrames. Pretty much all the examples I've seen for parallel processing in Julia are done on numeric arrays with fixed and a-priori known sizes. However here the datasets could have arbitrary sizes, columns etc. In R workflows like that can be done very efficiently with mclapply. Is there something similar (or a different but efficient pattern) in Julia? I chose to go with threads and not multi-processing to avoid copying data (Julia doesn't seem to support the fork process model like R / mclapply).

cno
  • Did you set `JULIA_NUM_THREADS`? What does `Threads.nthreads()` output for you? – Simeon Schaub Sep 08 '20 at 12:37
  • Yes I did, see output above. It's `50`. – cno Sep 08 '20 at 12:58
  • Note that in your example if you use `:x2 => sum` instead of `[:x2] => sum`, the operation is much faster and there are much fewer allocations. That's because DataFrames has a fast path for common reductions over a single column. We may improve this to also cover `[:x2] => sum`, but in general better not use vectors if you know you have a single column. (Also, your example is quite extreme as there is on average only one row per group. So it's not really representative of most workflows I would say -- and more groups means more allocations, so a longer GC time, which isn't multithreaded.) – Milan Bouchet-Valat Sep 10 '20 at 21:18
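Milan's suggestion above can be sketched as follows (timings are illustrative; the exact speed-up depends on the DataFrames version):

```julia
using DataFrames

df = DataFrame(x1 = rand(1:10^7, 10^7), x2 = rand(1:10^7, 10^7))

# Vector form: goes through the generic code path and allocates more
@time combine(groupby(df, :x1), [:x2] => sum)

# Single-column form: hits DataFrames' fast path for common reductions
@time combine(groupby(df, :x1), :x2 => sum)
```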

1 Answer


Multithreading in Julia does not scale well beyond 16 threads. Hence you need to use multiprocessing instead. Your code might look like this:

using DataFrames, Distributed
addprocs(4) # or 50
@everywhere using DataFrames, Distributed

n = 8
dfs = Vector{DataFrame}(undef, n)
for i = 1:n
    dfs[i] = DataFrame(Dict("x1" => rand(1:Int64(1e7), Int64(1e7)), "x2" => rand(1:Int64(1e7), Int64(1e7))))
end

@everywhere function process(df::DataFrame)::DataFrame
    combine([:x2] => sum, groupby(df, :x1))
end

dfs_res = @distributed (vcat) for i = 1:n
      df = process(dfs[i])
      (i, myid(), df)
end

What is important in this type of code is that transferring data between processes takes time. So sometimes you might want just to keep separate DataFrames on separate workers. Like always - it depends on your processing architecture.
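For instance, to avoid moving the input data at all, each DataFrame can be created and processed on its own worker, so only the (much smaller) aggregated result travels back. This is only a sketch; `make_df` is an illustrative stand-in for however you actually load your data:

```julia
using Distributed
addprocs(4)
@everywhere using DataFrames

# Illustrative data loader - in practice this would read your real dataset
@everywhere make_df() = DataFrame(x1 = rand(1:10^6, 10^6), x2 = rand(1:10^6, 10^6))
@everywhere process(df) = combine(groupby(df, :x1), :x2 => sum)

# Each worker builds and processes its own DataFrame locally;
# only the aggregated result is transferred back to the master process
futures = [remotecall(() -> process(make_df()), w) for w in workers()]
results = fetch.(futures)
```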

Edit: some notes on performance

For testing, put your code in functions and use `const` globals (or use BenchmarkTools.jl):

using DataFrames

const dfs = [DataFrame(Dict("x1" => rand(1:Int64(1e7), Int64(1e7)), "x2" => rand(1:Int64(1e7), Int64(1e7)))) for i in 1:8 ]

function process(df::DataFrame)::DataFrame
    combine([:x2] => sum, groupby(df, :x1))
end

function p1!(res, d)
    for i = 1:length(d)
        res[i] = process(d[i])
    end
end

function p2!(res, d)
    Threads.@threads for i = 1:length(d)
        res[i] = process(d[i])
    end
end

const dres = Vector{DataFrame}(undef, 8)

And here are the results:

julia> GC.gc();@time p1!(dres, dfs)
 30.840718 seconds (507.28 M allocations: 16.532 GiB, 6.42% gc time)

julia> GC.gc();@time p1!(dres, dfs)
 30.827676 seconds (505.66 M allocations: 16.451 GiB, 7.91% gc time)

julia> GC.gc();@time p2!(dres, dfs)
 18.002533 seconds (505.77 M allocations: 16.457 GiB, 23.69% gc time)

julia> GC.gc();@time p2!(dres, dfs)
 17.675169 seconds (505.66 M allocations: 16.451 GiB, 23.64% gc time)

Why is the difference only approx. 2x on an 8-core machine? Because most of the time is spent garbage collecting (look at the output in your question - the problem is the same). When you allocate less RAM you will see a better multithreading speed-up, up to around 3x.
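As a complement to the `const`-plus-functions approach, BenchmarkTools.jl excludes compilation overhead by running the expression many times and reporting summary statistics. A minimal sketch on a small dummy input (`process_small` is named to avoid clashing with `process` above):

```julia
using BenchmarkTools, DataFrames

const small_df = DataFrame(x1 = rand(1:10^4, 10^4), x2 = rand(1:10^4, 10^4))
process_small(df) = combine(groupby(df, :x1), :x2 => sum)

# Interpolating with $ avoids benchmarking global-variable access;
# @btime reports the minimum time over many samples, so the one-off
# compilation cost does not distort the measurement
@btime process_small($small_df);
```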

Przemyslaw Szufel
  • Thanks! This approach does seem to be faster for this example, although I was hoping to avoid the copying overhead of multiprocessing (my real use case is much larger). Also note that in my example above the effective number of threads is 8, so well below the 16 you say would be infeasible. – cno Sep 08 '20 at 13:00
  • In your example you are incorrectly measuring the time, because you measure both the compile time and the runtime. The compile time in Julia is significantly longer for multi-threaded than for single-threaded code, and compiling distributed code takes even longer. Have a look at: https://stackoverflow.com/questions/60867667/how-to-obtain-the-execution-time-of-a-function-in-julia In either case, when you start measuring correctly you will be happy with the performance up to around 16 threads – Przemyslaw Szufel Sep 08 '20 at 14:08
  • I did execute each command a few times - does that not mean it will use the cached compiled version after the first time? Otherwise how could I measure it better? – cno Sep 08 '20 at 14:10
  • I added some comments on performance testing. Anyway running on 50 cores you should use Distributed not Threads, have plenty of memory to avoid excessive GCs and have a good workflow and data distribution strategy among the workers. – Przemyslaw Szufel Sep 08 '20 at 14:38
  • Thanks for the pointer about more precise measuring of execution time. However, the basic conclusion is still the same - a very sublinear scaling with multithreading. I don't think it has anything to do with number of cores / threads, since this is a deliberately small example using only 8. Agree about the GC. The question is then - how to avoid it? I'm using only basic functionality of the DataFrames library in this example. – cno Sep 08 '20 at 15:24
  • The easiest way to avoid GC - rent a machine with huge RAM in the cloud (e.g. AWS). This is almost always the cheapest option in scenarios like this one. If not, then you need to look for a more memory-efficient implementation of the task you plan to do. This is again problem-dependent; there is no one-size-fits-all solution. – Przemyslaw Szufel Sep 08 '20 at 15:43
  • Hmm not sure. The machine I ran this on has >300G RAM of which >200G are free. The Julia process' max RAM usage at no point went over 25G. So I don't think it's a RAM availability problem. – cno Sep 08 '20 at 15:45