User-defined functions: advanced topics

The UDF interface offers a wide range of features to help implement advanced functionality and to optimize the performance of an UDF. These features are optional in order to keep UDFs that don’t need them simple.

See User-defined functions (UDFs) for an introduction to basic topics.

Tiled processing

Many operations can be significantly optimized by working on stacks of frames. You can often perform loop nest optimization to improve the locality of reference, for example using numba, or using an optimized NumPy function.

As an example, applying a gain map and subtracting dark frames can be up to an order of magnitude faster when properly optimized compared to a naive NumPy implementation. These optimizations are only possible if you have access to data from more than one frame.

For very large frames, another problem arises: a stack of frames would be too large to efficiently handle, as it would no longer fit into even the L3 cache, which is the largest cache in most CPUs. For these cases, we support a tiled reading and processing strategy. Tiled means we slice the frame into disjoint rectangular regions. A tile then is the data from a single rectangular region for multiple frames.

For example, in case of K2IS data, frames have a shape of (1860, 2048). When reading them with the tiled strategy, a single tile will contain data from 16 subsequent frames, and each rectangle has a shape of (930, 16), which is the natural block size for K2IS data. That means the tiles will have a shape of (16, 930, 16), and processing 16 frames from the data set means reading 256 individual tiles.

Loading a tile of this size as float32 data still fits comfortably into usual L3 CPU caches (~1MB per core), and thus enables efficient processing. As a comparison, a whole (1860, 2048) frame is about 15MB large, and accessing it repeatedly means having to load data from the slower main memory.

Note

You may have noticed that we talk about block sizes of 1MB as efficient in the L3 cache, but many CPUs have larger L3 caches. As the L3 cache is shared between cores, and LiberTEM tries to use multiple cores, the effectively available L3 cache has to be divided by number of cores.

Real-world example

The libertem_blobfinder.udf.correlation.SparseCorrelationUDF uses process_tile() to implement a custom version of a ApplyMasksUDF that works on log-scaled data. The mask stack is stored in a libertem.common.container.MaskContainer as part of the task data. Note how the self.meta.slice property of type Slice is used to extract the region from the mask stack that matches the tile using the facilities of a MaskContainer. After reshaping, transposing and log scaling the tile data into the right memory layout, the mask stack is applied to the data with a dot product. The result is added to the buffer in order to merge it with the results of the other tiles because addition is the correct merge function for a dot product. Other operations would require a different merge function here, for example numpy.max() if a global maximum is to be calculated.

def process_tile(self, tile):
    tile_slice = self.meta.slice
    c = self.task_data.mask_container
    tile_t = np.zeros(
        (np.prod(tile.shape[1:]), tile.shape[0]),
        dtype=tile.dtype
    )
    log_scale(tile.reshape((tile.shape[0], -1)).T, out=tile_t)

    sl = c.get(key=tile_slice, transpose=False)
    self.results.corr[:] += sl.dot(tile_t).T

Partition processing

Some algorithms can benefit from processing entire partitions, for example if they require several passes over the data. In most cases, tiled processing will be faster because it uses the L3 cache more efficiently. For that reason, per-partition processing should only be used if there are clear indications for it. Implementing process_partition() activates per-partition processing for an UDF.

Precedence

By default the UDF interface looks for methods in the order process_tile(), process_frame(), process_partition() and will use the first matching implementation. process_tile() is the most general method and should allow by-frame and by-partition processing as well.

If a UDF requires more than one implementation depending on how it is parametrised or the data it is supplied with, the get_method() method can be overridden to influence how the UDF will be run. This method must return a member of the libertem.udf.base.UDF.UDF_METHOD enum and have the corresponding processing method implemented.

Changed in version 0.13.0: get_method() was exposed to enable choice of processing method at runtime

Post-processing of partition results

