Source code for libertem.udf.raw

import logging

import numpy as np

from libertem.common.math import prod, count_nonzero
from libertem.udf import UDF


# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)


class PickUDF(UDF):
    '''
    Load raw data from ROI

    This UDF is meant for frame picking with a very small ROI,
    usually a single frame.

    .. versionadded:: 0.4.0

    Examples
    --------
    >>> udf = PickUDF()
    >>> roi = np.zeros(dataset.shape.nav, dtype=bool)
    >>> roi[0] = True
    >>> result = ctx.run_udf(dataset=dataset, udf=udf, roi=roi)
    >>> result["intensity"].raw_data[0].shape
    (32, 32)
    '''

    def __init__(self):
        super().__init__()

    def get_preferred_input_dtype(self):
        ''
        # Request the data in the dataset's native dtype.
        return self.USE_NATIVE_DTYPE

    def get_result_buffers(self):
        ''
        # Declares a single 'intensity' buffer shaped
        # (number of selected frames, *signal shape) with the input dtype.
        dtype = self.meta.input_dtype
        sigshape = tuple(self.meta.dataset_shape.sig)

        # Number of frames to load: the ROI's non-zero entries if a ROI is
        # set, otherwise every position of the navigation shape.
        if self.meta.roi is not None:
            navsize = count_nonzero(self.meta.roi)
        else:
            navsize = prod(self.meta.dataset_shape.nav)

        # Warn when the result buffer would exceed 2**28 bytes (256 MiB).
        warn_limit = 2**28
        loaded_size = prod(sigshape) * navsize * np.dtype(dtype).itemsize
        if loaded_size > warn_limit:
            # Pass the values as logging arguments so the message is only
            # formatted if the record is actually emitted.
            log.warning(
                "PickUDF is loading %s bytes, exceeding warning limit %s. "
                "Consider using or implementing an UDF to process data on the worker "
                "nodes instead.",
                loaded_size, warn_limit,
            )

        # We are using a "single" buffer since we mostly load single frames. A
        # "sig" buffer would work as well, but would require a transpose to
        # accommodate multiple frames in the last and not first dimension.
        # A "nav" buffer would allocate a NaN-filled buffer for the whole
        # dataset.
        return {
            'intensity': self.buffer(
                kind='single',
                extra_shape=(navsize, ) + sigshape,
                dtype=dtype,
            )
        }

    def process_tile(self, tile):
        ''
        # We work in flattened nav space with ROI applied
        sl = self.meta.slice.get()
        self.results.intensity[sl] = tile

    def merge(self, dest, src):
        ''
        # We receive full-size buffers from each node that
        # contributes at least one frame and rely on the rest being filled
        # with zeros correctly.
        dest.intensity[:] += src.intensity

    def merge_all(self, ordered_results):
        ''
        # Stack the per-result 'intensity' buffers along a new leading axis
        # and sum over it, i.e. add all buffers element-wise.
        # NOTE(review): np.sum promotes integer dtypes narrower than the
        # platform integer, so the summed array may not keep the buffer
        # dtype -- confirm the caller handles this.
        intensity_chunks = [b.intensity for b in ordered_results.values()]
        intensity_sum = np.stack(intensity_chunks, axis=0).sum(axis=0)
        return {'intensity': intensity_sum}