User-defined functions¶

A common case for analyzing big EM data sets is running a reduction operation on each individual detector frame or other small subsets of a data set and then combining the results of these reductions to form the complete result. This should cover a wide range of use cases, from simple mathematical operations, for example statistics, to complex image processing and analysis, like feature extraction.

The user-defined functions (UDF) interface of LiberTEM allows you to run your own reduction functions easily, without having to worry about parallelizing, I/O, the details of buffer management and so on. This corresponds to a simplified MapReduce programming model, where the intermediate re-keying and shuffling step is omitted.

It can be helpful to review some general concepts before reading the following sections.

Getting started¶

The easiest way of running a function over your data is using the map() method of the LiberTEM API.

For example, to calculate the sum over the last signal axis:

import functools
import numpy as np

result = ctx.map(
dataset=dataset,
f=functools.partial(np.sum, axis=-1)
)
# access the result as NumPy array:
np.array(result)
# or, alternatively:
result.data


The function specified via the f parameter is called for each frame / diffraction pattern.

See Auto UDF for details. This is most suited for simple functions; once you have parameters or want to re-use some data across function calls, you should create a UDF subclass instead.

How UDFs works¶

To allow for parallel processing, data is first divided into partitions along the navigation axes, which are worked on by different worker processes. Then, for each frame of a partition, a user-defined function process_frame() is called, which is free to do any imaginable processing.

As a result of splitting the data set into partitions, the results then need to be merged back together. This is accomplished by calling the merge() method after all frames of a partition are processed.

In pseudocode, data is processed in the following way:

result = empty
for partition in get_partitions(dataset):
partition_result = empty
for frame, frame_slice in get_frames(partition):
frame_result = process_frame(frame)
partition_result[frame_slice] = frame_result
merge(dest=result, src=partition_result)


In reality, the loop over partitions is run in parallel using multiple worker processes, potentially on multiple computers. The loop over individual frames is run in the worker processes, and the merge function is run in the main process, accumulating the results, every time the results for a partition are available.

In addition to process_frame(), there are two more methods available for overriding, to work on larger/different units of data at the same time: process_tile() and process_partition(). They can be used for optimizing some operations, and are documented in the advanced topics section.

Implementing a UDF¶

The workflow for implementing a UDF starts with subclassing UDF. In the simplest case, you need to implement the get_result_buffers() method and process_frame().

There are two very common patterns for reductions, reducing over the navigation axes into a common accumulator for all frames, keeping the shape of a single frame, or reducing over the signal axes and keeping the navigation axes.

A UDF can implement one of these reductions or combinations. To handle indexing for you, LiberTEM needs to know about the structure of your reduction. You can build this structure in the get_result_buffers() method, by declaring one or more buffers.

Declaring buffers¶

These buffers can have a kind declared, which corresponds to the two reduction patterns above: kind="sig" for reducing over the navigation axes (and keeping the signal axes), and kind="nav" for reducing over the signal axes and keeping the navigation axes. There is a third, kind="single", which allows to declare buffers with custom shapes that don’t correspond directly to the data set’s shape.

It is also possible to append additional axes to the buffer’s shape using the extra_shape parameter.

get_result_buffers() should return a dict which maps buffer names to buffer declarations. You can create a buffer declaration by calling the buffer() method.

The buffer name is later used to access the buffer via self.results.<buffername>, which returns a view into a NumPy array. For this to work, the name has to be a valid Python identifier.

Examples of buffer declarations:

def get_result_buffers(self):
# Suppose our dataset has the shape (14, 14, 32, 32),
# where the first two dimensions represent the navigation
# dimension and the last two dimensions represent the signal
# dimension.

buffers = {
# same shape as navigation dimensions of dataset, plus two
# extra dimensions of shape (3, 2). The full shape is
# (14, 14, 3, 2) in this example. This means this buffer can
# store an array of shape (3, 2) for each frame in the dataset.
"nav_buffer": self.buffer(
kind="nav",
extra_shape=(3, 2),
dtype="float32",
),

# same shape as signal dimensions of dataset, plus an extra
# dimension of shape (2,). Consequently, the full shape is
# (32, 32, 2) in this example. That means we can store two
# float32 values for each pixel of the signal dimensions.
"sig_buffer": self.buffer(
kind="sig",
extra_shape=(2,),
dtype="float32",
),

# buffer of shape (16, 16); shape is unrelated to dataset shape
"single_buffer": self.buffer(
kind="single",
extra_shape=(16, 16),
dtype="float32",
),

}

return buffers


See below for some more real-world examples.

All NumPy dtypes are supported for buffers. That includes the object dtype for arbitrary Python variables. The item just has to be pickleable with cloudpickle.

Note that buffers are only designed to pass lightweight intermediate results and thus, it is important that the size of the buffer remains small. Having too large buffers can lead to significant decline in performance.

Implementing the processing function¶

Now to the actual core of the processing: implementing process_frame(). The method signature looks like this:

class ExampleUDF(UDF):
def process_frame(self, frame):
...


The general idea is that you get a single frame from the data set, do your processing, and write the results to one of the previously declared buffers, via self.results.<buffername>. When accessing a kind="nav" buffer this way, you automatically get a view into the buffer that corresponds to the current frame that is being processed. In case of kind="sig" or kind="single", you get the whole buffer.

Intuitively, with kind="sig" (and kind="single"), you are most likely implementing an operation like buf = f(buf, frame). That is, you are computing a new result based on a single (new) frame and the results from all previous frames, and overwrite the results with the new value(s).

With kind="nav", you compute independent results for each frame, which are written to different positions in the result buffer. Because of the independence between frames, you don’t need to merge with a previous value; the result is simply written to the correct index in the result buffer (via the aforementioned view).