Post-processing allows to perform additional processing steps once the data of a partition is completely processed with process_frame(), process_tile() or process_partition(). Post-processing is particularly relevant for tiled processing since that allows to combine the performance benefits of tiled processing for a first reduction step with subsequent steps that require reduced data from complete frames or even a complete partition.

Real-world example from libertem_blobfinder.udf.correlation.SparseCorrelationUDF which evaluates the correlation maps that have been generated with the dot product in the previous processing step and places the results in additional result buffers:

def postprocess(self):
    steps = 2 * self.params.steps + 1
    corrmaps = self.results.corr.reshape((
        -1,  # frames
        len(self.params.peaks),  # peaks
        steps,  # Y steps
        steps,  # X steps
    ))
    peaks = self.params.peaks
    (centers, refineds, peak_values, peak_elevations) = self.output_buffers()
    for f in range(corrmaps.shape[0]):
        evaluate_correlations(
            corrs=corrmaps[f], peaks=peaks, crop_size=self.params.steps,
            out_centers=centers[f], out_refineds=refineds[f],
            out_heights=peak_values[f], out_elevations=peak_elevations[f]
        )

The libertem.udf.base.UDFPostprocessMixin.postprocess() method is called for each partition on the worker process, before the results from different partitions have been merged.

Post-processing after merging

If you want to implement a post-processing step that is run on the main node after merging result buffers, you can override get_results():

class AverageUDF(UDF):
    """
    Like SumUDF, but also computes the average
    """
    def get_result_buffers(self):
        return {
            'sum': self.buffer(kind='sig', dtype=np.float32),
            'num_frames': self.buffer(kind='single', dtype=np.uint64),
            'average': self.buffer(kind='sig', dtype=np.float32, use='result_only'),
        }

    def process_frame(self, frame):
        self.results.sum[:] += frame
        self.results.num_frames[:] += 1

    def merge(self, dest, src):
        dest.sum[:] += src.sum
        dest.num_frames[:] += src.num_frames

    def get_results(self):
        return {
            # NOTE: 'sum' omitted here, will be returned unchanged
            'average': self.results.sum / self.results.num_frames,
        }

ctx.run_udf(dataset=dataset, udf=AverageUDF())

Note that get_result_buffers() returns a placeholder entry for the average result using use='result_only', which is then filled in get_results(). We don’t need to repeat those buffers that should be returned unchanged; if you want to omit a buffer from the results completely, you can declare it as private with self.buffer(..., use='private') in get_result_buffers().

get_results() should return the results as a dictionary of numpy arrays, with the keys matching those returned by get_result_buffers().

When returned from run_udf(), all results are wrapped into BufferWrapper instances. This is done primarily to get convenient access to a version of the result that is suitable for visualization using data, even if a roi was used, but still allow access to the raw result using raw_data attribute.

The detailed rules for buffer declarations, get_result_buffers and get_results are:

  1. All buffers are declared in get_result_buffers

  2. If a buffer is only computed in get_results, it should be marked via use='result_only' so it isn’t allocated on workers

  3. If a buffer is only used as intermediary result, it should be marked via use='private'

  4. Not including a buffer in get_results means it will either be passed on unchanged, or dropped if use='private'

  5. It’s an error to omit an use='result_only' buffer in get_results

  6. It’s an error to include a use='private' buffer in get_results

  7. All results are returned from Context.run_udf as BufferWrapper instances

  8. By default, if get_results is not implemented, use='private' buffers are dropped, and others are passed through unchanged

New in version 0.7.0: UDF.get_results() and the use argument for UDF.buffer() were added.

Pre-processing

Pre-processing allows to initialize result buffers before processing or merging. This is particularly useful to set up dtype=object buffers, for example ragged arrays, or to initialize buffers for operations where the neutral element is not 0. libertem.udf.base.UDFPreprocessMixin.preprocess() is executed after all buffers are allocated, but before the data is processed. On the worker nodes it is executed with views set for the whole partition masked by the current ROI. On the central node it is executed with views set for the whole dataset masked by the ROI.

New in version 0.3.0.

