
I would like to parallelize this code:

-        "mean": float(zonal_extract.mean().compute()),
-        "min": float(zonal_extract.min().compute()),
-        "max": float(zonal_extract.max().compute()),
-        "sum": float(zonal_extract.sum().compute()),
-        "stddev": float(zonal_extract.std().compute()),
-        "var": float(zonal_extract.var().compute()),

This is the first time I've tried parallelizing something in Python that isn't the same function being called over and over. Here it is the same data with different functions.

Attempt 1:

from dask import compute, delayed


results = delayed({})
results["mean"] = zonal_extract.mean
results["min"] = zonal_extract.min
results["max"] = zonal_extract.max
results["sum"] = zonal_extract.sum
results["stddev"] = zonal_extract.std
results["var"] = zonal_extract.var
results = compute(results, num_workers=4)  # , scheduler='processes'
results = {k: float(v) for k, v in results.items()}

Attempt 2:

mean, min, max, sum, stddev, var = compute(
    zonal_extract.mean(),
    zonal_extract.min(),
    zonal_extract.max(),
    zonal_extract.sum(),
    zonal_extract.std(),
    zonal_extract.var(),
    num_workers=4,
)  # , scheduler='processes'
results = {k: float(v) for k, v in dict(mean, min, max, sum, stddev, var).items()}

This seems like a simple task, but I couldn't find anything that worked. Maybe it is because I'm already within a multiprocessing context and the nested threading (this is probably something that doesn't exist, but it sounds cool) or something else is causing the error:

    L = Parallel(n_jobs=-1)(
  File "/usr/local/lib/python3.9/dist-packages/joblib/parallel.py", line 1056, in __call__
    self.retrieve()
  File "/usr/local/lib/python3.9/dist-packages/joblib/parallel.py", line 935, in retrieve
    self._output.extend(job.get(timeout=self.timeout))
  File "/usr/local/lib/python3.9/dist-packages/joblib/_parallel_backends.py", line 542, in wrap_future_result
    return future.result(timeout=timeout)
  File "/usr/lib/python3.9/concurrent/futures/_base.py", line 445, in result
    return self.__get_result()
  File "/usr/lib/python3.9/concurrent/futures/_base.py", line 390, in __get_result
    raise self._exception
TypeError: Delayed objects of unspecified length are not iterable

real    0m25.048s
user    0m46.943s

Edit:

Ohhh, it is because dask's delayed function is overwriting joblib's:

from dask import compute, delayed
from joblib import Parallel, delayed
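
To illustrate the name clash, here is a minimal sketch, assuming both dask and joblib are installed. The `square` helper and the alias names `dask_delayed` and `joblib_delayed` are made up for illustration. Whichever `from ... import delayed` runs last owns the name, so importing each one under its own alias keeps both usable in the same module:

```python
# Give each library's `delayed` its own name so neither shadows the other.
from dask import delayed as dask_delayed
from joblib import Parallel, delayed as joblib_delayed


def square(x):
    """Hypothetical stand-in for the real per-item work."""
    return x * x


# joblib's delayed builds the (function, args, kwargs) tasks that Parallel expects.
# prefer="threads" keeps this small demo from spawning worker processes.
squares = Parallel(n_jobs=2, prefer="threads")(
    joblib_delayed(square)(i) for i in range(4)
)

# dask's delayed builds a lazy task graph that is only evaluated on .compute().
total = dask_delayed(sum)(squares).compute()

print(squares, total)
```

Passing a dask `Delayed` object to joblib's `Parallel` instead is what raises the `TypeError` in the traceback above, because joblib tries to unpack it as a tuple.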
M_x
jaksco
  • if someone has 1500 reputation they can create the tag `rioxarray` or `python-rioxarray` – jaksco Nov 13 '21 at 02:42
  • I feel like this SO question is very similar but I'm not quite understanding it https://stackoverflow.com/questions/48728383/dask-delayed-object-of-unspecified-length-not-iterable-error-when-combining-dict – jaksco Nov 13 '21 at 02:46

3 Answers


dask.compute will recurse into dicts for you.

You can write it like the following:

import dask

results = dict(
    mean=dask.delayed(zonal_extract.mean)(),
    min=dask.delayed(zonal_extract.min)(),
    # and more
)

results = dask.compute(results)[0]

The basic idea is that you can have delayed computations nested into tuples, lists, dicts etc that you pass to dask.compute. What was needed here was just to make full-fledged delayed objects out of the function calls.
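
As a small illustration of that nesting, and of why the `[0]` is needed, here is a sketch that uses a made-up `add` function instead of the raster data. `dask.compute` always returns a tuple with one entry per positional argument, so a single dict argument comes back as a one-element tuple:

```python
import dask


@dask.delayed
def add(a, b):
    """Hypothetical toy task, standing in for a real computation."""
    return a + b


x = add(1, 2)
y = add(10, 20)

# Two positional arguments -> a tuple with two results.
pair = dask.compute(x, y)

# One dict argument (with a list nested inside it) -> a one-element tuple
# holding the same structure with every delayed value replaced by its result.
(nested,) = dask.compute({"x": x, "ys": [y, y]})

print(pair)
print(nested)
```

Unpacking with `(nested,) = ...` is equivalent to indexing the result with `[0]`.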

We can be more “efficient” in terms of not repeating ourselves:


computations = {k: dask.delayed(getattr(zonal_extract, k))()
                for k in "mean min max sum std var".split()}
results = dask.compute(computations)[0]

Taking a step back, this parallelization looks like it is at too low a level: these are all aggregations that are not arithmetically intensive, and they all traverse the same data. var is just the square of std, which makes it even more trivial to speed up.
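
The relationship between std and var can be checked with a short sketch. It assumes a plain dask array of random numbers as a stand-in for `zonal_extract`, and relies on both reductions using the same default `ddof=0`:

```python
import dask
import dask.array as da
import numpy as np

# Stand-in data (an assumption; the real zonal_extract comes from rioxarray).
values = np.random.default_rng(0).uniform(size=1_000)
data = da.from_array(values, chunks=100)

# Both reductions are evaluated in a single dask.compute call.
std, var = dask.compute(data.std(), data.var())

# The variance can instead be derived from the standard deviation for free.
var_from_std = float(std) ** 2

print(float(var), var_from_std)
```

In other words, once std is known, the separate pass for var can be dropped.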

creanion
  • I believe that `.min()` and the other methods might already be lazy dask operations within rioxarray: even though they look the same as in pandas, they need `.compute()` called on them to be evaluated. But I agree with everything you said. It seems like performance is much better with single-threaded rasterio, and I should parallelize across data (splitting up zones) instead of by function. Dask/rioxarray took 38x more CPU time and 2x more real time than single-threaded rasterio in my zonal stats case (but I think rioxarray could process much larger files if I ran out of RAM) – jaksco Nov 15 '21 at 17:41

@creanion's answer is a good one, but I'd also point out that there shouldn't be a need to wrap operations like mean(), var(), std() etc. in dask.delayed objects: these are already lazy operations, so you can call dask.compute() on them directly.

So a minimal example without the delayed wrappers would be:

import dask
import dask.array as da

# Generate some fake data
zonal_extract = da.random.uniform(size=(100,), chunks=10)

summary_stats = {
    "mean": zonal_extract.mean(),
    "std": zonal_extract.std(),
    "var": zonal_extract.var(),
    "min": zonal_extract.min(),
    "max": zonal_extract.max(),
}

# traverse=True is default, but being explicit
summary_stats_computed, = dask.compute(summary_stats, traverse=True)

which produces (with my random number rolls):

{'mean': 0.4903848677019127,
 'std': 0.30733105780457826,
 'var': 0.09445237909128101,
 'min': 0.000996718178509548,
 'max': 0.9981326789252434}
Ian Rose

The main issue was that I was importing two functions with the same name. Changing this

from dask import compute, delayed
from joblib import Parallel, delayed

to this

import dask
from joblib import Parallel, delayed

made the code from the second attempt work:

    mean, min, max, sum, stddev, var = dask.compute(
        zonal_extract.mean(),
        zonal_extract.min(),
        zonal_extract.max(),
        zonal_extract.sum(),
        zonal_extract.std(),
        zonal_extract.var(),
        num_workers=3,
    )

    results = {
        k: float(v)
        for k, v in dict(
            mean=mean, min=min, max=max, sum=sum, stddev=stddev, var=var
        ).items()
    }

If someone has a way to actually use dicts with dask, without naming each statistic three times, I will gladly accept that answer.
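
One possible sketch of such a dict-based version follows, building on the ideas in the other answers. It assumes `zonal_extract` is dask-backed (a small dask array stands in for it here), and the `methods` mapping is a made-up helper that pairs each output key with the name of the reduction method, so that `stddev` can map to `std`:

```python
import dask
import dask.array as da
import numpy as np

# Stand-in for the real zonal_extract (an assumption for this sketch).
zonal_extract = da.from_array(np.arange(12.0).reshape(3, 4), chunks=(3, 2))

# Output key -> name of the reduction method to call.
methods = {
    "mean": "mean",
    "min": "min",
    "max": "max",
    "sum": "sum",
    "stddev": "std",
    "var": "var",
}

# Build the lazy reductions, compute them in one call, then convert to floats.
lazy = {key: getattr(zonal_extract, name)() for key, name in methods.items()}
(computed,) = dask.compute(lazy)
results = {key: float(value) for key, value in computed.items()}

print(results)
```

Each statistic is named once in `methods`, and scheduler options such as `num_workers` can still be passed to `dask.compute` as keyword arguments.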

jaksco