As an easy example, let’s have a look at a function that simply sums up each frame to a single value. This is a kind="nav" reduction, as we sum over all values in the signal dimensions:

import numpy as np
from libertem.udf import UDF

class SumOverSig(UDF):
def get_result_buffers(self):
"""
Describe the buffers we need to store our results:
kind="nav" means we want to have a value for each coordinate
in the navigation dimensions. We name our buffer 'pixelsum'.
"""
return {
'pixelsum': self.buffer(
kind="nav", dtype="float32"
)
}

def process_frame(self, frame):
"""
Sum up all pixels in this frame and store the result in the
pixelsum buffer. self.results.pixelsum is a view into the
result buffer we defined above, and corresponds to the entry
for the current frame we work on. We don't have to take care
of finding the correct index for the frame we are processing
ourselves.
"""
self.results.pixelsum[:] = np.sum(frame)

res = ctx.run_udf(
udf=SumOverSig(),
dataset=dataset,
)

# to access the named buffer as a NumPy array:
res['pixelsum'].data


On a 4D data set, this operation is roughly equivalent to np.sum(arr, axis=(2, 3)).

As described above, data from multiple partitions is processed in parallel. That also means that we need a way of merging partial results into the final result. In the example above, we didn’t need to do anything: we only have a kind="nav" buffer, where merging just means assigning the result of one partition to the right slice in the final result. This is done by the default implementation of merge().

In case of kind="sig" buffers and the corresponding reduction, assignment would just overwrite the result from the previous partition with the one from the current partition, and is not the correct operation. So let’s have a look at the merge method:

class ExampleUDF(UDF):
def merge(self, dest, src):
pass


dest is the result of all previous merge calls, and src is the result from a single new partition. Your merge implementation should read from both dest and src and write the result back to dest.

Here is an example demonstrating kind="sig" buffers and the merge function:

import numpy as np
from libertem.udf import UDF

class MaxUDF(UDF):
def get_result_buffers(self):
"""
Describe the buffers we need to store our results:
kind="sig" means we want to have a value for each coordinate
in the signal dimensions (i.e. a value for each pixel of the
diffraction patterns). We name our buffer 'maxbuf'.
"""
return {
'maxbuf': self.buffer(
kind="sig", dtype=self.meta.input_dtype
)
}

def preprocess(self):
"""
Initialize buffer with neutral element for maximum.
"""
self.results.maxbuf[:] = np.float('-inf')

def process_frame(self, frame):
"""
In this function, we have a frame and the buffer maxbuf
available, which we declared above. This function is called
for all frames / diffraction patterns in the data set. The
maxbuf is a partial result, and all partial results will
later be merged (see below).

In this case, we determine the maximum from the current
maximum and the current frame, for each pixel in the
diffraction pattern.

Notes:

- You cannot rely on any particular order of frames this function
is called in.
- Your function should be pure, that is, it should not have side
effects beyond modifying the content of result buffers or task data,
and should only depend on it's input parameters, including
the UDF object :code:self.
"""
self.results.maxbuf[:] = np.maximum(frame, self.results.maxbuf)

def merge(self, dest, src):
"""
merge two partial results, from src into dest
"""
dest['maxbuf'][:] = np.maximum(dest['maxbuf'], src['maxbuf'])

res = ctx.run_udf(
udf=MaxUDF(),
dataset=dataset,
)

# to access the named buffer as a NumPy array:
res['maxbuf'].data


For more complete examples, you can also have a look at the functions implemented in the sub-modules of libertem.udf and at LiberTEM-blobfinder.

Passing parameters¶

By default, keyword arguments that are passed to the constructor of a UDF are available as properties of self.params:

class MyUDF(UDF):

def process_frame(self, frame):
result = correlate_peaks(frame, self.params.peaks)
...

udf = MyUDF(peaks=peaks, other=...)


Running UDFs¶

As shown in the examples above, the run_udf() method of Context is used to run UDFs. Usually, you only need to pass an instance of your UDF and the dataset you want to run on:

udf = YourUDF(param1="value1")
res = ctx.run_udf(udf=udf, dataset=dataset)


You can also enable a progress bar:

udf = YourUDF(param1="value1")
res = ctx.run_udf(udf=udf, dataset=dataset, progress=True)


run_udf() returns a dict, having the buffer names as keys (as defined in get_result_buffers()) and BufferWrapper instances as values. You can use these in any place you would use a NumPy array, for example as an argument to NumPy functions, or you can explicitly convert them to NumPy arrays by accessing the .data attribute, or by calling numpy.array():

import numpy as np

res = ctx.run_udf(udf=udf, dataset=dataset)
# convert to NumPy array, assuming we declared a buffer
# with name buf1:
arr = res['buf1'].data
arr = np.array(res['buf1'])

# or directly treat as array:
np.sum(res['buf1'])


Regions of interest¶

In addition, you can pass the roi (region of interest) parameter, to run your UDF on a selected subset of data. roi should be a NumPy array containing a bool mask, having the shape of the navigation axes of the dataset. For example, to process a random subset of a 4D-STEM dataset:

import numpy as np

# If your dataset has shape (14, 14, 32, 32) with two signal
# and two navigation dimensions, dataset.shape.nav
# translates to (14, 14).
roi = np.random.choice(a=[False, True], size=dataset.shape.nav)
ctx.run_udf(udf=udf, dataset=dataset, roi=roi)


Note that the result array only contains values for the selected indices, all other indices are set to nan (or, if the dtype doesn’t support nan, some other, not further defined value). It is best to limit further processing to the same roi.

You can also access a flat array that is not filled up with nan using .raw_data:

res = ctx.run_udf(udf=udf, dataset=dataset, roi=roi)
res['buf1'].raw_data