Changed in version 0.5.0: libertem.udf.base.UDFPreprocessMixin.preprocess() is executed on the main node, too. Views for aux data are set correctly on the main node. Previously, it was only executed on the worker nodes.

Valid data masking

In various intermediate states, result buffers may be only partially filled with valid data. This comes from the fact that we split the dataset into partitions and then merge results together only as they become available. When doing live processing we also have the constraint that not all frames have been acquired yet, and so naturally only positions where we have received and processed data can be valid in the corresponding result buffer.

Handling partially valid data becomes important when running the UDF. For example, live plotting needs to know what regions of the results to include. Also, when using run_udf_iter() you might only want to handle the already valid parts of the results.

For UDF results, you can access valid_mask to access the valid mask for that particular buffer. You can also use masked_data to access the data of the buffer as a numpy masked array.

For example:

# use `run_udf_iter` to get partial results while the computation is running:
for res in ctx.run_udf_iter(dataset=dataset, udf=SumSigUDF()):

    # get the BufferWrapper for the 'intensity' buffer of the first UDF:
    buf = res.buffers[0]['intensity']

    # boolean mask that contains True for valid elements:
    mask = buf.valid_mask

    # get a numpy masked array that is limited to the valid elements:
    masked_data = buf.masked_data

By default, the valid masks of results depend on the buffer kind:

Buffer kind

Default valid mask

'single'

All-valid

'sig'

All-valid

'nav'

Parts which have been merged using merge()

In your UDF, you can also customize the valid mask that should be returned. This is useful if the dataset might contain invalid frames discovered at runtime, or if intermediate results cannot be fully valid due to missing information (e.g. calculating gradients across partition boundaries). Another example is when using a kind='single' buffer which is progressively computed; valid-by-default might be incorrect if not enough information has been seen (e.g. statistical measures or a count of values).

Customizing the valid data mask is done in get_results() with the with_mask() method. For example:

class CustomValidMask(UDF):
    def get_result_buffers(self):
        nav_shape = self.meta.dataset_shape.nav

        if self.meta.roi is not None:
            nav_shape = (count_nonzero(self.meta.roi),)

        return {
            # Emulate a kind='nav' result buffer using a kind='sig' buffer for demonstration.
            # In the real world, such result buffers can be found in the implementations of ptychography
            # and iCoM since they reconstruct the object in Fourier space
            'custom_2d': self.buffer(kind='single', dtype=np.float32, extra_shape=nav_shape),
        }

    def process_frame(self, frame):
        self.results.custom_2d[self.meta.coordinates] = np.sum(frame)

    def get_results(self):
        # in this case, we set the valid mask of the kind=single buffer to
        # match what a nav buffer would do.
        valid_nav_mask = self.meta.get_valid_nav_mask()

        return {
            'custom_2d': self.with_mask(
                self.results.custom_2d,
                mask=valid_nav_mask.reshape(self.results.custom_2d.shape)
            ),
        }

    def merge(self, dest, src):
        dest.custom_2d += src.custom_2d


ctx.run_udf(dataset=dataset, udf=CustomValidMask())

This example also shows the get_valid_nav_mask() method, which is available in merge() and get_results(), and gives you a mask of the already-processed navigation elements.

New in version 0.14.0: In previous versions, only libertem.udf.base.UDFResults.damage was available for inspecting the mask of already processed data. This did not account for differences between buffers, and was only available when iterating over results, i.e. with run_udf_iter().

AUX data

If a parameter is an instance of BufferWrapper that was created using the aux_data() class method, the UDF interface will interpret it as auxiliary data. It will set the views for each tile/frame/partition accordingly so that accessing the parameter returns a view of the auxiliary data matching the data portion that is currently being processed. That way, it is possible to pass parameters individually for each frame or to mask the signal dimension.

Note that the BufferWrapper instance for AUX data should always be created using the aux_data() class method and not directly by instantiating a BufferWrapper since aux_data() ensures that it is set up correctly.

