Implementing a UDF
The workflow for implementing a UDF starts with subclassing UDF. In the
simplest case, you need to implement the get_result_buffers() and
process_frame() methods.
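A minimal sketch of this structure could look like the following; the method
bodies are filled in over the course of this guide:

from libertem.udf import UDF

class ExampleUDF(UDF):
    def get_result_buffers(self):
        ...

    def process_frame(self, frame):
        ...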
There are two very common patterns for reductions: reducing over the
navigation axes into a common accumulator for all frames, which keeps the
shape of a single frame, or reducing over the signal axes, which keeps the
navigation axes.
Declaring buffers
A UDF can implement one of these reductions or combinations. To handle
indexing for you, LiberTEM needs to know about the structure of your
reduction. You can build this structure in the get_result_buffers() method,
by declaring one or more buffers.
These buffers can have a kind declared, which corresponds to the two
reduction patterns above: kind="sig" for reducing over the navigation axes
(and keeping the signal axes), and kind="nav" for reducing over the signal
axes and keeping the navigation axes. There is a third, kind="single", which
allows you to declare buffers with custom shapes that don't correspond
directly to the data set's shape.
It is also possible to append additional axes to the buffer's shape using
the extra_shape parameter.
get_result_buffers() should return a dict which maps buffer names to buffer
declarations. You can create a buffer declaration by calling the buffer()
method.
The buffer name is later used to access the buffer via
self.results.<buffername>, which returns a view into a NumPy array. For this
to work, the name has to be a valid Python identifier.
Examples of buffer declarations:
def get_result_buffers(self):
    # Suppose our dataset has the shape (14, 14, 32, 32),
    # where the first two dimensions represent the navigation
    # dimension and the last two dimensions represent the signal
    # dimension.
    buffers = {
        # same shape as navigation dimensions of dataset, plus two
        # extra dimensions of shape (3, 2). The full shape is
        # (14, 14, 3, 2) in this example. This means this buffer can
        # store an array of shape (3, 2) for each frame in the dataset.
        "nav_buffer": self.buffer(
            kind="nav",
            extra_shape=(3, 2),
            dtype="float32",
        ),

        # same shape as signal dimensions of dataset, plus an extra
        # dimension of shape (2,). Consequently, the full shape is
        # (32, 32, 2) in this example. That means we can store two
        # float32 values for each pixel of the signal dimensions.
        "sig_buffer": self.buffer(
            kind="sig",
            extra_shape=(2,),
            dtype="float32",
        ),

        # buffer of shape (16, 16); shape is unrelated to dataset shape
        "single_buffer": self.buffer(
            kind="single",
            extra_shape=(16, 16),
            dtype="float32",
        ),
    }
    return buffers
See below for some more real-world examples.
All NumPy dtypes are supported for buffers. That includes the object dtype
for arbitrary Python variables. The items just have to be pickleable with
cloudpickle.
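As a hedged sketch of storing an arbitrary Python object, here is a made-up
UDF that keeps a small dict in a kind="single" buffer (the class and buffer
names are illustrative; it follows the same pattern as the parameter example
further below):

from libertem.udf import UDF

class ObjectBufferUDF(UDF):
    def get_result_buffers(self):
        # a buffer holding a single arbitrary Python object
        return {
            'info': self.buffer(kind="single", dtype=object),
        }

    def process_frame(self, frame):
        # any cloudpickle-able Python object can be stored:
        self.results.info[0] = {"num_pixels": int(frame.size)}

    def merge(self, dest, src):
        # for this demo we simply keep the result of the latest partition
        dest.info[:] = src.info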
Note that buffers are only designed to pass lightweight intermediate
results, so it is important that buffers remain small. Buffers that are too
large can lead to a significant decline in performance.
Implementing the processing function
Now to the actual core of the processing: implementing process_frame(). The
method signature looks like this:
class ExampleUDF(UDF):
    def process_frame(self, frame):
        ...
The general idea is that you get a single frame from the data set, do your
processing, and write the results to one of the previously declared buffers,
via self.results.<buffername>.
When accessing a kind="nav" buffer this way, you automatically get a view
into the buffer that corresponds to the current frame that is being
processed. In case of kind="sig" or kind="single", you get the whole buffer.
Intuitively, with kind="sig" (and kind="single"), you are most likely
implementing an operation like buf = f(buf, frame). That is, you are
computing a new result based on a single (new) frame and the results from
all previous frames, overwriting the results with the new value(s).
With kind="nav", you compute independent results for each frame, which are
written to different positions in the result buffer. Because of the
independence between frames, you don't need to merge with a previous value;
the result is simply written to the correct index in the result buffer (via
the aforementioned view).
As an easy example, let's have a look at a function that simply sums up each
frame to a single value. This is a kind="nav" reduction, as we sum over all
values in the signal dimensions:
import numpy as np
from libertem.udf import UDF


class SumOverSig(UDF):
    def get_result_buffers(self):
        """
        Describe the buffers we need to store our results:
        kind="nav" means we want to have a value for each coordinate
        in the navigation dimensions. We name our buffer 'pixelsum'.
        """
        return {
            'pixelsum': self.buffer(
                kind="nav", dtype="float32"
            )
        }

    def process_frame(self, frame):
        """
        Sum up all pixels in this frame and store the result in the
        `pixelsum` buffer. `self.results.pixelsum` is a view into the
        result buffer we defined above, and corresponds to the entry
        for the current frame we work on. We don't have to take care
        of finding the correct index for the frame we are processing
        ourselves.
        """
        self.results.pixelsum[:] = np.sum(frame)


res = ctx.run_udf(
    udf=SumOverSig(),
    dataset=dataset,
)

# to access the named buffer as a NumPy array:
res['pixelsum'].data
On a 4D data set, this operation is roughly equivalent to
np.sum(arr, axis=(2, 3)).
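As a quick sanity check of this equivalence on a small in-memory array, a
sketch (this assumes the "memory" dataset type and the inline executor are
available, which may differ in your setup):

import numpy as np
from libertem.api import Context

ctx = Context.make_with("inline")
data = np.random.rand(14, 14, 32, 32).astype("float32")
dataset = ctx.load("memory", data=data, sig_dims=2)

res = ctx.run_udf(udf=SumOverSig(), dataset=dataset)
assert np.allclose(res['pixelsum'].data, np.sum(data, axis=(2, 3)))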
Merging partial results
As described above, data from multiple partitions is processed in parallel.
That also means that we need a way of merging partial results into the final
result. In the example above, we didn't need to do anything: we only have a
kind="nav" buffer, where merging just means assigning the result of one
partition to the right slice in the final result. This is done by the
default implementation of merge().
In case of kind="sig" buffers and the corresponding reduction, assignment
would just overwrite the result from the previous partition with the one
from the current partition, which is not the correct operation. So let's
have a look at the merge method:
class ExampleUDF(UDF):
    def merge(self, dest, src):
        pass
dest is the result of all previous merge calls, and src is the result from a
single new partition. Your merge implementation should read from both dest
and src and write the result back to dest.
Here is an example demonstrating kind="sig" buffers and the merge function:
import numpy as np
from libertem.udf import UDF


class MaxUDF(UDF):
    def get_result_buffers(self):
        """
        Describe the buffers we need to store our results:
        kind="sig" means we want to have a value for each coordinate
        in the signal dimensions (i.e. a value for each pixel of the
        diffraction patterns). We name our buffer 'maxbuf'.
        """
        return {
            'maxbuf': self.buffer(
                kind="sig", dtype=self.meta.input_dtype
            )
        }

    def preprocess(self):
        """
        Initialize buffer with neutral element for maximum.
        """
        self.results.maxbuf[:] = -np.inf

    def process_frame(self, frame):
        """
        In this function, we have a frame and the buffer `maxbuf`
        available, which we declared above. This function is called
        for all frames / diffraction patterns in the data set. The
        maxbuf is a partial result, and all partial results will
        later be merged (see below).

        In this case, we determine the maximum from the current
        maximum and the current frame, for each pixel in the
        diffraction pattern.

        Notes:

        - You cannot rely on any particular order of frames this function
          is called in.

        - Your function should be pure, that is, it should not have side
          effects beyond modifying the content of result buffers or task data,
          and should only depend on its input parameters, including
          the UDF object :code:`self`.
        """
        self.results.maxbuf[:] = np.maximum(frame, self.results.maxbuf)

    def merge(self, dest, src):
        """
        Merge two partial results, from src into dest.
        """
        dest.maxbuf[:] = np.maximum(dest.maxbuf, src.maxbuf)


res = ctx.run_udf(
    udf=MaxUDF(),
    dataset=dataset,
)

# to access the named buffer as a NumPy array:
res['maxbuf'].data
For more complete examples, you can also have a look at the functions
implemented in the sub-modules of libertem.udf and at our Package overview.
Passing parameters
By default, keyword arguments that are passed to the constructor of a UDF
are available as properties of self.params. This allows clean handling and
passing of parameters in distributed execution scenarios, see below.
class MyUDF(UDF):
    def process_frame(self, frame):
        result = correlate_peaks(frame, self.params.peaks)
        ...


udf = MyUDF(peaks=peaks, other=...)
Declaring a constructor
In order to document the parameters of a UDF and avoid typos in parameter
names, it is good practice to define a constructor for any UDF that will be
re-used and shared. Since the functions of a UDF are executed both on the
main node and on worker processes, UDFs will be pickled and sent over the
network in the process. To avoid transferring unwanted state or unnecessary
or unpicklable member variables, a clean copy of a UDF is created before
transfer. This clean copy is created by re-instantiating the UDF class with
the parameters that were passed to the constructor of the UDF base class.
That means a user-defined constructor has to fulfill two conditions:
- It has to pass any parameters to the superclass constructor.
- It has to accept exactly the parameters that it passed to the superclass
  constructor whenever a clean copy is created, and behave the same way as
  in the original call.
That means modification of parameters, other than assigning values for
default parameters, can lead to complications and should be avoided if
possible.
class MyParamUDF(UDF):
    '''
    This UDF demonstrates how to define a constructor for a UDF.

    Parameters
    ----------
    my_param
        A parameter that is passed into the UDF for demonstration purposes.
        It is mirrored in the `demo` result buffer.
    '''
    def __init__(self, my_param=None):
        # Assigning a default parameter works,
        # provided the UDF accepts it in subsequent calls
        # exactly as it passed it to the superclass:
        if my_param is None:
            my_param = "Eins, zwei drei!"

        # !!!DON'T!!! self.my_param = my_param
        # !!!DON'T!!! super().__init__()
        # We have to pass it to the superclass constructor instead.
        # This makes sure it will be available via self.params and
        # on clean copies.

        # !!!DON'T!!! super().__init__(other_param=my_param)
        # This would trigger a TypeError when
        # MyParamUDF(other_param=my_param) is called for a clean copy.

        # DO pass all parameters to the superclass exactly as this
        # class would accept them:
        super().__init__(my_param=my_param)

    # The rest of this UDF just mirrors the parameter
    # back in a result buffer for demonstration.

    def get_result_buffers(self):
        '''
        Declare a result buffer for the parameter.
        '''
        return {
            'demo': self.buffer(
                kind='single',
                dtype=object
            ),
        }

    def process_frame(self, frame):
        '''
        We assign the parameter to the result buffer.
        '''
        self.results.demo[:] = self.params.my_param

    def merge(self, dest, src):
        '''
        We pass through the result buffer.
        '''
        dest.demo[:] = src.demo


res = ctx.run_udf(dataset=dataset, udf=MyParamUDF())
assert res['demo'].data[0] == "Eins, zwei drei!"

res2 = ctx.run_udf(dataset=dataset, udf=MyParamUDF(my_param=42))
assert res2['demo'].data[0] == 42
Initializing result buffers
To allow a UDF to initialize a result buffer to the correct values, the
method preprocess() can be implemented. It is run once per partition, and
assigning to kind="nav" result buffers will assign to the results of the
whole partition. See MaxUDF above for an example.
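As a hedged sketch of the kind="nav" case (the class and buffer names here
are made up for illustration):

import numpy as np
from libertem.udf import UDF

class NavInitUDF(UDF):
    def get_result_buffers(self):
        return {
            'nonzero': self.buffer(kind="nav", dtype="float32"),
        }

    def preprocess(self):
        # in preprocess(), self.results.nonzero covers all frames of
        # the current partition, so this initializes the whole
        # partition at once:
        self.results.nonzero[:] = -1

    def process_frame(self, frame):
        # in process_frame(), the view covers only the current frame:
        self.results.nonzero[:] = np.count_nonzero(frame)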
Running UDFs
As shown in the examples above, the run_udf() method of Context is used to
run UDFs. Usually, you only need to pass an instance of your UDF and the
dataset you want to run on:
udf = YourUDF(param1="value1")
res = ctx.run_udf(udf=udf, dataset=dataset)
You can also enable a progress bar:
udf = YourUDF(param1="value1")
res = ctx.run_udf(udf=udf, dataset=dataset, progress=True)
run_udf() returns a dict, having the buffer names as keys (as defined in
get_result_buffers()) and BufferWrapper instances as values. You can use
these in any place you would use a NumPy array, for example as an argument
to NumPy functions, or you can explicitly convert them to NumPy arrays by
accessing the .data attribute, or by calling numpy.array():
import numpy as np
res = ctx.run_udf(udf=udf, dataset=dataset)
# convert to NumPy array, assuming we declared a buffer
# with name `buf1`:
arr = res['buf1'].data
arr = np.array(res['buf1'])
# or directly treat as array:
np.sum(res['buf1'])
If you want to run multiple independent UDFs on a single DataSet, you can
pass in a list of UDFs to run_udf(). This can be faster than making two
passes over the whole DataSet:
from libertem.udf.sum import SumUDF
# results are returned as a list of dictionaries, so we can unpack them here
# into `res` and `res_sum`:
res, res_sum = ctx.run_udf(udf=[udf, SumUDF()], dataset=dataset)
Note the return type: if you pass a list of UDFs, you also get a list of
dictionaries as a result, with the ordering of results matching the ordering
of UDFs in the run_udf() call.
Cancellation
New in version 0.12.0.
Especially when running UDFs on live data streams, it can happen that a UDF doesn’t run to completion. An example scenario is when a continuously running preview scan is stopped, or scan parameters are changed while the scan is running.
In this case, the exception UDFRunCancelled is raised, which you can handle
in an appropriate way for your application:
from libertem.common.exceptions import UDFRunCancelled

try:
    res = ctx.run_udf(udf=udf, dataset=dataset)
except UDFRunCancelled:
    # handle cancellation, for example, start the next UDF run
    pass
Regions of interest
In addition, you can pass the roi (region of interest) parameter to run your
UDF on a selected subset of data. roi can be specified in several ways:
- a single coordinate in the navigation dimensions, e.g. (3, 4), which will
  restrict the analysis to this frame only
- a sequence of coordinates and their associated truth value, e.g.
  [((0, 0), True), ((2, 3), True)]. In this format all truth values must be
  identical, and the roi background value will be set to its opposite. This
  allows quick specification of a small number of pixels to include in or
  exclude from the analysis.
- a NumPy array containing a bool mask, having the shape of the navigation
  axes of the dataset
- any scipy.sparse matrix or array, or pydata.sparse.SparseArray format,
  again having the same shape as the navigation axes of the dataset
For example, to process a random subset of a 4D-STEM dataset:
import numpy as np
# If your dataset has shape `(14, 14, 32, 32)` with two signal
# and two navigation dimensions, `dataset.shape.nav`
# translates to `(14, 14)`.
roi = np.random.choice(a=[False, True], size=dataset.shape.nav)
ctx.run_udf(udf=udf, dataset=dataset, roi=roi)
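The other formats listed above are passed the same way. For instance, a
sketch of the coordinate-list format, reusing the example values from the
list above:

# select only the frames at navigation positions (0, 0) and (2, 3):
roi = [((0, 0), True), ((2, 3), True)]
res = ctx.run_udf(udf=udf, dataset=dataset, roi=roi)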
Note that the result array only contains values for the selected indices;
all other indices are set to nan (or, if the dtype doesn't support nan, some
other, not further defined value). It is best to limit further processing to
the same roi.
You can also access a flat array that is not filled up with nan using
.raw_data:
res = ctx.run_udf(udf=udf, dataset=dataset, roi=roi)
res['buf1'].raw_data
Live Plotting
New in version 0.7.0.
LiberTEM can display a live plot of the UDF results. In the simplest case,
this can be done by setting plots=True in run_udf():

ctx.run_udf(dataset=dataset, udf=udf, plots=True)
See the following items for a full demonstration, including setting up fully customized plots. The API reference can be found in Visualization.
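As one step beyond plots=True, a sketch of selecting which buffers are
plotted by passing a list of channel names per UDF (this assumes the UDF
declares a buffer named 'intensity'; see the Visualization reference for the
authoritative API):

# plot only the 'intensity' buffer of the first (and only) UDF:
ctx.run_udf(dataset=dataset, udf=udf, plots=[['intensity']])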
Partial results
New in version 0.7.0.
Instead of only getting the whole result after the UDF has finished running,
you can also use run_udf_iter() to get a generator for partial results:

for udf_results in ctx.run_udf_iter(dataset=dataset, udf=SumUDF()):
    # ... do something interesting with `udf_results`:
    a = np.sum(udf_results.buffers[0]['intensity'])

# after the loop, `udf_results` contains the final results as usual
While the UDF execution is running, the UDF object should not be modified since that leads to undefined behavior. In particular, nested or concurrent execution of the same UDF objects must be avoided since it modifies the buffers that are allocated internally while a UDF is running.
Asynchronous execution
It is also possible to integrate LiberTEM into an async script or
application by passing sync=False to run_udf_iter() or run_udf():

async for udf_results in ctx.run_udf_iter(dataset=dataset, udf=udf, sync=False):
    # ... do something interesting with `udf_results`:
    a = np.sum(udf_results.buffers[0]['intensity'])

# or the version without intermediate results:
udf_results = await ctx.run_udf(dataset=dataset, udf=udf, sync=False)
See the items below for a more comprehensive demonstration and documentation.