.. note:: The features described below are experimental and under development.

.. testsetup:: *

    import numpy as np

    from libertem.api import Context
    from libertem.executor.inline import InlineJobExecutor

    ctx = Context(executor=InlineJobExecutor())
    dataset = ctx.load("memory", datashape=(16, 16, 16, 16), sig_dims=2)

.. _`dask`:

Dask integration
================

.. versionadded:: 0.9.0

Since version 0.9, LiberTEM supports seamless integration with workflows that
are based on Dask arrays: Dask arrays can serve as input for LiberTEM, and UDF
computations can produce Dask arrays. Additionally,
:meth:`~libertem.contrib.daskadapter.make_dask_array` can create Dask arrays
from LiberTEM datasets.

Linking with HyperSpy Lazy Signals
----------------------------------

LiberTEM's Dask integration can be used to combine features from HyperSpy and
LiberTEM in a single analysis workflow:

.. toctree::
    :maxdepth: 2

    hyperspy-integration

.. _`scheduler`:

Scheduler
---------

LiberTEM uses Dask.distributed as the default method to execute tasks. Unless
instructed otherwise, LiberTEM keeps the default Dask scheduler as-is and only
uses the Dask :code:`Client` internally to make sure existing workflows keep
running as before. However, for a closer integration it can be beneficial to
use the same scheduler for both LiberTEM and other Dask computations. There
are several options for that:

* Set the LiberTEM Dask cluster as the default Dask scheduler:

  * Use :code:`Context.make_with('dask-make-default')`
    (:meth:`~libertem.api.Context.make_with`).
  * Pass :code:`client_kwargs={'set_as_default': True}` to
    :meth:`~libertem.executor.dask.DaskJobExecutor.connect` or
    :meth:`~libertem.executor.dask.DaskJobExecutor.make_local`.

* Use an existing Dask scheduler:

  * Use :code:`Context.make_with('dask-integration')` to start an executor
    that is compatible with the current Dask scheduler.

* Use dask.delayed:

  * :class:`libertem.executor.delayed.DelayedJobExecutor` can return UDF
    computations as Dask arrays. The scheduler is only determined when
    :code:`compute()` is called downstream, using the normal Dask mechanisms.

.. _daskarray:

Load datasets as Dask arrays
----------------------------

The :meth:`~libertem.contrib.daskadapter.make_dask_array` function can
generate a `distributed Dask array <https://docs.dask.org/en/stable/array.html>`_
from a :class:`~libertem.io.dataset.base.DataSet`, using its partitions as
blocks. The typical LiberTEM partition size is close to the optimum size for
Dask array blocks under most circumstances. The Dask array is accompanied by a
map of optimal workers. This map should be passed to the :meth:`compute`
method in order to construct the blocks on the workers that have them in local
storage.

.. testcode:: make_dask_array

    from libertem.contrib.daskadapter import make_dask_array

    # Construct a Dask array from the dataset.
    # The second return value contains information
    # on workers that hold parts of a dataset in local
    # storage to ensure optimal data locality.
    dask_array, workers = make_dask_array(dataset)

    result = dask_array.sum(axis=(-1, -2)).compute(workers=workers)

Load Dask arrays as datasets
----------------------------

LiberTEM datasets can be created from Dask arrays by using
:meth:`libertem.api.Context.load` with filetype :code:`'dask'`. See
:ref:`daskds` for details. The performance can vary and depends on chunking,
executor, I/O method and Dask array creation method (see the chunking sketch
below). Please contact us or
`create an Issue <https://github.com/LiberTEM/LiberTEM/issues>`_ for
questions, bug reports and other feedback!

This basic example shows running a UDF on a Dask array:

.. testcode:: from_dask

    import dask.array as da

    from libertem.udf.sum import SumUDF

    # Create a Context that integrates well with
    # the current Dask scheduler
    ctx = Context.make_with('dask-integration')

    a = da.random.random((23, 42, 17, 19))

    ds = ctx.load('dask', a, sig_dims=2)
    res = ctx.run_udf(dataset=ds, udf=SumUDF())

    assert np.allclose(
        a.sum(axis=(0, 1)).compute(),
        res['intensity'].raw_data
    )

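Chunking, in particular, can often be improved before loading. The following
is a minimal sketch, assuming that chunks which span the full signal
dimensions and group several frames along the navigation dimensions work
best, as is typical for frame-based processing; see :ref:`daskds` for the
details that apply to a specific dataset and executor:

.. code-block:: python

    import dask.array as da

    from libertem.api import Context

    ctx = Context.make_with('dask-integration')

    # Hypothetical input that is chunked along a signal dimension
    a = da.random.random((23, 42, 17, 19), chunks=(4, -1, -1, 8))

    # Rechunk so that each chunk contains whole frames and a contiguous
    # run of the navigation dimensions; -1 means "no chunking" along
    # that axis.
    a = a.rechunk((4, -1, -1, -1))

    ds = ctx.load('dask', a, sig_dims=2)
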
See also :ref:`executors` on how to set up compatible schedulers for both Dask
and LiberTEM.

.. _`delayed_udfs`:

Create Dask arrays with UDFs
----------------------------

Using a :class:`~libertem.executor.delayed.DelayedJobExecutor` with a
:class:`~libertem.api.Context` lets :meth:`~libertem.api.Context.run_udf`
return results as Dask arrays. In addition to the usual
:attr:`~libertem.common.buffers.BufferWrapper.data` and
:attr:`~libertem.common.buffers.BufferWrapper.raw_data` properties, which
provide results eagerly as NumPy arrays, the results are made available as
Dask arrays through the attributes
:attr:`~libertem.executor.utils.dask_buffer.DaskResultBufferWrapper.delayed_data`
and
:attr:`~libertem.executor.utils.dask_buffer.DaskResultBufferWrapper.delayed_raw_data`.
The computation is only performed when :code:`compute()` is called on a Dask
array result.

.. testcode:: to_dask

    from libertem.api import Context
    from libertem.executor.delayed import DelayedJobExecutor
    from libertem.udf.sumsigudf import SumSigUDF

    ctx = Context(executor=DelayedJobExecutor())
    res = ctx.run_udf(dataset=dataset, udf=SumSigUDF())
    print(res['intensity'].delayed_data)

.. testoutput:: to_dask
    :options: +ELLIPSIS

    dask.array<...>

This method makes it possible to create large intermediate results of
:code:`kind='nav'` efficiently in the form of Dask arrays. The partial results
can stay as ephemeral array chunks on the workers for only as long as they are
needed for downstream calculations, instead of being transported through the
cluster and instantiated as the full result on the main node.

For calls to :meth:`~libertem.api.Context.run_udf` that run multiple UDFs, or
that use UDFs which return multiple results, it is strongly recommended to
compute the results with a single call to :code:`compute()` in order to re-use
the computation over the dataset. By default, Dask does not cache intermediate
results from prior runs, so individual calls to :code:`compute()` require a
complete re-run of the UDFs that were passed to :code:`run_udf`.

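As an illustration of this pattern, the following sketch runs two UDFs in one
:code:`run_udf` call and computes both Dask array results together. It assumes
that :code:`run_udf` returns one result per UDF when passed a list of UDFs,
and uses the :code:`'intensity'` buffers of the bundled sum UDFs:

.. code-block:: python

    import dask

    from libertem.api import Context
    from libertem.executor.delayed import DelayedJobExecutor
    from libertem.udf.sum import SumUDF
    from libertem.udf.sumsigudf import SumSigUDF

    ctx = Context(executor=DelayedJobExecutor())

    # 'dataset' is any LiberTEM dataset, for example from ctx.load()
    res_sum, res_sumsig = ctx.run_udf(
        dataset=dataset, udf=[SumUDF(), SumSigUDF()]
    )

    # A single dask.compute() call evaluates both Dask arrays from one
    # pass over the dataset instead of re-running the UDFs once per
    # result.
    sum_result, sumsig_result = dask.compute(
        res_sum['intensity'].delayed_data,
        res_sumsig['intensity'].delayed_data,
    )
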
.. _`merge_all`:
.. _dask merge_all:

Merge function for Dask array results
-------------------------------------

.. note:: See :ref:`udf merge_all` for more information.

LiberTEM already uses an efficient default merging method to create Dask
arrays for UDFs that only use :code:`kind='nav'` buffers and don't specify
their own :code:`merge()`.

For all UDFs that define their own :code:`merge()`,
:class:`~libertem.executor.delayed.DelayedJobExecutor` will use this existing
:code:`udf.merge()` function to assemble the final results, in the same way it
is normally used to merge the partial results from each partition. This is
carried out by wrapping :code:`udf.merge()` in :code:`dask.delayed` calls.

However, the user can also specify a
:meth:`~libertem.udf.base.UDFMergeAllMixin.merge_all` method on their UDF that
combines all partial results from the workers into the complete result in a
single step. This allows the
:class:`~libertem.executor.delayed.DelayedJobExecutor` to produce a
streamlined task tree, which gives Dask greater scope to parallelise the merge
step and reduce data transfers.

This example shows the task tree first with the built-in wrapper for
:code:`udf.merge()` and then the streamlined version after defining
:code:`merge_all()`. In tests, the streamlined variant was twice as fast as
the built-in wrapper.

.. testsetup:: merge_all

    import numpy as np

    from libertem.udf.base import UDF
    from libertem.executor.delayed import DelayedJobExecutor

.. testcode:: merge_all

    class MySumUDF(UDF):
        def get_result_buffers(self):
            return {
                'intensity': self.buffer(
                    kind='sig', dtype=self.meta.input_dtype
                )
            }

        def process_tile(self, tile):
            self.results.intensity[:] += np.sum(tile, axis=0)

        def merge(self, dest, src):
            dest.intensity[:] += src.intensity

    ctx = Context(executor=DelayedJobExecutor())
    result = ctx.run_udf(udf=MySumUDF(), dataset=dataset)
    result['intensity'].delayed_data.visualize()

.. image:: ./images/tree-default-merge.png
    :width: 300px
    :alt: Task tree with wrapper for standard merge.

.. testcode:: merge_all

    class MySumMergeUDF(MySumUDF):
        # Define the merge_all() method for the UDF above
        def merge_all(self, ordered_results):
            # List and not generator for NumPy dispatch to work
            chunks = [b.intensity for b in ordered_results.values()]
            # NumPy will dispatch the stacking to the appropriate method
            # for the chunks.
            # See also https://numpy.org/doc/stable/user/basics.dispatch.html
            stack = np.stack(chunks)
            # Perform the computation on the stacked chunks,
            # equivalent to the normal merge()
            intensity = stack.sum(axis=0)
            # Return a dictionary mapping buffer name to new content
            return {'intensity': intensity}

    result2 = ctx.run_udf(udf=MySumMergeUDF(), dataset=dataset)
    result2['intensity'].delayed_data.visualize()

.. image:: ./images/tree-merge-all.png
    :width: 300px
    :alt: Task tree with merge_all().

.. testcleanup:: merge_all

    assert np.allclose(
        result['intensity'].raw_data,
        result2['intensity'].raw_data,
    )

The argument :code:`ordered_results` is a dictionary of the partial results
for that UDF. The dictionary is keyed by :class:`~libertem.common.slice.Slice`
objects, one for each partition processed, and is ordered in the flat
navigation dimension. Each partial result is itself a dictionary with a key
for each result declared in :code:`udf.get_result_buffers()`.

The :code:`ordered_results` dictionary is created such that the
:code:`merge_all` method can safely concatenate the elements in the case of
:code:`'nav'`-shaped results. Any applied ROI is automatically taken into
account after the call to :code:`merge_all`.

The return value of the function must be a dictionary of merged result arrays,
with the keys matching the declared result buffers. There is, however, no
requirement to return merged results for all existing buffers, though any that
are missing will not contain results from the computation and are likely to be
filled with zeros.

CUDA and scheduling
-------------------

A native LiberTEM Dask cluster uses resource tags to schedule work on CPUs or
CUDA devices based on a UDF's capabilities. For Dask integration, a fallback
was implemented that allows computations that can run on a CPU to run on a
native LiberTEM Dask cluster without requiring resource tags. However,
CUDA-only computations require passing the appropriate tags as the
:code:`resources` argument to :code:`compute()`.
:meth:`libertem.executor.delayed.DelayedJobExecutor.get_resources_from_udfs`
returns the appropriate resources for a given set of UDFs, based on their
capabilities.

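The following sketch shows how these resource tags could be forwarded to Dask.
It assumes a Dask.distributed scheduler whose workers carry the corresponding
resource tags, and that :code:`get_resources_from_udfs` accepts the UDF(s)
that were passed to :code:`run_udf`:

.. code-block:: python

    from libertem.api import Context
    from libertem.executor.delayed import DelayedJobExecutor
    from libertem.udf.sumsigudf import SumSigUDF

    executor = DelayedJobExecutor()
    ctx = Context(executor=executor)
    udf = SumSigUDF()

    # 'dataset' is any LiberTEM dataset, for example from ctx.load()
    res = ctx.run_udf(dataset=dataset, udf=udf)

    # Query which resource tags (CPU and/or CUDA) match the
    # capabilities of the UDFs that were run
    resources = executor.get_resources_from_udfs(udf)

    # Forward the tags to Dask when triggering the computation
    res['intensity'].delayed_data.compute(resources=resources)
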
.. note::
    At this time, the combination of CUDA-requiring UDFs and
    :class:`~libertem.executor.delayed.DelayedJobExecutor` is not well-tested.
    At a minimum, the merge process carried out on the main node will not take
    place with GPU-backed Dask arrays, though this is under consideration for
    the future.