For masks in the signal dimension that are used for dot products in combination with per-tile processing, a MaskContainer allows to use more advanced slicing and transformation methods targeted at preparing mask stacks for optimal dot product performance.

Task data

A UDF can generate task-specific intermediate data on the worker nodes by defining a get_task_data() method. The result is available as an instance of UDFData in self.task_data. Depending on the circumstances, this can be more efficient than making the data available as a parameter since it avoids pickling, network transport and unpickling.

This non-trivial example from libertem_blobfinder.udf.correlation.SparseCorrelationUDF creates a MaskContainer based on the parameters in self.params. This MaskContainer is then available as self.task_data.mask_container within the processing functions.

def get_task_data(self):
    match_pattern = self.params.match_pattern
    crop_size = match_pattern.get_crop_size()
    size = (2 * crop_size + 1, 2 * crop_size + 1)
    template = match_pattern.get_mask(sig_shape=size)
    steps = self.params.steps
    peak_offsetY, peak_offsetX = np.mgrid[-steps:steps + 1, -steps:steps + 1]

    offsetY = self.params.peaks[:, 0, np.newaxis, np.newaxis] + peak_offsetY - crop_size
    offsetX = self.params.peaks[:, 1, np.newaxis, np.newaxis] + peak_offsetX - crop_size

    offsetY = offsetY.flatten()
    offsetX = offsetX.flatten()

    stack = functools.partial(
        masks.sparse_template_multi_stack,
        mask_index=range(len(offsetY)),
        offsetX=offsetX,
        offsetY=offsetY,
        template=template,
        imageSizeX=self.meta.dataset_shape.sig[1],
        imageSizeY=self.meta.dataset_shape.sig[0]
    )
    # CSC matrices in combination with transposed data are fastest
    container = MaskContainer(mask_factories=stack, dtype=np.float32,
        use_sparse='scipy.sparse.csc')

    kwargs = {
        'mask_container': container,
        'crop_size': crop_size,
    }
    return kwargs

Meta information

Advanced processing routines may require context information about the processed data set, ROI and current data portion being processed. This information is available as properties of the libertem.udf.base.UDF.meta attribute of type UDFMeta.

Input data shapes and types

Common applications include allocating buffers with a dtype or shape that matches the dataset or partition via dataset_dtype, input_dtype, dataset_shape and partition_shape.

Device class

New in version 0.6.0.

The currently used compute device class can be accessed through libertem.udf.base.UDFMeta.device_class. It defaults to ‘cpu’ and can be ‘cuda’ for UDFs that make use of CuPy support support.

ROI and current slice

For more advanced applications, the ROI and currently processed data portion are available as libertem.udf.base.UDFMeta.roi and libertem.udf.base.UDFMeta.slice. This allows to replace the built-in masking behavior of BufferWrapper for result buffers and aux data with a custom implementation. The mask container for tiled processing example makes use of these attributes to employ a libertem.common.container.MaskContainer instead of a shape="sig" buffer in order to optimize dot product performance and support sparse masks.

The slice is in the reference frame of the dataset, masked by the current ROI, with flattened navigation dimension. This example illustrates the behavior by implementing a custom version of the simple “sum over sig” example. It allocates a custom result buffer that matches the navigation dimension as it appears in processing:

import numpy as np

from libertem.udf import UDF

class PixelsumUDF(UDF):
    def get_result_buffers(self):
        if self.meta.roi is not None:
            navsize = np.count_nonzero(self.meta.roi)
        else:
            navsize = np.prod(self.meta.dataset_shape.nav)
        return {
            'pixelsum_nav_raw': self.buffer(
                kind="single",
                dtype=self.meta.dataset_dtype,
                extra_shape=(navsize, ),
            )
        }

    def merge(self, dest, src):
        dest.pixelsum_nav_raw[:] += src.pixelsum_nav_raw

    def process_frame(self, frame):
        np_slice = self.meta.slice.get(nav_only=True)
        self.results.pixelsum_nav_raw[np_slice] = np.sum(frame)

