import dask.distributed as dd
from .dask import DaskJobExecutor
from .concurrent import ConcurrentJobExecutor
from .inline import InlineJobExecutor
Query the current Dask scheduler and return a :class:`~libertem.common.executor.JobExecutor`
that is compatible with it. See https://docs.dask.org/en/stable/scheduling.html
for the meaning of the different scheduler types.
This can be used to integrate LiberTEM in an existing Dask workflow. This
may not achieve optimal LiberTEM performance and will usually not allow GPU
processing with LiberTEM, but avoids potential compatibility issues from
changing or duplicating the Dask scheduler in an existing workflow.
.. versionadded:: 0.9.0
If a :code:`dask.distributed.Client` is set as the scheduler, return a
:class:`~libertem.executor.dask.DaskJobExecutor` using this :code:`Client`.
If the Dask scheduler is :code:`'threads'`, return a
:class:`~libertem.executor.concurrent.ConcurrentJobExecutor` backed
by the same thread pool as used by Dask.
If the Dask scheduler is :code:`'synchronous'`, return an
:class:`~libertem.executor.inline.InlineJobExecutor`,
which mimics the single-process, single-thread behaviour of Dask.
item = dask.delayed(1)
dask_scheduler = dask.base.get_scheduler(collections=[item])
# We first test for circumstances where we know how to return an adapted
# JobExecutor instance.
if isinstance(dask_scheduler, types.MethodType):
if isinstance(dask_scheduler.__self__, dd.Client):
# See https://github.com/dask/distributed/issues/6776
"Dask profiling seems to be enabled, which is known to cause issues with "
"the DM reader. "
elif dask_scheduler is dask.threaded.get:
# ConcurrentJobExecutor is currently incompatible with ProcessPoolExecutor
# since it can't pickle local functions.
# Therefore, fall through to default case for now.
# elif dask_scheduler is dask.multiprocessing.get:
# # FIXME more research needed if the pool is cached and if
# # one can get hold of it.
# return ConcurrentJobExecutor(
elif dask_scheduler is dask.local.get_sync:
# If we didn't return yet,
# we fall through to the default case.