I would like to parallelize this code:
- "mean": float(zonal_extract.mean().compute()),
- "min": float(zonal_extract.min().compute()),
- "max": float(zonal_extract.max().compute()),
- "sum": float(zonal_extract.sum().compute()),
- "stddev": float(zonal_extract.std().compute()),
- "var": float(zonal_extract.var().compute()),
This is the first time I've tried parallelizing something in Python that isn't the same function being called over and over; this would be the same data, different functions.
Attempt 1:
from dask import compute, delayed

results = delayed({})                 # wraps an empty dict in a Delayed
results["mean"] = zonal_extract.mean  # item assignment on a Delayed isn't supported,
results["min"] = zonal_extract.min    # and these are method references, not lazy calls
results["max"] = zonal_extract.max
results["sum"] = zonal_extract.sum
results["stddev"] = zonal_extract.std
results["var"] = zonal_extract.var
results = compute(results, num_workers=4)  # , scheduler='processes'
results = {k: float(v) for k, v in results.items()}  # compute() returns a tuple, so .items() fails here
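The shape I'm going for, I think, is a plain dict of the lazy reductions handed to dask.compute in a single call; compute traverses containers, merges the task graphs, and evaluates them in one pass, so the input chunks should only need to be read once. A minimal sketch of that shape (it assumes zonal_extract is a dask-backed array, so each reduction below is already lazy):

import dask

stats = {
    "mean": zonal_extract.mean(),
    "min": zonal_extract.min(),
    "max": zonal_extract.max(),
    "sum": zonal_extract.sum(),
    "stddev": zonal_extract.std(),
    "var": zonal_extract.var(),
}
# dask.compute walks the dict, merges the six graphs, and returns a 1-tuple
(computed,) = dask.compute(stats, num_workers=4)
results = {k: float(v) for k, v in computed.items()}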
Attempt 2:
mean, min, max, sum, stddev, var = compute(  # note: this unpacking shadows the built-ins min, max, and sum
    zonal_extract.mean(),
    zonal_extract.min(),
    zonal_extract.max(),
    zonal_extract.sum(),
    zonal_extract.std(),
    zonal_extract.var(),
    num_workers=4,
)  # , scheduler='processes'
results = {k: float(v) for k, v in dict(mean, min, max, sum, stddev, var).items()}  # dict() can't be built from positional values like this
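If the compute call above does its job, the last line could instead pair the names with the values, e.g. via zip (a small sketch, reusing the variable names from above):

names = ["mean", "min", "max", "sum", "stddev", "var"]
results = {k: float(v) for k, v in zip(names, (mean, min, max, sum, stddev, var))}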
This seems like a simple task, but I couldn't find anything that worked. Maybe it's because I'm already inside a multiprocessing context and the nested threading (this is probably something that doesn't exist, but it sounds cool) or something is the problem. The error is:
L = Parallel(n_jobs=-1)(
File "/usr/local/lib/python3.9/dist-packages/joblib/parallel.py", line 1056, in __call__
self.retrieve()
File "/usr/local/lib/python3.9/dist-packages/joblib/parallel.py", line 935, in retrieve
self._output.extend(job.get(timeout=self.timeout))
File "/usr/local/lib/python3.9/dist-packages/joblib/_parallel_backends.py", line 542, in wrap_future_result
return future.result(timeout=timeout)
File "/usr/lib/python3.9/concurrent/futures/_base.py", line 445, in result
return self.__get_result()
File "/usr/lib/python3.9/concurrent/futures/_base.py", line 390, in __get_result
raise self._exception
TypeError: Delayed objects of unspecified length are not iterable
real 0m25.048s
user 0m46.943s
edit:
ohhh, it's because dask's delayed is overwriting joblib's delayed:
from dask import compute, delayed
from joblib import Parallel, delayed
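One way to keep both imports around without the name clash is to reference dask through the module and leave the bare name delayed to joblib. A minimal sketch of how that could look (process_zone and zones are hypothetical stand-ins for my real outer loop):

import dask                            # use dask.compute explicitly via the module
from joblib import Parallel, delayed   # the bare name `delayed` stays joblib's

def process_zone(zonal_extract):
    # the six reductions are lazy; dask.compute evaluates them together
    (stats,) = dask.compute(
        {
            "mean": zonal_extract.mean(),
            "min": zonal_extract.min(),
            "max": zonal_extract.max(),
            "sum": zonal_extract.sum(),
            "stddev": zonal_extract.std(),
            "var": zonal_extract.var(),
        },
        num_workers=4,
    )
    return {k: float(v) for k, v in stats.items()}

# zones is a hypothetical iterable standing in for whatever the real Parallel call loops over
L = Parallel(n_jobs=-1)(delayed(process_zone)(z) for z in zones)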