Coordinates

New in version 0.6.0.

The coordinates of the current frame, tile or partition within the true dataset navigation dimension, as opposed to the current slice that is given in flattened nav dimensions with applied ROI, is available through coordinates. The following UDF simply collects the coordinate info for demonstration purposes. A real-world example that uses the coordinates is the UDF implementation of single side band ptychography.

import numpy as np

from libertem.udf import UDF

class CoordUDF(UDF):
    def get_result_buffers(self):
        # Declare a buffer that fits the coordinates,
        # i.e. one int per nav axis for each nav position
        nav_dims = len(self.meta.dataset_shape.nav)
        return {
            'coords': self.buffer(
                kind="nav",
                dtype=int,
                extra_shape=(nav_dims, ),
            )
        }

    def process_tile(self, tile):
        # Simply copy the coordinates into
        # the result buffer
        self.results.coords[:] = self.meta.coordinates

my_roi = np.zeros(dataset.shape.nav, dtype=bool)
my_roi[7, 13] = True
my_roi[11, 3] = True

res = ctx.run_udf(
    dataset=dataset,
    udf=CoordUDF(),
    roi=my_roi
)

assert np.all(
    res['coords'].raw_data == np.array([(7, 13), (11, 3)])
)

Preferred input dtype

New in version 0.4.0.

UDFs can override get_preferred_input_dtype() to indicate a “lowest common denominator” compatible dtype. The actual input dtype is determined by combining the indicated preferred dtype with the input dataset’s native dtype using numpy.result_type(). The default preferred dtype is numpy.float32. Returning UDF.USE_NATIVE_DTYPE, which is currently identical to numpy.bool, will switch to the dataset’s native dtype since numpy.bool behaves as a neutral element in numpy.result_type().

If an UDF requires a specific dtype rather than only preferring it, it should override this method and additionally check the actual input type, throw an error when used incorrectly and/or implement a meaningful conversion in its processing routine since indicating a preferred dtype doesn’t enforce it. That way, unsafe conversions are performed explicitly in the UDF rather than indirectly in the back-end.

CuPy support

New in version 0.6.0.

LiberTEM can use CUDA devices through CuPy. Since CuPy largely replicates the NumPy array interface, any UDF that uses NumPy for its main processing can likely be ported to use both CPUs and CUDA devices in parallel. Some adjustments are often necessary to account for minor differences between NumPy and CuPy. CuPy is most beneficial for compute-heavy tasks with good CUDA math library support such as large Fourier transforms or matrix products.

In order to activate CuPy processing, a UDF can overwrite the get_backends() method. By default this returns ('numpy',), indicating only NumPy support. By returning ('numpy', 'cupy') or ('cupy',), a UDF activates being run on both CUDA and CPU workers, or exclusively on CUDA workers. Using cuda instead of cupy schedules on CUDA workers, but without using the CuPy library. This is useful for running code that uses CUDA in a different way, for example integration of C++ CUDA code, and allows to skip installation of CuPy in this situation.

The libertem.udf.base.UDF.xp property points to the numpy or cupy module, depending which back-end is currently used. By using self.xp instead of the usual np for NumPy, one can write UDFs that use the same code for CUDA and CPU processing.

Result buffers can be declared as device arrays by setting self.buffer(..., where='device') in get_result_buffers(). That allows to keep data in the device until a partition is completely processed and the result is exported to the leader node.

The input argument for process_*() functions is already provided as a CuPy array instead of NumPy array if CuPy is used.

A UDF should only use one GPU at a time. If cupy is used, the correct device to use is set within CuPy in the back-end and should not be modified in the UDF itself. If cuda is used, it is the responsibility of the user to set the device ID to the value returned by libertem.common.backend.get_use_cuda(). The environment variable CUDA_VISIBLE_DEVICES can be set before any CUDA library is loaded to control which devices are visible.

