User-defined functions: advanced topics
The UDF interface offers a wide range of features to help implement advanced functionality and to optimize the performance of an UDF. These features are optional in order to keep UDFs that don’t need them simple.
See User-defined functions (UDFs) for an introduction to basic topics.
Tiled processing
Many operations can be significantly optimized by working on stacks of frames. You can often perform loop nest optimization to improve the locality of reference, for example using numba, or using an optimized NumPy function.
As an example, applying a gain map and subtracting dark frames can be up to an order of magnitude faster when properly optimized compared to a naive NumPy implementation. These optimizations are only possible if you have access to data from more than one frame.
For very large frames, another problem arises: a stack of frames would be too large to efficiently handle, as it would no longer fit into even the L3 cache, which is the largest cache in most CPUs. For these cases, we support a tiled reading and processing strategy. Tiled means we slice the frame into disjoint rectangular regions. A tile then is the data from a single rectangular region for multiple frames.
For example, in case of K2IS data, frames have a shape of (1860, 2048)
.
When reading them with the tiled strategy, a single tile will contain data from
16 subsequent frames, and each rectangle has a shape of (930, 16)
, which
is the natural block size for K2IS data. That means the tiles will have a shape
of (16, 930, 16)
, and processing 16 frames from the data set means
reading 256 individual tiles.
Loading a tile of this size as float32 data still fits comfortably into usual L3
CPU caches (~1MB per core), and thus enables efficient processing. As a comparison, a
whole (1860, 2048)
frame is about 15MB large, and accessing it
repeatedly means having to load data from the slower main memory.
Note
You may have noticed that we talk about block sizes of 1MB as efficient in the L3 cache, but many CPUs have larger L3 caches. As the L3 cache is shared between cores, and LiberTEM tries to use multiple cores, the effectively available L3 cache has to be divided by number of cores.
Real-world example
The libertem_blobfinder.udf.correlation.SparseCorrelationUDF
uses
process_tile()
to implement a custom version of
a ApplyMasksUDF
that works on log-scaled data. The
mask stack is stored in a libertem.common.container.MaskContainer
as part of
the task data. Note how the self.meta.slice
property of type
Slice
is used to extract the region from the mask
stack that matches the tile using the facilities of a
MaskContainer
. After reshaping, transposing and log
scaling the tile data into the right memory layout, the mask stack is applied to
the data with a dot product. The result is added to the buffer in order to
merge it with the results of the other tiles because addition is the correct
merge function for a dot product. Other operations would require a different
merge function here, for example numpy.max()
if a global maximum is to
be calculated.
def process_tile(self, tile):
tile_slice = self.meta.slice
c = self.task_data.mask_container
tile_t = np.zeros(
(np.prod(tile.shape[1:]), tile.shape[0]),
dtype=tile.dtype
)
log_scale(tile.reshape((tile.shape[0], -1)).T, out=tile_t)
sl = c.get(key=tile_slice, transpose=False)
self.results.corr[:] += sl.dot(tile_t).T
Partition processing
Some algorithms can benefit from processing entire partitions, for example if
they require several passes over the data. In most cases, tiled
processing will be faster because it uses the L3 cache more efficiently.
For that reason, per-partition processing should only be used if there are clear
indications for it. Implementing
process_partition()
activates
per-partition processing for an UDF.
Precedence
By default the UDF interface looks for methods in the order
process_tile()
,
process_frame()
,
process_partition()
and will use the first
matching implementation. process_tile()
is
the most general method and should allow by-frame and by-partition processing as well.
If a UDF requires more than one implementation depending on how it is
parametrised or the data it is supplied with, the get_method()
method can be overridden to influence how the UDF will be run. This method must
return a member of the libertem.udf.base.UDF.UDF_METHOD
enum and have
the corresponding processing method implemented.
Changed in version 0.13.0: get_method()
was exposed to enable
choice of processing method at runtime
Post-processing of partition results
Post-processing allows to perform additional processing steps once the data of a
partition is completely processed with
process_frame()
,
process_tile()
or
process_partition()
. Post-processing is
particularly relevant for tiled processing since that allows to combine the
performance benefits of tiled processing for a first reduction step with
subsequent steps that require reduced data from complete frames or even a
complete partition.
Real-world example from
libertem_blobfinder.udf.correlation.SparseCorrelationUDF
which
evaluates the correlation maps that have been generated with the dot product in
the previous processing step and places the results in additional result
buffers:
def postprocess(self):
steps = 2 * self.params.steps + 1
corrmaps = self.results.corr.reshape((
-1, # frames
len(self.params.peaks), # peaks
steps, # Y steps
steps, # X steps
))
peaks = self.params.peaks
(centers, refineds, peak_values, peak_elevations) = self.output_buffers()
for f in range(corrmaps.shape[0]):
evaluate_correlations(
corrs=corrmaps[f], peaks=peaks, crop_size=self.params.steps,
out_centers=centers[f], out_refineds=refineds[f],
out_heights=peak_values[f], out_elevations=peak_elevations[f]
)
The libertem.udf.base.UDFPostprocessMixin.postprocess()
method is called
for each partition on the worker process, before the results from different
partitions have been merged.
Post-processing after merging
If you want to implement a post-processing step that is run on the main node
after merging result buffers, you can override
get_results()
:
class AverageUDF(UDF):
"""
Like SumUDF, but also computes the average
"""
def get_result_buffers(self):
return {
'sum': self.buffer(kind='sig', dtype=np.float32),
'num_frames': self.buffer(kind='single', dtype=np.uint64),
'average': self.buffer(kind='sig', dtype=np.float32, use='result_only'),
}
def process_frame(self, frame):
self.results.sum[:] += frame
self.results.num_frames[:] += 1
def merge(self, dest, src):
dest.sum[:] += src.sum
dest.num_frames[:] += src.num_frames
def get_results(self):
return {
# NOTE: 'sum' omitted here, will be returned unchanged
'average': self.results.sum / self.results.num_frames,
}
ctx.run_udf(dataset=dataset, udf=AverageUDF())
Note that get_result_buffers()
returns a placeholder entry for the
average
result using use='result_only'
, which is then filled in
get_results()
. We don’t need to repeat those buffers that should be
returned unchanged; if you want to omit a buffer from the results completely,
you can declare it as private with self.buffer(..., use='private')
in
get_result_buffers()
.
get_results()
should return the results as a dictionary of numpy
arrays, with the keys matching those returned by
get_result_buffers()
.
When returned from run_udf()
, all results are wrapped into
BufferWrapper
instances. This is done primarily to get convenient
access to a version of the result that is suitable for visualization using
data
, even if
a roi
was used, but still allow access to the raw result using
raw_data
attribute.
The detailed rules for buffer declarations, get_result_buffers
and get_results
are:
All buffers are declared in
get_result_buffers
If a buffer is only computed in
get_results
, it should be marked viause='result_only'
so it isn’t allocated on workersIf a buffer is only used as intermediary result, it should be marked via
use='private'
Not including a buffer in
get_results
means it will either be passed on unchanged, or dropped ifuse='private'
It’s an error to omit an
use='result_only'
buffer inget_results
It’s an error to include a
use='private'
buffer inget_results
All results are returned from
Context.run_udf
asBufferWrapper
instancesBy default, if
get_results
is not implemented,use='private'
buffers are dropped, and others are passed through unchanged
New in version 0.7.0: UDF.get_results()
and the use
argument for UDF.buffer()
were added.
Pre-processing
Pre-processing allows to initialize result buffers before processing or merging.
This is particularly useful to set up dtype=object
buffers, for example
ragged arrays, or to initialize buffers for operations where the neutral element
is not 0. libertem.udf.base.UDFPreprocessMixin.preprocess()
is executed after
all buffers are allocated, but before the data is processed. On the worker nodes
it is executed with views set for the whole partition masked by the current ROI.
On the central node it is executed with views set for the whole dataset masked
by the ROI.
New in version 0.3.0.
Changed in version 0.5.0: libertem.udf.base.UDFPreprocessMixin.preprocess()
is executed on the main
node, too. Views for aux data are set correctly on the main node. Previously,
it was only executed on the worker nodes.
Valid data masking
In various intermediate states, result buffers may be only partially filled with valid data. This comes from the fact that we split the dataset into partitions and then merge results together only as they become available. When doing live processing we also have the constraint that not all frames have been acquired yet, and so naturally only positions where we have received and processed data can be valid in the corresponding result buffer.
Handling partially valid data becomes important when running the UDF. For example,
live plotting needs to know what regions of the results to include. Also, when using
run_udf_iter()
you might only want to handle the already valid
parts of the results.
For UDF results, you can access
valid_mask
to access
the valid mask for that particular buffer. You can also use
masked_data
to access the data of the buffer as a numpy masked array.
For example:
# use `run_udf_iter` to get partial results while the computation is running:
for res in ctx.run_udf_iter(dataset=dataset, udf=SumSigUDF()):
# get the BufferWrapper for the 'intensity' buffer of the first UDF:
buf = res.buffers[0]['intensity']
# boolean mask that contains True for valid elements:
mask = buf.valid_mask
# get a numpy masked array that is limited to the valid elements:
masked_data = buf.masked_data
By default, the valid masks of results depend on the buffer kind:
Buffer |
Default valid mask |
---|---|
|
All-valid |
|
All-valid |
|
Parts which have been merged using |
In your UDF, you can also customize the valid mask that should be returned.
This is useful if the dataset might contain invalid frames discovered at runtime,
or if intermediate results cannot be fully valid due to missing information (e.g.
calculating gradients across partition boundaries). Another example is when using
a kind='single'
buffer which is progressively computed; valid-by-default
might be incorrect if not enough information has been seen (e.g. statistical measures
or a count of values).
Customizing the valid data mask is done in get_results()
with the with_mask()
method. For example:
class CustomValidMask(UDF):
def get_result_buffers(self):
nav_shape = self.meta.dataset_shape.nav
if self.meta.roi is not None:
nav_shape = (count_nonzero(self.meta.roi),)
return {
# Emulate a kind='nav' result buffer using a kind='sig' buffer for demonstration.
# In the real world, such result buffers can be found in the implementations of ptychography
# and iCoM since they reconstruct the object in Fourier space
'custom_2d': self.buffer(kind='single', dtype=np.float32, extra_shape=nav_shape),
}
def process_frame(self, frame):
self.results.custom_2d[self.meta.coordinates] = np.sum(frame)
def get_results(self):
# in this case, we set the valid mask of the kind=single buffer to
# match what a nav buffer would do.
valid_nav_mask = self.meta.get_valid_nav_mask()
return {
'custom_2d': self.with_mask(
self.results.custom_2d,
mask=valid_nav_mask.reshape(self.results.custom_2d.shape)
),
}
def merge(self, dest, src):
dest.custom_2d += src.custom_2d
ctx.run_udf(dataset=dataset, udf=CustomValidMask())
This example also shows the get_valid_nav_mask()
method, which is available in merge()
and
get_results()
, and gives you a mask of the
already-processed navigation elements.
New in version 0.14.0: In previous versions, only libertem.udf.base.UDFResults.damage
was
available for inspecting the mask of already processed data.
This did not account for differences between buffers, and was only available
when iterating over results, i.e. with run_udf_iter()
.
AUX data
If a parameter is an instance of BufferWrapper
that was created using the aux_data()
class method, the
UDF interface will interpret it as auxiliary data. It will set the views for
each tile/frame/partition accordingly so that accessing the parameter returns a
view of the auxiliary data matching the data portion that is currently being
processed. That way, it is possible to pass parameters individually for each
frame or to mask the signal dimension.
Note that the BufferWrapper
instance for AUX
data should always be created using the aux_data()
class
method and not directly by instantiating a
BufferWrapper
since
aux_data()
ensures that it is set up correctly.
For masks in the signal dimension that are used for dot products in combination
with per-tile processing, a MaskContainer
allows
to use more advanced slicing and transformation methods targeted at preparing
mask stacks for optimal dot product performance.
Task data
A UDF can generate task-specific intermediate data on the worker nodes by
defining a get_task_data()
method. The result is
available as an instance of UDFData
in
self.task_data
. Depending on the circumstances, this can be more
efficient than making the data available as a parameter since it avoids
pickling, network transport and unpickling.
This non-trivial example from
libertem_blobfinder.udf.correlation.SparseCorrelationUDF
creates
a MaskContainer
based on the parameters in
self.params
. This MaskContainer
is then
available as self.task_data.mask_container
within the processing
functions.
def get_task_data(self):
match_pattern = self.params.match_pattern
crop_size = match_pattern.get_crop_size()
size = (2 * crop_size + 1, 2 * crop_size + 1)
template = match_pattern.get_mask(sig_shape=size)
steps = self.params.steps
peak_offsetY, peak_offsetX = np.mgrid[-steps:steps + 1, -steps:steps + 1]
offsetY = self.params.peaks[:, 0, np.newaxis, np.newaxis] + peak_offsetY - crop_size
offsetX = self.params.peaks[:, 1, np.newaxis, np.newaxis] + peak_offsetX - crop_size
offsetY = offsetY.flatten()
offsetX = offsetX.flatten()
stack = functools.partial(
masks.sparse_template_multi_stack,
mask_index=range(len(offsetY)),
offsetX=offsetX,
offsetY=offsetY,
template=template,
imageSizeX=self.meta.dataset_shape.sig[1],
imageSizeY=self.meta.dataset_shape.sig[0]
)
# CSC matrices in combination with transposed data are fastest
container = MaskContainer(mask_factories=stack, dtype=np.float32,
use_sparse='scipy.sparse.csc')
kwargs = {
'mask_container': container,
'crop_size': crop_size,
}
return kwargs
Meta information
Advanced processing routines may require context information about the processed
data set, ROI and current data portion being processed. This information is
available as properties of the libertem.udf.base.UDF.meta
attribute of type
UDFMeta
.
Input data shapes and types
Common applications include allocating buffers with a dtype
or shape
that matches the dataset or partition via
dataset_dtype
,
input_dtype
,
dataset_shape
and
partition_shape
.
Device class
New in version 0.6.0.
The currently used compute device class can be accessed through
libertem.udf.base.UDFMeta.device_class
. It defaults to ‘cpu’ and can be ‘cuda’
for UDFs that make use of CuPy support support.
ROI and current slice
For more advanced applications, the ROI and currently processed data portion are
available as libertem.udf.base.UDFMeta.roi
and
libertem.udf.base.UDFMeta.slice
. This allows to replace the built-in masking
behavior of BufferWrapper
for result buffers
and aux data with a custom implementation. The mask container for tiled
processing example makes use of these attributes to employ a
libertem.common.container.MaskContainer
instead of a shape="sig"
buffer in order to optimize dot product performance and support sparse masks.
The slice is in the reference frame of the dataset, masked by the current ROI, with flattened navigation dimension. This example illustrates the behavior by implementing a custom version of the simple “sum over sig” example. It allocates a custom result buffer that matches the navigation dimension as it appears in processing:
import numpy as np
from libertem.udf import UDF
class PixelsumUDF(UDF):
def get_result_buffers(self):
if self.meta.roi is not None:
navsize = np.count_nonzero(self.meta.roi)
else:
navsize = np.prod(self.meta.dataset_shape.nav)
return {
'pixelsum_nav_raw': self.buffer(
kind="single",
dtype=self.meta.dataset_dtype,
extra_shape=(navsize, ),
)
}
def merge(self, dest, src):
dest.pixelsum_nav_raw[:] += src.pixelsum_nav_raw
def process_frame(self, frame):
np_slice = self.meta.slice.get(nav_only=True)
self.results.pixelsum_nav_raw[np_slice] = np.sum(frame)
Coordinates
New in version 0.6.0.
The coordinates of the current frame, tile or partition within the true dataset
navigation dimension, as opposed to the current slice that is given in flattened
nav dimensions with applied ROI, is available through
coordinates
. The following UDF simply
collects the coordinate info for demonstration purposes. A real-world example
that uses the coordinates is the UDF implementation of single side band
ptychography.
import numpy as np
from libertem.udf import UDF
class CoordUDF(UDF):
def get_result_buffers(self):
# Declare a buffer that fits the coordinates,
# i.e. one int per nav axis for each nav position
nav_dims = len(self.meta.dataset_shape.nav)
return {
'coords': self.buffer(
kind="nav",
dtype=int,
extra_shape=(nav_dims, ),
)
}
def process_tile(self, tile):
# Simply copy the coordinates into
# the result buffer
self.results.coords[:] = self.meta.coordinates
my_roi = np.zeros(dataset.shape.nav, dtype=bool)
my_roi[7, 13] = True
my_roi[11, 3] = True
res = ctx.run_udf(
dataset=dataset,
udf=CoordUDF(),
roi=my_roi
)
assert np.all(
res['coords'].raw_data == np.array([(7, 13), (11, 3)])
)
Preferred input dtype
New in version 0.4.0.
UDFs can override get_preferred_input_dtype()
to
indicate a “lowest common denominator” compatible dtype. The actual input dtype
is determined by combining the indicated preferred dtype with the input
dataset’s native dtype using numpy.result_type()
. The default preferred
dtype is numpy.float32
. Returning UDF.USE_NATIVE_DTYPE
, which is
currently identical to numpy.bool
, will switch to the dataset’s native
dtype since numpy.bool
behaves as a neutral element in
numpy.result_type()
.
If an UDF requires a specific dtype rather than only preferring it, it should override this method and additionally check the actual input type, throw an error when used incorrectly and/or implement a meaningful conversion in its processing routine since indicating a preferred dtype doesn’t enforce it. That way, unsafe conversions are performed explicitly in the UDF rather than indirectly in the back-end.
CuPy support
New in version 0.6.0.
LiberTEM can use CUDA devices through CuPy. Since CuPy largely replicates the NumPy array interface, any UDF that uses NumPy for its main processing can likely be ported to use both CPUs and CUDA devices in parallel. Some adjustments are often necessary to account for minor differences between NumPy and CuPy. CuPy is most beneficial for compute-heavy tasks with good CUDA math library support such as large Fourier transforms or matrix products.
In order to activate CuPy processing, a UDF can overwrite the
get_backends()
method. By default this returns
('numpy',)
, indicating only NumPy support. By returning ('numpy',
'cupy')
or ('cupy',)
, a UDF activates being run on both CUDA and CPU
workers, or exclusively on CUDA workers. Using cuda
instead of
cupy
schedules on CUDA workers, but without using the CuPy library. This
is useful for running code that uses CUDA in a different way, for example
integration of C++ CUDA code, and allows to skip installation of CuPy in this
situation.
The libertem.udf.base.UDF.xp
property points to the numpy
or
cupy
module, depending which back-end is currently used. By using
self.xp
instead of the usual np
for NumPy, one can write UDFs
that use the same code for CUDA and CPU processing.
Result buffers can be declared as device arrays by setting
self.buffer(..., where='device')
in
get_result_buffers()
. That allows to keep data in
the device until a partition is completely processed and the result is exported
to the leader node.
The input argument for process_*()
functions is already provided as a
CuPy array instead of NumPy array if CuPy is used.
A UDF should only use one GPU at a time. If cupy
is used, the correct
device to use is set within CuPy in the back-end and should not be modified in
the UDF itself. If cuda
is used, it is the responsibility of the user to
set the device ID to the value returned by
libertem.common.backend.get_use_cuda()
. The environment variable
CUDA_VISIBLE_DEVICES
can be set before any CUDA library is loaded to
control which devices are visible.
The run_udf()
method allows setting the
backends
attribute to ('numpy',)
('cupy',)
or ('cuda',)
to
restrict execution to CPU-only or CUDA-only on a hybrid cluster. This is mostly
useful for testing.
Sparse arrays
New in version 0.11.0.
As an extension of CuPy support, LiberTEM also supports supplying UDFs with
tiles in sparse array formats for both CPU and GPU. A UDF specifies the
supported array backends by overwriting
get_backends()
to return an iterable with the
supported formats in order of preference. Each array format is associated with a
device class so that CuPy support works analogously for both dense and sparse
formats. Dense CPU and GPU arrays are specified with the backends described in
CuPy support so that the extension to sparse arrays is backwards-compatible.
The possible backends supported by LiberTEM are available as the following
constants: libertem.udf.base.UDF.BACKEND_NUMPY
,
BACKEND_CUPY
,
BACKEND_CUDA
,
BACKEND_SPARSE_COO
,
BACKEND_SPARSE_GCXS
,
BACKEND_SPARSE_DOK
,
BACKEND_SCIPY_COO
,
BACKEND_SCIPY_CSR
,
BACKEND_SCIPY_CSC
,
BACKEND_SCIPY_COO_ARRAY
,
BACKEND_SCIPY_CSR_ARRAY
,
BACKEND_SCIPY_CSC_ARRAY
,
BACKEND_CUPY_SCIPY_COO
,
BACKEND_CUPY_SCIPY_CSR
,
BACKEND_CUPY_SCIPY_CSC
.
BACKEND_ALL
is a tuple of all backends that are
recommended for use in UDFs.
New in version 0.12.0: The sets CPU_BACKENDS
,
CUDA_BACKENDS
, and
CUPY_BACKENDS
categorize backends by device class.
SPARSE_BACKENDS
and
DENSE_BACKENDS
categorize between dense and
sparse arrays.
The backends in D2_BACKENDS
only support 2D
matrices, while the ones in ND_BACKENDS
support
n-dimensional arrays.
New in version 0.15.0: BACKEND_SCIPY_COO_ARRAY
,
BACKEND_SCIPY_CSR_ARRAY
, and
BACKEND_SCIPY_CSC_ARRAY
are added.
The frame, tile or partition is supplied with flattened signal dimensions for 2D-only backends. Furthermore, frames include a nav dimension of 1 with these backends.
The backend that is used for a partition is available through
libertem.udf.base.UDF.meta.array_backend
at runtime. Please note that it can be
different between partitions.
Internally, LiberTEM calculates an execution plan that matches the capabilities of all UDFs in a run with the capabilities of the dataset and the device class so that conversion overheads are minimized. LiberTEM can process data in sparse form from start to finish if a dataset that can produce tiles in a sparse format, such as Raw binary files in sparse CSR format, is combined with a set of UDFs that all support a sparse backend.
Since operations on sparse arrays often use an API modelled after NumPy, but
return data in various backends that may or may not allow direct assignment into
a result buffer, the libertem.udf.base.UDF.forbuf()
method converts an
array to a backend that is suitable for assigning into the specified result
buffer. It also takes care of reshaping from 2D to nD as necessary.
Simplified example implementation based on libertem.udf.sumsigudf.SumSigUDF
that
demonstrates how support for all array formats can be implemented:
class SumSigUDF(UDF):
def get_backends(self):
# Support all recommended array backends in LiberTEM.
# Please note that their APIs can differ so that comprehensive
# tests with all supported backends are required. Also, the list of
# supported backends in LiberTEM may grow in the future, which can
# require adaptations in an UDF that uses this constant.
return self.BACKEND_ALL
def get_result_buffers(self):
dtype = np.result_type(self.meta.input_dtype, np.float32)
return {
'intensity': self.buffer(
kind="nav", dtype=dtype, where='device'
),
}
def process_tile(self, tile):
# Show the backend that is currently used
print(self.meta.array_backend)
# Note the following points:
# * Using self.forbuf(arr, target) to make the result
# compatible with the result buffer.
# * Preemptively flatten the sig dimensions so that
# 2D and nD arrays work the same.
# * Work around API peculiarities, such as the axis keyword
# that is only partially supported in cupyx.scipy.sparse.
self.results.intensity[:] += self.forbuf(
np.sum(
# Flatten and sum axis 1 for cupyx.scipy.sparse support
tile.reshape((tile.shape[0], -1)),
axis=1
),
self.results.intensity
)
# Empty memory dataset for testing that returns SCIPY_CSR tiles
ds = ctx.load(
'memory',
datashape=(23, 42, 17, 4),
sig_dims=2,
array_backends=(UDF.BACKEND_SCIPY_CSR, ),
num_partitions=2,
)
ctx.run_udf(dataset=ds, udf=SumSigUDF())
scipy.sparse.csr_matrix
scipy.sparse.csr_matrix
See the implementation of libertem.udf.masks.ApplyMasksUDF
and
libertem.udf.stddev.StdDevUDF
for non-trivial examples of UDFs
that support a wide range of array formats!
Note
The underlying library for array type detection and conversion is available independent of LiberTEM at https://github.com/LiberTEM/sparseconverter/.
Threading
By default, LiberTEM uses multiprocessing with one process per CPU core for
offline processing, using the class DaskJobExecutor
.
In that scenario, UDFs should only use a single thread to avoid oversubscription.
However, when running with a single-process single-thread executor like
InlineJobExecutor
, multiple threads can be
used. In some cases this might be advantageous in combination with the inline executor.
The thread count for many common numerics
libraries is set automatically by LiberTEM, see
threads_per_worker
. For other cases the
thread count on a worker should be set by the user according to
self.meta.threads_per_worker
.
Multithreaded executors are introduced with release 0.9.0, see Executors. They run UDF functions in parallel threads within the same process. This can introduce issues with thread safety, for example shared objects being changed concurrently by multiple threads. The LiberTEM internals and core UDFs are tested to work with these executors, but user code may break unexpectedly. PyFFTW interface caching is a known issue of this category. For that reason, the threaded executors should be considererd experimental for the time being. Furthermore, setting and re-setting any global variable, for example the thread count of an external library, should be protected with a reentrant locking mechanism.
The pyFFTW cache is disabled with threaded executors because of this known bug. That can have a negative impact on performance. For performance optimization with pyFFTW, users could use the builder interface of PyFFTW or use the native FFTW object interface.
Running multiple LiberTEM Context
objects or executors
in parallel threads is not tested and can lead to unexpected interactions.
Auto UDF
The AutoUDF
class and map()
method allow to run simple functions that accept a frame as the only parameter
with an auto-generated kind="nav"
result buffer over a dataset ad-hoc
without defining an UDF class. For more advanced processing, such as custom
merge functions, post-processing or performance optimization through tiled
processing, defining an UDF class is required.
As an alternative to Auto UDF, you can use the
make_dask_array()
method to create
a dask.array from
a DataSet
to perform calculations. See
Integration with Dask arrays for more details.
The AutoUDF
class determines the output shape and type
by calling the function with a mock-up frame of the same type and shape as
a real detector frame and converting the return value to a NumPy array. The
extra_shape
and dtype
parameters for the result buffer are
derived automatically from this NumPy array.
Additional constant parameters can be passed to the function via
functools.partial()
, for example. The return value should be much smaller
than the input size for this to work efficiently.
Example: Calculate sum over the last signal axis.
import functools
result = ctx.map(
dataset=dataset,
f=functools.partial(np.sum, axis=-1)
)
# or alternatively:
from libertem.udf import AutoUDF
udf = AutoUDF(f=functools.partial(np.sum, axis=-1))
result = ctx.run_udf(dataset=dataset, udf=udf)
One-step merge (merge_all)
New in version 0.9.0.
Note
The interface described here is experimental and therefore subject to change. See Merge function for Dask array results for more information.
With the release of the DelayedJobExecutor
UDFs have the option to define a method merge_all()
which is
used by this executor to perform a one-step merge of the results from all
partitions. This is only applied by this executor, and as a result is only
used when computing with dask.delayed
.
In the case of kind='nav'
result buffers, only, and no custom merge logic,
the merge_all
implementation is automatically provided by the base
UDF class. If no merge_all
is provided, the standard merge
function is used via a different mechanism.
Note
When using merge_all
, no attempt is made to verify that it functions
identically to the merge function, which remains a requirement of the
UDF implementation if implementing a custom merge and using other executors.
The merge_all
function must have the following signature:
def merge_all(self, ordered_results):
...
return {result_name: merged_result_array, ...}
where ordered_results
is an ordered dictionary mapping between the
Slice
for each partition and a dictionary
of partial results keyed by result_name.
An example merge_all
to sum all partial results for a kind='sig'
result buffer named 'intensity'
would be the following:
def merge_all(self, ordered_results):
intensity = np.stack([b.intensity for b in ordered_results.values()]).sum(axis=0)
return {
'intensity': intensity
}
The ordering of the partial results is such that the Slice
objects which are the dictionary keys are sequential in the flattened navigation
dimension. The user can therefore safely concatenate the partial results
for a given result buffer to get a whole-analysis-sized array with a flat
navigation dimension. In the case of kind='nav'
buffers the returned
arrays must be in this same flat navigation shape.
When an ROI has been provided while running the UDF, the number of elements in
the partial results will correspond to the number of valid ROI pixels; the concatenated
result will be reshaped into a full dataset-sized array by LiberTEM
after merge_all
has been called.
Note
When using DelayedJobExecutor
and
in particular merge_all
, the user is operating directly on
dask.array.Array
objects built for lazy computation. The Dask API is largely compatible
with numpy, and will lazily build a task graph from normal numpy
functions (e.g. np.stack(arrays).sum()
above). However, care
must be taken to avoid triggering eager execution accidentally, for
example by casting using Python builtins such as int(dask_value)
.
For a degree of certainty, the user is encouraged to consider the
Dask Array API when
building merge_all functions. The same advice applies to any
post-processing applied after merging (Post-processing after merging).
The return value from merge_all
must be a dictionary of merged result arrays
with the keys matching the declared result buffers. There is, however, no requirement
to return merged results for all existing buffers, though any that are missing will not
contain results from the computation and are likely to be filled with zeros.
Dynamically updating UDF parameters
New in version 0.12.0: This is an experimental API, which we make explicit by naming
the method update_parameters_experimental
. It is not
guaranteed to work with all UDFs.
Running UDFs in a feedback loop, for example on a live data stream using LiberTEM-live, is not only helpful for adjusting microscope parameters on the fly to match the reconstruction, but also for adjusting and fine tuning UDF parameters. Usually, this would be implemented like this:
while True:
udf = YourUDF(**get_new_parameters())
ctx.run_udf(dataset=ds, udf=udf, plots=True)
This works, but limits the feedback loop to update once per complete run, which can be annoying especially for longer dwell times.
Instead, the feedback loop can be shortened by updating parameters as soon as
the first results are available. This is accomplished by using
run_udf_iter()
instead of
run_udf()
, and calling the
update_parameters_experimental()
method:
udf = YourUDF(**parameters)
while True:
result_iter = ctx.run_udf_iter(dataset=ds, udf=[udf])
for part_res in result_iter:
result_iter.update_parameters_experimental([
{'some_parameter': get_new_value()},
])
Note that this works when passing multiple UDFs into
run_udf_iter()
as a list -
we need to pass a dict
for each of them into
update_parameters_experimental()
.
The dict
only needs to contain items for the parameters you want to update,
and can be an empty dict
if you don’t want to update any parameters
for the corresponding UDF.
The call to get_new_value
here can be any mechanism to get an updated
value, be it querying a graphical user interface, or automatically calculating
a new value based on the latest result from the UDFs.
Note
This API is implemented for the following executor types: 'inline'
,
'dask'
, 'pipelined'
, and 'threads'
, as specified
in make_with()
. Meaning that if you don’t
specify an executor, with either Context
or
LiveContext
from LiberTEM-live, it should just
work.
Note also that different executors schedule tasks differently, so especially in the offline case, there may be a bit more latency before the new values for parameters are available in the UDFs.