The run_udf() method allows setting the backends attribute to ('numpy',) ('cupy',) or ('cuda',) to restrict execution to CPU-only or CUDA-only on a hybrid cluster. This is mostly useful for testing.

Sparse arrays

New in version 0.11.0.

As an extension of CuPy support, LiberTEM also supports supplying UDFs with tiles in sparse array formats for both CPU and GPU. A UDF specifies the supported array backends by overwriting get_backends() to return an iterable with the supported formats in order of preference. Each array format is associated with a device class so that CuPy support works analogously for both dense and sparse formats. Dense CPU and GPU arrays are specified with the backends described in CuPy support so that the extension to sparse arrays is backwards-compatible.

The possible backends supported by LiberTEM are available as the following constants: libertem.udf.base.UDF.BACKEND_NUMPY, BACKEND_CUPY, BACKEND_CUDA, BACKEND_SPARSE_COO, BACKEND_SPARSE_GCXS, BACKEND_SPARSE_DOK, BACKEND_SCIPY_COO, BACKEND_SCIPY_CSR, BACKEND_SCIPY_CSC, BACKEND_SCIPY_COO_ARRAY, BACKEND_SCIPY_CSR_ARRAY, BACKEND_SCIPY_CSC_ARRAY, BACKEND_CUPY_SCIPY_COO, BACKEND_CUPY_SCIPY_CSR, BACKEND_CUPY_SCIPY_CSC.

BACKEND_ALL is a tuple of all backends that are recommended for use in UDFs.

New in version 0.12.0: The sets CPU_BACKENDS, CUDA_BACKENDS, and CUPY_BACKENDS categorize backends by device class. SPARSE_BACKENDS and DENSE_BACKENDS categorize between dense and sparse arrays. The backends in D2_BACKENDS only support 2D matrices, while the ones in ND_BACKENDS support n-dimensional arrays.

The frame, tile or partition is supplied with flattened signal dimensions for 2D-only backends. Furthermore, frames include a nav dimension of 1 with these backends.

The backend that is used for a partition is available through libertem.udf.base.UDF.meta.array_backend at runtime. Please note that it can be different between partitions.

Internally, LiberTEM calculates an execution plan that matches the capabilities of all UDFs in a run with the capabilities of the dataset and the device class so that conversion overheads are minimized. LiberTEM can process data in sparse form from start to finish if a dataset that can produce tiles in a sparse format, such as Raw binary files in sparse CSR format, is combined with a set of UDFs that all support a sparse backend.

Since operations on sparse arrays often use an API modelled after NumPy, but return data in various backends that may or may not allow direct assignment into a result buffer, the libertem.udf.base.UDF.forbuf() method converts an array to a backend that is suitable for assigning into the specified result buffer. It also takes care of reshaping from 2D to nD as necessary.

Simplified example implementation based on libertem.udf.sumsigudf.SumSigUDF that demonstrates how support for all array formats can be implemented:

class SumSigUDF(UDF):
    def get_backends(self):
        # Support all recommended array backends in LiberTEM.
        # Please note that their APIs can differ so that comprehensive
        # tests with all supported backends are required. Also, the list of
        # supported backends in LiberTEM may grow in the future, which can
        # require adaptations in an UDF that uses this constant.
        return self.BACKEND_ALL

    def get_result_buffers(self):
        dtype = np.result_type(self.meta.input_dtype, np.float32)
        return {
            'intensity': self.buffer(
                kind="nav", dtype=dtype, where='device'
            ),
        }

    def process_tile(self, tile):
        # Show the backend that is currently used
        print(self.meta.array_backend)

        # Note the following points:
        # * Using self.forbuf(arr, target) to make the result
        #   compatible with the result buffer.
        # * Preemptively flatten the sig dimensions so that
        #   2D and nD arrays work the same.
        # * Work around API peculiarities, such as the axis keyword
        #   that is only partially supported in cupyx.scipy.sparse.
        self.results.intensity[:] += self.forbuf(
            np.sum(
                # Flatten and sum axis 1 for cupyx.scipy.sparse support
                tile.reshape((tile.shape[0], -1)),
                axis=1
            ),
            self.results.intensity
        )

# Empty memory dataset for testing that returns SCIPY_CSR tiles
ds = ctx.load(
    'memory',
    datashape=(23, 42, 17, 4),
    sig_dims=2,
    array_backends=(UDF.BACKEND_SCIPY_CSR, ),
    num_partitions=2,
)

ctx.run_udf(dataset=ds, udf=SumSigUDF())
scipy.sparse.csr_matrix
scipy.sparse.csr_matrix

See the implementation of libertem.udf.masks.ApplyMasksUDF and libertem.udf.stddev.StdDevUDF for non-trivial examples of UDFs that support a wide range of array formats!

Note

The underlying library for array type detection and conversion is available independent of LiberTEM at https://github.com/LiberTEM/sparseconverter/.

Threading

By default, LiberTEM uses multiprocessing with one process per CPU core for offline processing, using the class DaskJobExecutor. In that scenario, UDFs should only use a single thread to avoid oversubscription.

However, when running with a single-process single-thread executor like InlineJobExecutor, multiple threads can be used. In some cases this might be advantageous in combination with the inline executor. The thread count for many common numerics libraries is set automatically by LiberTEM, see threads_per_worker. For other cases the thread count on a worker should be set by the user according to self.meta.threads_per_worker.

Multithreaded executors are introduced with release 0.9.0, see Executors. They run UDF functions in parallel threads within the same process. This can introduce issues with thread safety, for example shared objects being changed concurrently by multiple threads. The LiberTEM internals and core UDFs are tested to work with these executors, but user code may break unexpectedly. PyFFTW interface caching is a known issue of this category. For that reason, the threaded executors should be considererd experimental for the time being. Furthermore, setting and re-setting any global variable, for example the thread count of an external library, should be protected with a reentrant locking mechanism.

The pyFFTW cache is disabled with threaded executors because of this known bug. That can have a negative impact on performance. For performance optimization with pyFFTW, users could use the builder interface of PyFFTW or use the native FFTW object interface.

Running multiple LiberTEM Context objects or executors in parallel threads is not tested and can lead to unexpected interactions.

Auto UDF

The AutoUDF class and map() method allow to run simple functions that accept a frame as the only parameter with an auto-generated kind="nav" result buffer over a dataset ad-hoc without defining an UDF class. For more advanced processing, such as custom merge functions, post-processing or performance optimization through tiled processing, defining an UDF class is required.

As an alternative to Auto UDF, you can use the make_dask_array() method to create a dask.array from a DataSet to perform calculations. See Integration with Dask arrays for more details.

The AutoUDF class determines the output shape and type by calling the function with a mock-up frame of the same type and shape as a real detector frame and converting the return value to a NumPy array. The extra_shape and dtype parameters for the result buffer are derived automatically from this NumPy array.

Additional constant parameters can be passed to the function via functools.partial(), for example. The return value should be much smaller than the input size for this to work efficiently.

Example: Calculate sum over the last signal axis.

import functools

result = ctx.map(
    dataset=dataset,
    f=functools.partial(np.sum, axis=-1)
)

# or alternatively:
from libertem.udf import AutoUDF

udf = AutoUDF(f=functools.partial(np.sum, axis=-1))
result = ctx.run_udf(dataset=dataset, udf=udf)

One-step merge (merge_all)

New in version 0.9.0.

Note

The interface described here is experimental and therefore subject to change. See Merge function for Dask array results for more information.

With the release of the DelayedJobExecutor UDFs have the option to define a method merge_all() which is used by this executor to perform a one-step merge of the results from all partitions. This is only applied by this executor, and as a result is only used when computing with dask.delayed.

In the case of kind='nav' result buffers, only, and no custom merge logic, the merge_all implementation is automatically provided by the base UDF class. If no merge_all is provided, the standard merge function is used via a different mechanism.

Note

When using merge_all, no attempt is made to verify that it functions identically to the merge function, which remains a requirement of the UDF implementation if implementing a custom merge and using other executors.

The merge_all function must have the following signature:

def merge_all(self, ordered_results):
    ...
    return {result_name: merged_result_array, ...}

where ordered_results is an ordered dictionary mapping between the Slice for each partition and a dictionary of partial results keyed by result_name.

An example merge_all to sum all partial results for a kind='sig' result buffer named 'intensity' would be the following:

def merge_all(self, ordered_results):
    intensity = np.stack([b.intensity for b in ordered_results.values()]).sum(axis=0)
    return {
        'intensity': intensity
    }

The ordering of the partial results is such that the Slice objects which are the dictionary keys are sequential in the flattened navigation dimension. The user can therefore safely concatenate the partial results for a given result buffer to get a whole-analysis-sized array with a flat navigation dimension. In the case of kind='nav' buffers the returned arrays must be in this same flat navigation shape.

When an ROI has been provided while running the UDF, the number of elements in the partial results will correspond to the number of valid ROI pixels; the concatenated result will be reshaped into a full dataset-sized array by LiberTEM after merge_all has been called.

Note

When using DelayedJobExecutor and in particular merge_all, the user is operating directly on dask.array.Array objects built for lazy computation. The Dask API is largely compatible with numpy, and will lazily build a task graph from normal numpy functions (e.g. np.stack(arrays).sum() above). However, care must be taken to avoid triggering eager execution accidentally, for example by casting using Python builtins such as int(dask_value). For a degree of certainty, the user is encouraged to consider the Dask Array API when building merge_all functions. The same advice applies to any post-processing applied after merging (Post-processing after merging).

The return value from merge_all must be a dictionary of merged result arrays with the keys matching the declared result buffers. There is, however, no requirement to return merged results for all existing buffers, though any that are missing will not contain results from the computation and are likely to be filled with zeros.

Dynamically updating UDF parameters

New in version 0.12.0: This is an experimental API, which we make explicit by naming the method update_parameters_experimental. It is not guaranteed to work with all UDFs.

Running UDFs in a feedback loop, for example on a live data stream using LiberTEM-live, is not only helpful for adjusting microscope parameters on the fly to match the reconstruction, but also for adjusting and fine tuning UDF parameters. Usually, this would be implemented like this:

while True:
    udf = YourUDF(**get_new_parameters())
    ctx.run_udf(dataset=ds, udf=udf, plots=True)

This works, but limits the feedback loop to update once per complete run, which can be annoying especially for longer dwell times.

Instead, the feedback loop can be shortened by updating parameters as soon as the first results are available. This is accomplished by using run_udf_iter() instead of run_udf(), and calling the update_parameters_experimental() method:

udf = YourUDF(**parameters)
while True:
    result_iter = ctx.run_udf_iter(dataset=ds, udf=[udf])
    for part_res in result_iter:
        result_iter.update_parameters_experimental([
            {'some_parameter': get_new_value()},
        ])

Note that this works when passing multiple UDFs into run_udf_iter() as a list - we need to pass a dict for each of them into update_parameters_experimental(). The dict only needs to contain items for the parameters you want to update, and can be an empty dict if you don’t want to update any parameters for the corresponding UDF.

The call to get_new_value here can be any mechanism to get an updated value, be it querying a graphical user interface, or automatically calculating a new value based on the latest result from the UDFs.

Note

This API is implemented for the following executor types: 'inline', 'dask', 'pipelined', and 'threads', as specified in make_with(). Meaning that if you don’t specify an executor, with either Context or LiveContext from LiberTEM-live, it should just work.

Note also that different executors schedule tasks differently, so especially in the offline case, there may be a bit more latency before the new values for parameters are available in the UDFs.