from collections import defaultdict, OrderedDict
from contextlib import contextmanager
import typing
from typing import (
Any, Optional,
TypeVar, Union, TYPE_CHECKING
)
from collections.abc import AsyncGenerator, Generator, Iterator, Mapping, Sequence, Iterable
from typing_extensions import Protocol, Literal
from typing import runtime_checkable
import warnings
import logging
import uuid
import cloudpickle
import numpy as np
from opentelemetry import trace, context as opentelemetry_context
from sparseconverter import (
BACKENDS, CUPY_BACKENDS, D2_BACKENDS, DENSE_BACKENDS, ND_BACKENDS, SPARSE_BACKENDS,
CUDA_BACKENDS, CPU_BACKENDS, CUDA,
ArrayBackend, cheapest_pair, check_shape, get_backend,
make_like, result_type, for_backend
)
from libertem.common.tracing import attach_to_parent, TracedThreadPoolExecutor
from libertem.common.progress import ProgressManager, PartitionProgressTracker, PartitionTrackerNoOp
from libertem.io.dataset.base.tiling import DataTile
from libertem.io.dataset.base import DataSetMeta
from libertem.utils.devices import has_cupy
from libertem.warnings import UseDiscouragedWarning
from libertem.common.exceptions import UDFException
from libertem.common.buffers import (
BufferWrapper, AuxBufferWrapper, PlaceholderBufferWrapper,
BufferKind, BufferUse, BufferLocation, ArrayWithMask,
)
from libertem.common import Shape, Slice
from libertem.common.udf import TilingPreferences, UDFProtocol, UDFMethod
from libertem.common.math import count_nonzero, prod
from libertem.io.dataset.base import (
TilingScheme, Negotiator, Partition, DataSet, get_coordinates
)
from libertem.io.corrections import CorrectionSet
from libertem.io.dataset.base.roi import roi_for_partition
from libertem.common.backend import get_use_cuda, get_device_class
from libertem.common.async_utils import async_generator_eager
from libertem.executor.inline import InlineJobExecutor
from libertem.common.executor import (
Environment, JobExecutor, TaskCommHandler, NoopCommHandler, TaskProtocol,
JobCancelledError, ResourceDef,
)
from libertem.common.exceptions import UDFRunCancelled
if TYPE_CHECKING:
from numpy import typing as nt
from opentelemetry.trace import SpanContext
from libertem.common.progress import ProgressReporter
tracer = trace.get_tracer(__name__)
log = logging.getLogger(__name__)
BackendSpec = Union[ArrayBackend, Iterable[ArrayBackend]]
DeviceClass = Literal['cpu', 'cuda']
UDFKwarg = Union[Any, AuxBufferWrapper]
UDFKwargs = dict[str, UDFKwarg]
ExecutionPlan = dict[ArrayBackend, Iterable["UDF"]]
class ResultsForDataSet:
def __init__(
self,
gen: Generator[tuple[tuple["UDFData", ...], TaskProtocol], None, None],
params: "UDFParams",
params_handle,
executor: JobExecutor,
udfs: list["UDF"],
):
self._gen = gen
self._params = params
self._params_handle = params_handle
self._executor = executor
self._udfs = udfs
def __iter__(self):
return self
def __next__(self) -> tuple[tuple["UDFData", ...], TaskProtocol]:
return next(self._gen)
def close(self):
self._gen.close()
def throw(self, exc: Exception):
return self._gen.throw(exc)
def update_parameters_experimental(self, patch: list[dict[str, Any]]):
log.debug("ResultsForDataSet.update_parameters_experimental: %s", patch)
# update parameters on workers (via the executor):
self._params.patch(patch=patch)
self._executor.scatter_update_patch(self._params_handle, patch)
# update UDFs on the main node:
for udf, params in zip(self._udfs, patch):
udf._update_parameters_patch(kwargs_patch=params)
def _get_dtype(
udfs: list["UDF"],
dtype: "nt.DTypeLike",
corrections: Optional[CorrectionSet],
array_backends: Iterable[ArrayBackend],
) -> "nt.DTypeLike":
tmp_dtype: "nt.DTypeLike"
if corrections is not None and corrections.have_corrections():
tmp_dtype = np.result_type(np.float32, dtype)
else:
tmp_dtype = np.dtype(dtype)
for udf in udfs:
tmp_dtype = result_type(
udf.get_preferred_input_dtype(),
tmp_dtype,
*array_backends,
)
return tmp_dtype
class TileConverter:
def __init__(self, tile: DataTile):
self._tile = tile
# prime cache with identity
self._cache = {get_backend(tile.data): tile.data}
def get(self, backend):
'''
Get a converted tile for the specified backend.
Return the tile from the cache if it was already converted to that backend,
otherwise convert from the cheapest available cached representation.
'''
res = self._cache.get(backend, None)
if res is None:
source, target = cheapest_pair(self._cache.keys(), (backend, ))
res = for_backend(self._cache[source], target)
if backend in ND_BACKENDS:
res = res.reshape(self._tile.shape)
self._cache[backend] = res
return res
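# Illustrative usage sketch (not part of the module API): how a tile loop might
# use a TileConverter. The backend identifiers are the plain strings used by the
# sparseconverter package; the variable names are made up.
#
#   converter = TileConverter(tile)      # tile: DataTile as read from the dataset
#   np_tile = converter.get('numpy')     # identity or conversion, result is cached
#   cupy_tile = converter.get('cupy')    # converted via the cheapest cached source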
def _format_plan(plan: ExecutionPlan) -> list[str]:
res = []
for k, v in plan.items():
udf_classes = ",".join([
str(c.__class__.__name__)
for c in v
])
res.append(
f"{str(k)}: {udf_classes}"
)
return res
def _execution_plan(
udfs, ds: Union[DataSet, DataSetMeta], device_class: Optional[DeviceClass] = None,
available_backends: Iterable[ArrayBackend] = BACKENDS
) -> tuple[ArrayBackend, ExecutionPlan]:
'''
Calculate which array format to use for which UDFs
'''
remaining = list(udfs)
execution_plan = OrderedDict()
# preserve order
if ds.array_backends is None:
raise ValueError("Available dataset backends need to be known.")
ds_backends = tuple(ds.array_backends)
available_backends = frozenset(available_backends)
if device_class is None:
# No preference, for example when preparing a run for a dataset
native_backends = available_backends
elif device_class == 'cuda':
native_backends = available_backends.intersection(CUDA_BACKENDS)
elif device_class == 'cpu':
native_backends = available_backends.intersection(CPU_BACKENDS)
else:
raise ValueError(f"Unknown device class{device_class}, allowed are 'cuda' and 'cpu'.")
aggregate_udf_aversion: dict[ArrayBackend, float] = defaultdict(float)
for udf in remaining:
backends = _get_canonical_backends(udf.get_backends())
for i, b in enumerate(backends):
aggregate_udf_aversion[b] += i / len(backends)
backend_popularity = sorted(
aggregate_udf_aversion.keys(),
key=lambda b: aggregate_udf_aversion[b]
)
def find_popular(remaining, restrict_to, preference_order):
'''
Parameters
----------
preference_order: Order of preference for tie breaking if several formats can work for
the same number of UDFs
'''
popular = defaultdict(OrderedDict)
for udf in remaining:
for b in _get_canonical_backends(udf.get_backends()):
if b in restrict_to:
# Use a dict with value True to make sure UDFs are not counted twice
popular[b][udf] = True
most_popular = sorted(popular.keys(), key=lambda b: len(popular[b]), reverse=True)
if most_popular:
max_len = len(popular[most_popular[0]])
most_popular = [b for b in most_popular if len(popular[b]) >= max_len]
if len(most_popular) > 1:
for b in preference_order:
if b in most_popular:
return b, list(popular[b].keys())
b = most_popular[0]
return b, list(popular[b].keys())
else:
return None, None
def apply(backend, udfs):
if backend is not None:
execution_plan[backend] = udfs
for udf in udfs:
remaining.remove(udf)
# First assign UDFs that work without conversion
# with a tile format native to dataset and device class
backend, selected_udfs = find_popular(
remaining,
restrict_to=native_backends.intersection(ds_backends),
preference_order=ds_backends
)
if backend is not None:
apply(backend, selected_udfs)
# The following covers the common case that the dataset is CPU-only
# and we require conversion to a CuPy format.
# By default we should avoid a costly dense-to-sparse or sparse-to-dense
# conversion when transferring between CPU and CuPy.
# Assign UDFs with sparse formats if the dataset has any sparse options
if SPARSE_BACKENDS.intersection(ds_backends):
backend, selected_udfs = find_popular(
remaining,
restrict_to=native_backends.intersection(SPARSE_BACKENDS),
preference_order=backend_popularity
)
if backend is not None:
apply(backend, selected_udfs)
# Assign a dense native format if the dataset has dense options
if DENSE_BACKENDS.intersection(ds_backends):
backend, selected_udfs = find_popular(
remaining,
restrict_to=native_backends.intersection(DENSE_BACKENDS),
preference_order=backend_popularity
)
if backend is not None:
apply(backend, selected_udfs)
# Then assign UDFs that work with an array format that matches the device class
# That means arrays will be converted to allow execution on GPU or CPU even if the
# dataset can't deliver that natively and it requires conversion between sparse and dense.
# That is the case for the dataset default NumPy and UDFs that require sparse input.
while True:
backend, selected_udfs = find_popular(
remaining,
restrict_to=native_backends,
preference_order=backend_popularity
)
if backend is not None:
apply(backend, selected_udfs)
else:
break
# From here on we have the "CPU on GPU fallback". A GPU on CPU fallback
# is attempted as well, but it may trigger an error downstream.
# Assign remaining UDFs that work with an array backend native to the dataset
backend, selected_udfs = find_popular(
remaining,
restrict_to=available_backends.intersection(ds_backends),
preference_order=ds_backends
)
if backend:
apply(backend, selected_udfs)
# Finally assign UDFs to any array backend even if they don't match the
# dataset or device class.
while True:
backend, selected_udfs = find_popular(
remaining,
restrict_to=available_backends,
preference_order=backend_popularity
)
if backend is not None:
apply(backend, selected_udfs)
else:
break
if remaining:
for r in remaining:
for b in _get_canonical_backends(r.get_backends()):
if b not in BACKENDS:
raise ValueError(f"UDF {r} returned invalid backend {b}.")
raise RuntimeError(f"Could not find backends for remaining {remaining}.")
# Optimize the execution plan
reordered_plan = OrderedDict()
available = []
required = list(execution_plan.keys())
# First, determine the start point
ds_backend, right = cheapest_pair(ds_backends, required)
reordered_plan[right] = execution_plan[right]
available.append(ds_backend)
available.append(right)
required.remove(right)
# Now assign the remaining ones
while required:
_, right = cheapest_pair(available, required)
reordered_plan[right] = execution_plan[right]
required.remove(right)
# Now we should have covered all required backends
trace.get_current_span().add_event(
'execution_plan', {
'ds_backend': ds_backend,
'execution_plan': _format_plan(reordered_plan),
}
)
return ds_backend, reordered_plan
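# Illustrative example (hypothetical UDF instances, for orientation only):
# with a NumPy-only dataset, device class 'cpu' and three UDFs declaring
# ('numpy',), ('cupy', 'numpy') and ('sparse.COO',) as backends, the result
# could look like
#
#   ds_backend = 'numpy'
#   execution_plan = {'numpy': [udf_a, udf_b], 'sparse.COO': [udf_c]}
#
# i.e. each tile is requested from the dataset as NumPy and additionally
# converted to sparse.COO only for the third UDF.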
class MergeAttrMapping:
def __init__(self, dict_input: dict[str, np.ndarray]):
self._dict: dict[str, np.ndarray] = dict_input
def __iter__(self) -> Iterator[str]:
return iter(self._dict)
def __contains__(self, k: str) -> bool:
return k in self._dict
def __getattr__(self, k: str) -> np.ndarray:
return self._dict[k]
def __setattr__(self, k: str, v: np.ndarray) -> None:
if k in ['_dict']:
super().__setattr__(k, v)
else:
self._dict[k][:] = v
def __getitem__(self, k: str) -> np.ndarray:
warnings.warn(
"dict-like access is discouraged, as it can be "
"confusing vs. using attribute access",
UseDiscouragedWarning,
stacklevel=2,
)
return self._dict[k]
T = TypeVar('T')
class UDFData:
'''
Container for result buffers, return value from running UDFs
'''
def __init__(self, data: dict[str, BufferWrapper]):
self._data = data
self._views: dict[str, np.ndarray] = {}
def __repr__(self) -> str:
return "<UDFData: %r>" % (
self._data
)
def __getattr__(self, k: str) -> Union[np.ndarray, BufferWrapper]:
if k.startswith("_"):
raise AttributeError("no such attribute: %s" % k)
try:
return self._get_view_or_data(k)
except KeyError as e:
raise AttributeError(str(e))
def get_buffer(self, name: str) -> BufferWrapper:
"""
Return the `BufferWrapper` for buffer `name`
.. versionadded:: 0.7.0
"""
return self._data[name]
def set_buffer(self, name: str, buffer: BufferWrapper) -> None:
"""
Replace or set the `BufferWrapper` for buffer `name`
"""
assert isinstance(buffer, BufferWrapper)
self._data[name] = buffer
def get(
self, k: str, default: Optional[T] = None
) -> Optional[Union[T, np.ndarray, BufferWrapper]]:
try:
return self.__getattr__(k)
except (KeyError, AttributeError):
return default
def __setattr__(self, k: str, v: "nt.ArrayLike") -> None:
if not k.startswith("_"):
# convert UDFData.some_attr = something to array slice assignment
getattr(self, k)[:] = v
else:
super().__setattr__(k, v)
def _get_view_or_data(self, k: str) -> Union[np.ndarray, BufferWrapper]:
if k in self._views:
return self._views[k]
res = self._data[k]
if isinstance(res, BufferWrapper) and res.raw_data is not None:
return res.raw_data
return res
def __getitem__(self, k: str) -> BufferWrapper:
warnings.warn(
"dict-like access is discouraged, as it can be "
"confusing vs. using attribute access. Please use `get_buffer` instead, "
"if you really need the `BufferWrapper` and not the current view",
UseDiscouragedWarning,
stacklevel=2,
)
return self._data[k]
def __contains__(self, k: str) -> bool:
return k in self._data
def items(self):
return self._data.items()
def keys(self):
return self._data.keys()
def values(self):
return self._data.values()
def as_dict(self) -> dict[str, BufferWrapper]:
return dict(self.items())
def get_proxy(self) -> MergeAttrMapping:
return MergeAttrMapping({
k: (self._views[k] if k in self._views else self._data[k].raw_data)
for k, v in self.items()
if v and v.has_data()
})
def _get_buffers(
self, filter_allocated: bool = False
) -> Generator[tuple[str, BufferWrapper], None, None]:
for k, buf in self._data.items():
if not hasattr(buf, 'has_data') or (buf.has_data() and filter_allocated):
continue
yield k, buf
def allocate_for_part(self, partition: Partition, roi: Optional[np.ndarray], lib=None) -> None:
"""
Allocate all BufferWrapper instances in this namespace.
For pre-allocated buffers (i.e. aux data), only set shape and roi.
"""
for k, buf in self._get_buffers():
buf.set_shape_partition(partition, roi)
for k, buf in self._get_buffers(filter_allocated=True):
buf.allocate(lib=lib)
def allocate_for_full(self, dataset: DataSet, roi: Optional[np.ndarray]) -> None:
ds_partitions = [*dataset.get_partitions()]
for k, buf in self._get_buffers():
buf.set_shape_ds(dataset.shape, roi)
buf.add_partitions(ds_partitions)
for k, buf in self._get_buffers(filter_allocated=True):
buf.allocate()
def set_view_for_dataset(self, dataset: DataSet) -> None:
for k, buf in self._get_buffers():
self._views[k] = buf.get_view_for_dataset(dataset)
def set_view_for_partition(self, partition: Partition) -> None:
for k, buf in self._get_buffers():
self._views[k] = buf.get_view_for_partition(partition)
def set_view_for_tile(self, partition: Partition, tile: DataTile) -> None:
for k, buf in self._get_buffers():
self._views[k] = buf.get_view_for_tile(partition, tile)
def set_contiguous_view_for_tile(self, partition: Partition, tile: DataTile) -> None:
# .. versionadded:: 0.5.0
for k, buf in self._get_buffers():
self._views[k] = buf.get_contiguous_view_for_tile(partition, tile)
def flush(self, debug: bool = False) -> None:
# .. versionadded:: 0.5.0
for k, buf in self._get_buffers():
buf.flush(debug=debug)
def export(self) -> None:
# .. versionadded:: 0.6.0
for k, buf in self._get_buffers():
buf.export()
def set_view_for_frame(self, partition: Partition, tile: DataTile, frame_idx: int) -> None:
for k, buf in self._get_buffers():
self._views[k] = buf.get_view_for_frame(partition, tile, frame_idx)
def clear_views(self) -> None:
self._views = {}
class UDFKwargsWrapper(UDFData):
"""
Wrapper for UDF kwargs, used for slicing/viewing AUX data
"""
def __init__(self, data: UDFKwargs):
super().__init__(data)
self._data = data
self._views = {}
def _get_buffers(
self, filter_allocated: bool = False
) -> Generator[tuple[str, AuxBufferWrapper], None, None]:
for k, buf in self._data.items():
if isinstance(buf, AuxBufferWrapper):
if buf.has_data() and filter_allocated:
continue
yield k, buf
def new_for_partition(self, partition: Partition, roi: np.ndarray):
for k, buf in self._get_buffers():
self._data[k] = buf.new_for_partition(partition, roi)
@runtime_checkable
class UDFFrameMixin(Protocol):
'''
Implement :code:`process_frame` for per-frame processing.
'''
def process_frame(self, frame: np.ndarray):
"""
Implement this method to process the data in a frame-by-frame manner.
Data available in this method:
- `self.params` - the parameters of this UDF
- `self.task_data` - task data created by `get_task_data`
- `self.results` - the result buffer instances
- `self.meta` - meta data about the current operation and data set
Parameters
----------
frame : numpy.ndarray or cupy.ndarray
A single frame or signal element from the dataset.
The shape is the same as `dataset.shape.sig`. In case of pixelated
STEM / scanning diffraction data this is 2D, for spectra 1D etc.
"""
raise NotImplementedError()
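# A minimal sketch of a frame-wise UDF (hypothetical class and buffer names),
# summing over the signal dimensions into a kind='nav' buffer:
#
#   import numpy as np
#   from libertem.udf import UDF
#
#   class FrameSumUDF(UDF):
#       def get_result_buffers(self):
#           return {'pixelsum': self.buffer(kind='nav', dtype='float32')}
#
#       def process_frame(self, frame):
#           # the 'nav' buffer view corresponds to the current frame
#           self.results.pixelsum[:] = np.sum(frame)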
@runtime_checkable
class UDFTileMixin(Protocol):
'''
Implement :code:`process_tile` for per-tile processing.
'''
def process_tile(self, tile: np.ndarray):
"""
Implement this method to process the data in a tiled manner.
Data available in this method:
- `self.params` - the parameters of this UDF
- `self.task_data` - task data created by `get_task_data`
- `self.results` - the result buffer instances
- `self.meta` - meta data about the current operation and data set
Parameters
----------
tile : numpy.ndarray or cupy.ndarray
A small number N of frames or signal elements from the dataset.
The shape is (N,) + `dataset.shape.sig`. In case of pixelated
STEM / scanning diffraction data this is 3D, for spectra 2D etc.
"""
raise NotImplementedError()
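# A minimal sketch of a tiled UDF (hypothetical class and buffer names). Note
# the use of += : a frame's signal data may be split across several tiles, so
# partial sums have to be accumulated; forbuf() keeps the assignment
# backend-safe, mirroring the pattern shown in the forbuf() docstring below:
#
#   import numpy as np
#   from libertem.udf import UDF
#
#   class TileSumUDF(UDF):
#       def get_result_buffers(self):
#           return {'intensity': self.buffer(kind='nav', dtype='float32')}
#
#       def process_tile(self, tile):
#           self.results.intensity[:] += self.forbuf(
#               np.sum(tile, axis=tuple(range(1, len(tile.shape)))),
#               self.results.intensity,
#           )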
@runtime_checkable
class UDFPartitionMixin(Protocol):
'''
Implement :code:`process_partition` for per-partition processing.
'''
def process_partition(self, partition: np.ndarray) -> None:
"""
Implement this method to process the data partitioned into large
(100s of MiB) partitions.
Data available in this method:
- `self.params` - the parameters of this UDF
- `self.task_data` - task data created by `get_task_data`
- `self.results` - the result buffer instances
- `self.meta` - meta data about the current operation and data set
Note
----
Only use this method if you know what you are doing; especially if
you are running a processing pipeline with multiple steps, or multiple
processing pipelines at the same time, performance may be adversely
impacted.
Parameters
----------
partition : numpy.ndarray or cupy.ndarray
A large number N of frames or signal elements from the dataset.
The shape is (N,) + `dataset.shape.sig`. In case of pixelated
STEM / scanning diffraction data this is 3D, for spectra 2D etc.
"""
raise NotImplementedError()
@runtime_checkable
class UDFPreprocessMixin(Protocol):
'''
Implement :code:`preprocess` to initialize the result buffers of a partition on the worker
before the partition data is processed.
.. versionadded:: 0.3.0
'''
def preprocess(self) -> None:
"""
Implement this method to preprocess the result data for a partition.
This can be useful to initialize arrays of
:code:`dtype='object'` with the correct container types, for example.
Data available in this method:
- `self.params` - the parameters of this UDF
- `self.task_data` - task data created by `get_task_data`
- `self.results` - the result buffer instances
"""
raise NotImplementedError()
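# A minimal sketch (hypothetical class and buffer names) of initializing an
# object-dtype buffer in preprocess(); only the buffer declaration and the
# preprocess step are shown, the process_* method is omitted:
#
#   from libertem.udf import UDF
#
#   class CollectUDF(UDF):
#       def get_result_buffers(self):
#           return {'collected': self.buffer(kind='nav', dtype='object')}
#
#       def preprocess(self):
#           # views are cleared before preprocess(), so this is the flat
#           # per-partition array; fill it with empty containers
#           buf = self.results.collected
#           for i in range(buf.shape[0]):
#               buf[i] = []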
@runtime_checkable
class UDFPostprocessMixin(Protocol):
'''
Implement :code:`postprocess` to modify the result buffers of a partition on the worker
after the partition data has been completely processed, but before it is returned to the
main node for the final merging step.
'''
def postprocess(self) -> None:
"""
Implement this method to postprocess the result data for a partition.
This can be useful in combination with process_tile() to implement
a postprocessing step that requires the reduced results for whole frames.
Data available in this method:
- `self.params` - the parameters of this UDF
- `self.task_data` - task data created by `get_task_data`
- `self.results` - the result buffer instances
"""
raise NotImplementedError()
@runtime_checkable
class UDFMergeAllMixin(Protocol):
def merge_all(
self, ordered_results: 'OrderedDict[Slice, MergeAttrMapping]'
) -> Mapping[str, 'nt.ArrayLike']:
"""
Combine stack of ordered partial results :code:`ordered_results` to form complete result.
Combining can be more efficient than direct merging into a result buffer
for cases where the results are not NumPy arrays.
Currently this is only applicable for the
:class:`libertem.executor.delayed.DelayedJobExecutor`
where it provides an efficient pathway to construct Dask arrays from delayed UDF results.
The input and the returned arrays are in flattened navigation dimension with ROI applied.
Data available in this method:
- :code:`self.params` - the parameters of this UDF
For UDFs with only :code:`kind='nav'` result buffers a default implementation
is used automatically.
Parameters
----------
ordered_results
Ordered dict mapping partition slice to UDF partial result
Returns
-------
dict[buffername] -> array_like
Dictionary mapping result buffer name to buffer content
Note
----
This function is running on the main node, which means `self.results`
and `self.task_data` are not available.
"""
raise NotImplementedError()
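# A minimal sketch (hypothetical buffer name) of a custom merge_all for a
# buffer the default implementation cannot handle, e.g. a kind='sig' sum that
# has to be accumulated over all partial results:
#
#   import numpy as np
#
#   def merge_all(self, ordered_results):
#       stacked = np.stack([b.intensity for b in ordered_results.values()])
#       return {'intensity': stacked.sum(axis=0)}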
def _default_merge_all(udf, ordered_results: 'OrderedDict[Slice, MergeAttrMapping]'):
if udf.requires_custom_merge_all:
raise NotImplementedError(
"Default merging only works for kind='nav' buffers. "
"Please implement a suitable custom merge_all function."
)
result_chunks = defaultdict(list)
for b in ordered_results.values():
for key in b:
result_chunks[key].append(getattr(b, key))
result = {
# checking above assures that we only have kind='nav'
# where concatenation is the correct method.
k: np.concatenate(val)
for k, val in result_chunks.items()
}
return result
class UDFBase(UDFProtocol):
'''
Base class for UDFs with helper functions.
'''
def __init__(self, *args, **kwargs) -> None:
# this should not execute - it is mostly for getting the typing right:
self.params = UDFKwargsWrapper({})
self.results = UDFData({})
def _update_parameters_patch(self, kwargs_patch):
"""
Update the parameters _for this UDF instance_ by applying the patch in
`kwargs_patch`, which is a dict containing the key/value combinations to
update.
For properly supporting AUX data (i.e. `BufferWrapper`-typed parameters),
make sure to set the correct views afterwards.
:meta private:
"""
new_kwargs = {}
new_kwargs.update(self._kwargs)
new_kwargs.update(kwargs_patch)
self.params = UDFKwargsWrapper(new_kwargs)
def get_task_data(self) -> dict[str, Any]:
raise NotImplementedError()
def get_result_buffers(self) -> dict[str, BufferWrapper]:
raise NotImplementedError()
def allocate_for_part(self, partition: Partition, roi: Optional[np.ndarray]) -> None:
for ns in [self.results]:
ns.allocate_for_part(partition, roi, lib=self.xp)
def allocate_for_full(self, dataset: DataSet, roi: Optional[np.ndarray]) -> None:
for ns in [self.params, self.results]:
ns.allocate_for_full(dataset, roi)
def set_views_for_dataset(self, dataset: DataSet) -> None:
for ns in [self.params]:
ns.set_view_for_dataset(dataset)
def set_views_for_partition(self, partition: Partition) -> None:
for ns in [self.params, self.results]:
ns.set_view_for_partition(partition)
def set_views_for_tile(self, partition: Partition, tile: DataTile) -> None:
for ns in [self.params, self.results]:
ns.set_view_for_tile(partition, tile)
def set_contiguous_views_for_tile(self, partition: Partition, tile: DataTile) -> None:
# .. versionadded:: 0.5.0
for ns in [self.params, self.results]:
ns.set_contiguous_view_for_tile(partition, tile)
def flush(self, debug: bool = False) -> None:
# .. versionadded:: 0.5.0
for ns in [self.params, self.results]:
ns.flush(debug=debug)
def set_views_for_frame(self, partition: Partition, tile: DataTile, frame_idx: int):
for ns in [self.params, self.results]:
ns.set_view_for_frame(partition, tile, frame_idx)
def clear_views(self) -> None:
for ns in [self.params, self.results]:
ns.clear_views()
def init_task_data(self) -> None:
self.task_data = UDFData(self.get_task_data())
def init_result_buffers(self, executor=None) -> None:
"""
Create the UDFData instance containing BufferWrappers
for results of this UDF. Same method used for
Dataset- and Partition-sized buffers.
The executor argument is provided on the main node
only to allow modification of Dataset-sized buffers
into more specialized forms.
#TODO use another mechanism to distinguish between
dataset- and partition-sized buffers when initializing,
such that we can modify buffers in both cases.
:meta private:
"""
self.results = UDFData(self.get_result_buffers())
if executor is not None:
for name, buffer in self.results.items():
new_buffer = executor.modify_buffer_type(buffer)
self.results.set_buffer(name, new_buffer)
def export_results(self) -> None:
# .. versionadded:: 0.6.0.dev0
self.results.export()
def set_meta(self, meta: UDFMeta) -> None:
self.meta = meta
def set_slice(self, slice_: Slice) -> None:
self.meta.slice = slice_
def set_tile_idx(self, idx: int) -> None:
self.meta.tiling_scheme_idx = idx
def set_backend(self, backend: ArrayBackend) -> None:
assert backend in _get_canonical_backends(self.get_backends())
self._backend = backend
def get_backends(self) -> BackendSpec:
raise NotImplementedError() # see impl in UDF.get_backends
@property
def xp(self):
'''
Compute back-end library to use.
Generally, use :code:`self.xp` instead of :code:`np` to use NumPy or CuPy transparently
.. versionadded:: 0.6.0
'''
# Implemented as property and not variable to avoid pickling issues
# tests/udf/test_simple_udf.py::test_udf_pickle
if self._backend in CPU_BACKENDS or self._backend == CUDA:
return np
elif self._backend in CUPY_BACKENDS:
# Re-importing should be fast, right?
# Importing only here to avoid superfluous import
import cupy
# mocking for testing without actual CUDA device
# import numpy as cupy
return cupy
else:
raise ValueError(f"Backend can be {BACKENDS}, got {self._backend}")
def get_method(self) -> UDFMethod:
"""
Return a member of the :attr:`libertem.udf.base.UDF.UDF_METHOD`
enum to indicate which processing method to use during :code:`run_udf`.
By default, this method chooses based on the process method
precedence of tile > frame > partition.
The return value is checked at runtime to ensure the requested method
is implemented on the UDF itself.
.. versionchanged:: 0.13.0
:meth:`~libertem.udf.base.UDF.get_method()` was exposed to enable
choice of processing method at runtime
Returns
-------
UDFMethod
A member of the enum :attr:`libertem.udf.base.UDF.UDF_METHOD`
"""
if isinstance(self, UDFTileMixin):
return UDFMethod.TILE
elif isinstance(self, UDFFrameMixin):
return UDFMethod.FRAME
elif isinstance(self, UDFPartitionMixin):
return UDFMethod.PARTITION
else:
raise TypeError("UDF should implement one of the `process_*` methods")
def _check_results(self, decl, arr, name):
"""
Check results in `arr` for buffer `name` for consistency with the
declaration in `decl`.
1) All returned buffers need to be declared
2) Private buffers can't be returned from `get_results`
3) The `dtype` of `arr` needs to be compatible with the declared `dtype`
"""
if name not in decl:
raise UDFException(
"buffer '%s' is not declared in `get_result_buffers` "
"(hint: `self.buffer(..., use='result_only')`" % name
)
buf_decl = decl[name]
if buf_decl.use == "private":
raise UDFException("Don't return `use='private'` buffers from `get_results`")
if np.dtype(arr.dtype).kind != np.dtype(buf_decl.dtype).kind:
raise UDFException(
"the returned ndarray '%s' has a different dtype kind (%s) "
"than declared (%s)" % (
name, arr.dtype, buf_decl.dtype
)
)
def get_results(self) -> dict[str, np.ndarray]:
raise NotImplementedError()
_default_merge_all = UDFMergeAllMixin.merge_all
def _do_merge_all(self, ordered_results: 'OrderedDict[Slice, MergeAttrMapping]'):
if isinstance(self, UDFMergeAllMixin):
results_tmp = self.merge_all(ordered_results)
else:
# This will raise NotImplementedError if preconditions for default merge
# are not met
results_tmp = _default_merge_all(self, ordered_results)
if not set(results_tmp.keys()).issubset(set(self.results.keys())):
raise ValueError(f'Returned result names from merge_all ({[*results_tmp.keys()]}) '
'are not contained within declared result buffer names '
f'({[*self.results.keys()]})')
for key, value in results_tmp.items():
# This SHOULD throw errors if something doesn't match up about
# buffer name, shape or dtype
self.results.get_buffer(key).replace_array(value)
def _do_get_results(self) -> Mapping[str, BufferWrapper]:
results_tmp = self.get_results()
decl = self.get_result_buffers()
# include any results that were not explicitly included, but have non-private `use`:
results_tmp.update({
k: getattr(self.results, k)
for k, v in decl.items()
if k not in results_tmp and v.use is None
})
results_buffer_cls = {
k: self.results.get_buffer(k).result_buffer_type()
for k in results_tmp.keys()
}
# wrap numpy results into `ResultBuffer`s:
results = {}
for name, arr in results_tmp.items():
mask = None
if isinstance(arr, ArrayWithMask):
mask = arr.mask
arr = arr.arr
self._check_results(decl, arr, name)
buf_decl = decl[name]
if mask is None:
valid_mask = self.meta.get_valid_nav_mask()
assert valid_mask is not None
mask = buf_decl.make_default_mask(
valid_nav_mask=valid_mask,
dataset_shape=self.meta.dataset_shape,
roi=self.meta.roi,
)
buf = results_buffer_cls[name](
kind=buf_decl.kind, extra_shape=buf_decl.extra_shape,
dtype=buf_decl.dtype,
data=arr,
)
buf.valid_mask = mask
buf.set_shape_ds(self.meta.dataset_shape, self.meta.roi)
results[name] = buf
return results
class UDF(UDFBase):
"""
The main user-defined functions interface. You can implement your functionality
by overriding methods on this class.
If you override `__init__`, please take care,
as it is called multiple times during evaluation of a UDF. You can handle
some pre-conditioning of parameters, but you also have to accept the results
as input again.
Arguments passed as `**kwargs` will be automatically available on `self.params`
when running the UDF.
Example
-------
>>> class MyUDF(UDF):
... def __init__(self, param1, param2="def2", **kwargs):
... param1 = int(param1)
... if "param3" not in kwargs:
... raise TypeError("missing argument param3")
... super().__init__(param1=param1, param2=param2, **kwargs)
Parameters
----------
kwargs
Input parameters. They are scattered to the worker processes and
available as `self.params` from here on.
Values can be `BufferWrapper` instances, which, when accessed via
`self.params.the_key_here`, will automatically return a view corresponding
to the current unit of data (frame, tile, partition).
"""
def __init__(self, **kwargs: UDFKwarg) -> None:
self._backend = 'numpy' # default so that self.xp can always be used
self._kwargs = kwargs
self.params = UDFKwargsWrapper(kwargs)
self.task_data = UDFData({})
self.results = UDFData({})
self._requires_custom_merge = None
self._requires_custom_merge_all = None
def copy(self) -> "UDF":
return self.__class__(**self._kwargs)
@classmethod
def new_for_partition(
cls,
kwargs: dict[str, Any],
partition: Partition,
roi: np.ndarray,
) -> "UDF":
new_instance = cls(**kwargs)
new_instance.params.new_for_partition(partition, roi)
return new_instance
def copy_for_partition(self, partition: Partition, roi: np.ndarray) -> "UDF":
"""
create a copy of the UDF, specifically slicing aux data to the
specified partition and roi
"""
return self.__class__.new_for_partition(self._kwargs, partition, roi)
def get_task_data(self) -> dict[str, Any]:
"""
Initialize per-task data.
Per-task data can be mutable. Override this function
to allocate temporary buffers, or to initialize
system resources.
If you want to distribute static data, use
parameters instead.
Data available in this method:
- `self.params` - the input parameters of this UDF
- `self.meta` - relevant metadata, see :class:`UDFMeta` documentation.
Returns
-------
dict
Flat dict with string keys. Keys should
be valid python identifiers, which allows
access via `self.task_data.the_key_here`.
"""
return {}
def get_result_buffers(self) -> dict[str, BufferWrapper]:
"""
Return result buffer declaration.
Values of the returned dict should be :class:`~libertem.common.buffers.BufferWrapper`
instances, which, when accessed via :code:`self.results.key`,
will automatically return a view corresponding to the
current unit of data (frame, tile, partition).
The values also need to be serializable via pickle.
Data available in this method:
- :code:`self.params` - the parameters of this UDF
- :code:`self.meta` - relevant metadata, see :class:`UDFMeta` documentation.
Please note that partition metadata will not be set when this method is
executed on the head node.
Returns
-------
dict
Flat dict with string keys. Keys should
be valid python identifiers, which allows
access via `self.results.the_key_here`.
"""
raise NotImplementedError()
@property
def requires_custom_merge(self):
"""
Determine if buffers with :code:`kind != 'nav'` and :code:`use !=
'result_only'` are present, in which case the default merge doesn't work
.. versionadded:: 0.5.0
.. versionchanged:: 0.12.0
Ignore :code:`use='result_only'` buffers since they are not present
in the merge function
"""
if self._requires_custom_merge is None:
buffers = self.get_result_buffers()
self._requires_custom_merge = any(
buffer.kind != 'nav' and buffer.use != 'result_only' for buffer in buffers.values()
)
return self._requires_custom_merge
@property
def requires_custom_merge_all(self):
"""
Determine if buffers with :code:`kind != 'nav'` are present, in which case
the default merging doesn't work
.. versionadded:: 0.12.0
"""
if self._requires_custom_merge_all is None:
buffers = self.get_result_buffers()
self._requires_custom_merge_all = any(
buffer.kind != 'nav' for buffer in buffers.values()
)
return self._requires_custom_merge_all
def merge(self, dest: MergeAttrMapping, src: MergeAttrMapping):
"""
Merge a partial result `src` into the current global result `dest`.
Data available in this method:
- `self.params` - the parameters of this UDF
Parameters
----------
dest
global results; you can access the ndarrays for each
buffer name (from `get_result_buffers`) by attribute access
(:code:`dest.your_buffer_name`)
src
results for a partition; you can access the ndarrays for each
buffer name (from `get_result_buffers`) by attribute access
(:code:`src.your_buffer_name`)
Note
----
This function is running on the main node, which means `self.results`
and `self.task_data` are not available.
"""
if self.requires_custom_merge:
raise NotImplementedError(
"Default merging only works for kind='nav' buffers. "
"Please implement a suitable custom merge function."
)
for k in dest:
check_cast(getattr(src, k), getattr(dest, k))
getattr(dest, k)[:] = getattr(src, k)
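# A minimal sketch (hypothetical buffer name) of a custom merge for a buffer
# where the default nav-wise copy does not apply, e.g. a kind='sig' running
# sum accumulated across partitions:
#
#   def merge(self, dest, src):
#       dest.intensity[:] += src.intensity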
def get_results(self) -> dict[str, np.ndarray]:
"""
Get results, allowing a postprocessing step on the main node after
a result has been merged. See also: :class:`UDFPostprocessMixin`.
.. versionadded:: 0.7.0
Note
----
You should return all values as numpy arrays, they will be wrapped
in `BufferWrapper` instances before they are returned to the user.
See the :ref:`udf final post processing` section in the documentation for
details and examples.
Returns
-------
results : dict
A `dict` containing the final post-processed results.
"""
for k in self.results.keys():
buf = self.results.get_buffer(k)
if buf.use == "result_only":
raise UDFException(
"don't know how to set use='result_only' buffer '%s';"
" please implement `get_results`" % k
)
decls = self.get_result_buffers()
return {
k: getattr(self.results, k)
for k in self.results.keys()
if decls[k].use != "private"
}
def get_tiling_preferences(self) -> TilingPreferences:
"""
Configure tiling preferences. Return a dictionary with the
following keys:
- "depth": number of frames/frame parts to stack on top of each other
- "total_size": total size of a tile in bytes
.. versionadded:: 0.6.0
"""
return {
"depth": UDF.TILE_DEPTH_DEFAULT,
"total_size": UDF.TILE_SIZE_MAX,
}
def get_backends(self) -> BackendSpec:
'''
Signal which computation back-ends the UDF can use.
Allowed are values in :attr:`libertem.udf.base.UDF.BACKEND_ALL`.
:code:`(UDF.BACKEND_NUMPY, )` is returned by default for CPU-based computation.
.. versionadded:: 0.6.0
.. versionchanged:: 0.11.0
Extended from just NumPy vs. CuPy to include more generic specification of backends,
including sparse arrays. See :ref:`sparse` for details! The backend specifications are
backwards-compatible with the previously supported values.
Returns
-------
backends
A single value or iterable containing values from the supported backends
in :attr:`libertem.udf.base.UDF.BACKEND_ALL`
'''
return (UDF.BACKEND_NUMPY, )
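# A minimal sketch of opting in to CuPy processing in addition to NumPy;
# self.xp then resolves to the matching array module at run time:
#
#   def get_backends(self):
#       return (self.BACKEND_NUMPY, self.BACKEND_CUPY)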
def forbuf(self, arr, target):
'''
Convert an array to a backend that is compatible with the result buffers
and reshape it as needed.
This function should be wrapped around assignment to result buffers if
the array :code:`arr` might have an incompatible array backend and/or
have a flattened sig dimension. It converts any of the supported input
backends into the array backend of the target buffer. If the argument is
already matching the buffer or is not recognized, it is a no-op.
In particular, the result of array operations on a sparse input tile
can't always be merged into dense result buffers directly since arrays
from the :mod:`sparse` package are not converted to NumPy arrays
automatically.
.. versionadded:: 0.11.0
Parameters
----------
arr : array-like
Any array-like object that is supported by the `sparseconverter`
package or allows the desired operation with the result buffer at
hand.
target : array-like
The LiberTEM result buffer that the desired operation targets.
Examples
--------
>>> def process_tile(self, tile):
... res = ... # Processing result from tile
... self.results.intensity[:] += self.forbuf(
... res,
... self.results.intensity
... )
See :ref:`sparse` for a complete example of how to use this function in the
context of a UDF!
'''
res = make_like(arr, target, strict=False)
return res
def cleanup(self) -> None: # FIXME: name? implement cleanup as context manager somehow?
pass
@staticmethod
def with_mask(data: np.ndarray, mask: Union[np.ndarray, bool]) -> ArrayWithMask:
"""
Add a mask to indicate the valid parts in a result. The mask
should be an array of bools, with a shape matching
the `data` array. The mask should have `True` in positions
that are valid, and `False` otherwise.
You can pass `mask=True` or `mask=False` as shortcuts for
all-valid or all-invalid, and also pass in masks that can
be broadcast to `data`.
This should be used in :meth:`get_results`.
See the :ref:`udf valid data masking` section in the
documentation for details and more examples.
Example
-------
>>> class SomeUDF(UDF):
... # ... other methods omitted for brevity ...
...
... def get_results(self):
... return {
... 'some_result': self.with_mask(
... data=self.results.some_result,
... mask=False,
... ),
... }
...
"""
return ArrayWithMask(data, mask=mask)
def buffer(
self,
kind: BufferKind,
extra_shape: tuple[int, ...] = (),
dtype: "nt.DTypeLike" = "float32",
where: BufferLocation = None,
use: Optional[BufferUse] = None,
) -> BufferWrapper:
'''
Use this method to create :class:`~libertem.common.buffers.BufferWrapper` objects
in :meth:`get_result_buffers`.
Parameters
----------
kind : "nav", "sig" or "single"
The abstract shape of the buffer, corresponding either to the navigation
or the signal dimensions of the dataset, or a single value.
extra_shape : optional, tuple of int or a Shape object
You can specify additional dimensions for your data. For example, if
you want to store 2D coords, you would specify :code:`(2,)` here.
If this is specified as a Shape object, it is converted to a tuple first.
dtype : string or numpy dtype
The dtype of this buffer
where : string or None
:code:`None` means NumPy array, specify :code:`'device'` to use a device buffer
(for example on a CUDA device)
.. versionadded:: 0.6.0
use : "private", "result_only" or None
If you specify :code:`"private"` here, the result will only be made available
to internal functions, like :meth:`process_frame`, :meth:`merge` or
:meth:`get_results`. It will not be available to the user of the UDF, which means
you can use this to hide implementation details that are likely to change later.
Specify :code:`"result_only"` here if the buffer is only used in :meth:`get_results`,
this means we don't have to allocate and return it on the workers without actually
needing it.
:code:`None` means the buffer is used both as a final and intermediate result.
.. versionadded:: 0.7.0
'''
if use is not None and use.lower() == "result_only":
return PlaceholderBufferWrapper(kind, extra_shape, dtype, use=use)
return BufferWrapper(kind, extra_shape, dtype, where, use=use)
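# A minimal sketch (hypothetical buffer names) of declaring buffers with this
# method inside get_result_buffers():
#
#   def get_result_buffers(self):
#       return {
#           'coords': self.buffer(kind='nav', extra_shape=(2,), dtype='float32'),
#           'total': self.buffer(kind='single', dtype='float64'),
#           'scratch': self.buffer(kind='sig', dtype='float32', use='private'),
#       }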
@classmethod
def aux_data(cls, data, kind, extra_shape=(), dtype="float32"):
"""
Use this method to create auxiliary data. Auxiliary data should
have a shape like `(dataset.shape.nav, extra_shape)` and on access,
an appropriate view will be created. For example, if you access
aux data in `process_frame`, you will get the auxiliary data for
the current frame you are processing.
Example
-------
We create a UDF to demonstrate the behavior:
>>> class MyUDF(UDF):
... def get_result_buffers(self):
... # Result buffer for debug output
... return {'aux_dump': self.buffer(kind='nav', dtype='object')}
...
... def process_frame(self, frame):
... # Extract value of aux data for demonstration
... self.results.aux_dump[:] = str(self.params.aux_data[:])
...
>>> # for each frame, provide three values from a sequential series:
>>> aux1 = MyUDF.aux_data(
... data=np.arange(np.prod(dataset.shape.nav) * 3, dtype=np.float32),
... kind="nav", extra_shape=(3,), dtype="float32"
... )
>>> udf = MyUDF(aux_data=aux1)
>>> res = ctx.run_udf(dataset=dataset, udf=udf)
process_frame for frame (0, 7) received a view of aux_data with values [21., 22., 23.]:
>>> res['aux_dump'].data[0, 7]
'[21. 22. 23.]'
"""
buf = AuxBufferWrapper(kind, extra_shape, dtype)
buf.set_buffer(data)
return buf
class NoOpUDF(UDF):
'''
A UDF that does nothing and returns nothing.
This is useful for testing.
Parameters
----------
preferred_input_dtype : numpy.dtype
Perform dtype conversion. By default, this is :attr:`UDF.USE_NATIVE_DTYPE`.
'''
def __init__(self, preferred_input_dtype=UDF.USE_NATIVE_DTYPE) -> None:
super().__init__(preferred_input_dtype=preferred_input_dtype)
def process_tile(self, tile):
'''
Do nothing.
'''
pass
def get_result_buffers(self):
'''
No result buffers.
'''
return {}
def check_cast(fromvar, tovar):
if not np.can_cast(fromvar.dtype, tovar.dtype, casting='safe'):
# FIXME exception or warning?
raise TypeError(f"Unsafe automatic casting from {fromvar.dtype} to {tovar.dtype}")
class UDFParams:
def __init__(
self,
kwargs: list[dict],
roi: Optional[np.ndarray],
corrections: Optional[CorrectionSet],
tiling_scheme: TilingScheme,
):
"""
Container class for UDF parameters for multiple UDFs
Parameters
----------
kwargs : List[dict]
List of kwargs which can be used to re-create the UDF instances
that were passed to `run_for_dataset`
roi
Boolean array to select parts of the navigation space
corrections
Corrections to apply
"""
self._kwargs = kwargs
self._roi = roi
self._corrections = corrections
self._tiling_scheme = tiling_scheme
@classmethod
def from_udfs(
cls,
udfs: Iterable[UDF],
roi: Optional[np.ndarray],
corrections: Optional[CorrectionSet],
tiling_scheme: TilingScheme,
):
kwargs = [udf._kwargs for udf in udfs]
return cls(kwargs, roi, corrections, tiling_scheme)
def patch(self, patch):
for p, kwargs in zip(patch, self._kwargs):
kwargs.update(p)
@property
def roi(self):
return self._roi
@property
def corrections(self):
return self._corrections
@property
def kwargs(self):
return self._kwargs
@property
def tiling_scheme(self):
return self._tiling_scheme
class Task:
"""
A computation on a partition. Inherit from this class and implement ``__call__``
for your specific computation.
.. versionchanged:: 0.4.0
Moved from libertem.job.base to libertem.udf.base as part of Job API deprecation
"""
def __init__(self, partition: Partition, idx: int, span_context: "SpanContext"):
self.partition = partition
self.idx = idx
self._span_context = span_context
self._progress = False
def get_partition(self):
return self.partition
def get_locations(self):
return self.partition.get_locations()
def get_resources(self) -> ResourceDef:
'''
Specify the resources that a Task will use.
The resources are designed to work with resource tags in Dask clusters:
See https://distributed.dask.org/en/latest/resources.html
The resources allow scheduling of CPU-only compute, CUDA-only compute,
(CPU or CUDA) hybrid compute, and service tasks in such a way that all
resources are used without oversubscription. Furthermore, they
distinguish if the given resource can be accessed with a transparent
NumPy ndarray interface -- namely if CuPy is installed to access CUDA
resources.
Each CPU worker gets one CPU, one compute and one ndarray resource
assigned. Each CUDA worker gets one CUDA and one compute resource
assigned. If CuPy is installed, it additionally gets an ndarray resource
assigned. A service worker doesn't get any resources assigned.
A CPU-only task consumes one CPU, one ndarray and one compute resource,
i.e. it will be scheduled only on CPU workers. A CUDA-only task consumes
one CUDA, one compute and possibly an ndarray resource, i.e. it will
only be scheduled on CUDA workers. A hybrid task that can run on both
CPU or CUDA using a transparent ndarray interface only consumes a
compute and an ndarray resource, i.e. it can be scheduled on CPU workers
or CUDA workers with CuPy. A service task doesn't request any resources
and can therefore run on CPU, CUDA and service workers.
Which device a hybrid task uses is only decided at runtime by the
environment variables that are set on each worker process.
That way, CPU-only and CUDA-only tasks can run in parallel without
oversubscription, and hybrid tasks use whatever compute workers are free
at the time. Furthermore, it allows using CUDA without installing CuPy.
See https://github.com/LiberTEM/LiberTEM/pull/760 for detailed
discussion.
Compute tasks will not run on service workers, so that these can serve
shorter service tasks with lower latency.
At the moment, workers only get a single "item" of each resource
assigned since we run one process per CPU. Requesting more of a resource
than any of the workers has causes a :class:`RuntimeError`, including
requesting a resource that is not available at all.
For that reason one has to make sure that the workers are set up with
the correct resources and matching environment.
:meth:`libertem.utils.devices.detect`,
:meth:`libertem.executor.dask.cluster_spec` and
:meth:`libertem.executor.dask.DaskJobExecutor.make_local` can be used to
set up a local cluster correctly.
.. versionadded:: 0.6.0
'''
# default: run only on CPU and do computation
# No CUDA support for the deprecated Job interface
return {'CPU': 1, 'compute': 1, 'ndarray': 1}
def get_tracing_span_context(self) -> "SpanContext":
return self._span_context
def __call__(self, params: UDFParams, env: Environment):
raise NotImplementedError()
def report_progress(self):
self._progress = True
def _get_canonical_backends(backends: Optional[BackendSpec]) -> Sequence[ArrayBackend]:
"""
Convert from either an iterable of backends or a simple string form into a
canonical form of Sequence[str]
"""
if backends is None:
return ()
if isinstance(backends, str):
backends = (backends,)
return tuple(backends)
def get_resources_for_backends(
udf_backends: list[BackendSpec],
user_backends: Optional[BackendSpec]
) -> ResourceDef:
"""
Find the resource definition that is appropriate for the backends specified
by the UDFs and the user. This is the intersection of the backends specified
by each UDF and the backends selected by the user.
Parameters
----------
udf_backends
The backends specified by each UDF that we want to run
user_backends
The backends specified by the user
See :meth:`Task.get_resources` for details.
"""
udf_backends_canonical = [
_get_canonical_backends(bs)
for bs in udf_backends
]
user_backends_canonical = _get_canonical_backends(user_backends)
needs_cuda = 0
needs_cpu = 0
needs_ndarray = 0
# Limit to externally specified backends
for backend_set in udf_backends_canonical:
if user_backends_canonical:
backends = set(user_backends_canonical).intersection(backend_set)
else:
backends = set(backend_set)
needs_cuda += backends.isdisjoint(CPU_BACKENDS)
needs_cpu += backends.isdisjoint(CUDA_BACKENDS)
needs_ndarray += CUDA not in backends
if needs_cuda and needs_cpu:
raise ValueError(
"There is no common supported UDF backend (have: %r, limited to %r)"
% (udf_backends, user_backends_canonical)
)
result: ResourceDef = {'compute': 1}
if needs_cpu:
result['CPU'] = 1
if needs_cuda:
result['CUDA'] = 1
if needs_ndarray:
result['ndarray'] = 1
return result
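# Illustrative example (values follow from the rules above): a hybrid UDF
# declaring ('cupy', 'numpy') combined with a NumPy-only UDF yields
#
#   get_resources_for_backends([('cupy', 'numpy'), ('numpy',)], user_backends=None)
#   # -> {'compute': 1, 'CPU': 1, 'ndarray': 1}
#
# because the NumPy-only UDF forces scheduling on a CPU worker with an
# ndarray interface.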
class UDFTask(Task):
def __init__(
self,
partition: Partition,
idx: int,
udf_classes: list[type[UDF]],
udf_backends: list[BackendSpec],
user_backends: Optional[BackendSpec],
runner_cls: type['UDFPartRunner'],
span_context: "SpanContext",
task_frames: int,
):
"""
A computation for a single partition. The parameters that stay the same
for the whole dataset are excluded here and supplied by the executor in
__call__.
Parameters
----------
partition : Partition
The partition to work on
idx : int
the index of the task, used to identify results (?)
udf_classes : List[Type[UDF]]
The UDFs to run
udf_backends : List[BackendSpec]
The backends supported by each of the UDFs to run
user_backends : Optional[BackendSpec]
The backends selected by the user, if any
task_frames : int
The number of frames this task must process (ROI included)
"""
super().__init__(partition=partition, idx=idx, span_context=span_context)
self._udf_classes = udf_classes
self._udf_backends = udf_backends
self._user_backends = user_backends
self._runner_cls = runner_cls
self._task_frames = task_frames
def __call__(self, params: UDFParams, env: Environment) -> tuple[UDFData, ...]:
with self._propagate_tracing(), tracer.start_as_current_span("UDFTask.__call__"):
udfs = [
cls.new_for_partition(kwargs, self.partition, params.roi)
for cls, kwargs in zip(self._udf_classes, params.kwargs)
]
return self._runner_cls(udfs, progress=self._progress).run_for_partition(
self.partition, params, env, self._user_backends,
)
@contextmanager
def _propagate_tracing(self):
"""
Initialize the current tracing context from the :code:`SpanContext`
given in `self._span_context`, if we don't already have a running
tracing context.
"""
if opentelemetry_context.get_current():
yield
else:
with attach_to_parent(self._span_context):
yield
def get_runner_cls(self) -> type["UDFPartRunner"]:
return self._runner_cls
def get_udf_classes(self) -> list[type["UDF"]]:
return self._udf_classes
def get_resources(self) -> ResourceDef:
"""
Intersection of resources of all UDFs, throws if empty.
See docstring of super class for details.
"""
return get_resources_for_backends(self._udf_backends, user_backends=self._user_backends)
def __repr__(self):
return f"<UDFTask {self._udf_classes!r}>"
@property
def task_frames(self) -> int:
"""
The number of frames to be processed in this task
(accounting for ROI)
"""
return self._task_frames
class UDFPartRunner:
def __init__(self, udfs: list[UDF], debug: bool = False, progress: bool = False):
self._udfs = udfs
self._debug = debug
self._progress = progress
def run_for_partition(
self,
partition: Partition,
params: UDFParams,
env: Environment,
backend_choice: BackendSpec
) -> tuple[UDFData, ...]:
roi = params.roi
corrections = params.corrections
if backend_choice is None:
backend_choice = BACKENDS
backend_choice = frozenset(_get_canonical_backends(backend_choice))
with env.enter():
try:
previous_id = None
device_class = get_device_class()
if device_class == 'cpu':
available_backends = backend_choice.intersection(CPU_BACKENDS)
elif device_class == 'cuda':
if has_cupy():
available_backends = backend_choice.intersection(BACKENDS)
else:
available_backends = backend_choice.intersection(
CPU_BACKENDS.union((CUDA, ))
)
else:
raise RuntimeError(f"Unknown device class {device_class}.")
# This will raise an exception if unavailable backends would be used
ds_backend, execution_plan = _execution_plan(
udfs=self._udfs,
ds=partition.meta,
device_class=device_class,
available_backends=available_backends,
)
if CUPY_BACKENDS.intersection(execution_plan.keys()):
# Avoid importing if not used
import cupy
device = get_use_cuda()
previous_id = cupy.cuda.Device().id
cupy.cuda.Device(device).use()
dtype = self._init_udfs(
execution_plan, partition, roi, corrections, device_class, env,
params.tiling_scheme,
)
partition.set_corrections(corrections)
if env.worker_context is not None:
partition.set_worker_context(env.worker_context)
self._run_udfs(
ds_backend, execution_plan, partition, params.tiling_scheme, roi, dtype
)
self._wrapup_udfs(partition)
finally:
if previous_id is not None:
cupy.cuda.Device(previous_id).use()
# Make sure results are in the same order as the UDFs
return tuple(udf.results for udf in self._udfs)
def _run_udfs(
self,
ds_backend: ArrayBackend,
execution_plan: ExecutionPlan,
partition: Partition,
tiling_scheme: TilingScheme,
roi: Optional[np.ndarray],
dtype,
) -> None:
# FIXME pass information on target location (numpy or cupy)
# to the dataset so that it can already move it there.
# In the future, it might even decode data on the device instead of CPU
tiles = partition.get_tiles(
tiling_scheme=tiling_scheme,
roi=roi, dest_dtype=dtype,
array_backend=ds_backend,
)
# type explicitly to help mypy
partition_progress: Union[PartitionProgressTracker, PartitionTrackerNoOp]
if self._progress:
partition_progress = PartitionProgressTracker(partition)
else:
partition_progress = PartitionTrackerNoOp()
partition_progress.signal_start()
# Save the UDF process_* methods to avoid repeated isinstance() checks.
# In the future this could be computed along with the execution_plan,
# or better still computed on the main node to ensure consistency
# with the tiling scheme.
udf_methods = {
backend: tuple(udf.get_method() for udf in udfs)
for backend, udfs in execution_plan.items()
}
for backend, udfs in execution_plan.items():
for udf in udfs:
udf.meta.set_valid_nav_mask(None)
try:
for tile in tiles:
converter = TileConverter(tile)
for backend, udfs in execution_plan.items():
methods = udf_methods[backend]
device_tile = converter.get(backend)
self._run_tile(
zip(udfs, methods), partition, tile, backend, device_tile, roi=roi
)
partition_progress.signal_tile_complete(tile)
except AttributeError as e:
removed = {
'tile_slice': 'self.meta.slice',
'scheme_idx': 'self.meta.tiling_scheme_idx',
}
for r, repl in removed.items():
if r in e.args[0]:
raise AttributeError(
f'Attribute {r} for input tiles was removed. Please use {repl} instead.'
)
raise
# We could signal partition completion here, but this is already
# handled on the main node via ProgressManager.finalize_task(task)
# as it is the fallback method for executors that don't support comms
# partition_progress.signal_complete()
def _init_udfs(
self,
execution_plan: ExecutionPlan,
partition: Partition,
roi: Optional[np.ndarray],
corrections: CorrectionSet,
device_class,
env: Environment,
tiling_scheme: TilingScheme,
) -> "nt.DTypeLike":
dtype = _get_dtype(self._udfs, partition.dtype, corrections, execution_plan.keys())
for backend, udfs in execution_plan.items():
pslice_for_roi = partition.slice.adjust_for_roi(roi)
meta = UDFMeta(
partition_slice=pslice_for_roi,
dataset_shape=partition.meta.shape,
roi=roi,
dataset_dtype=partition.dtype,
input_dtype=dtype,
tiling_scheme=tiling_scheme,
corrections=corrections,
device_class=device_class,
threads_per_worker=env.threads_per_worker,
array_backend=backend,
)
for udf in udfs:
udf.set_backend(backend)
udf.set_meta(meta)
# validate that get_method returns a valid method type
# the presence of the method was checked on the main node
if udf.get_method() not in tuple(UDFMethod): # pragma: no cover
# Should never be reached
raise UDFException('UDF.get_method() returned an unrecognized method')
udf.set_slice(pslice_for_roi)
udf.init_result_buffers()
udf.allocate_for_part(partition, roi)
udf.init_task_data()
if isinstance(udf, UDFPreprocessMixin):
udf.clear_views()
udf.preprocess()
return dtype
def _run_tile(
self,
udfs_and_methods: Iterable[
tuple[
UDF,
UDFMethod,
]
],
partition: Partition,
tile: DataTile,
array_backend: ArrayBackend,
device_tile: Any,
roi: Optional[np.ndarray],
) -> None:
for udf, udf_method in udfs_and_methods:
if udf_method == UDFMethod.TILE:
udf.set_contiguous_views_for_tile(partition, tile)
udf.set_slice(tile.tile_slice)
udf.set_tile_idx(tile.scheme_idx)
udf_tile = typing.cast(UDFTileMixin, udf)
udf_tile.process_tile(device_tile)
elif udf_method == UDFMethod.FRAME:
tile_slice = tile.tile_slice
udf_frame = typing.cast(UDFFrameMixin, udf)
for frame_idx, frame in enumerate(device_tile):
frame_slice = Slice(
origin=(tile_slice.origin[0] + frame_idx,) + tile_slice.origin[1:],
shape=Shape((1,) + tuple(tile_slice.shape)[1:],
sig_dims=tile_slice.shape.sig.dims),
)
# Internal checks for dataset consistency
assert frame_slice.shape[0] == 1
# written as `assert ... or True` so the check can be disabled
# in a production run for performance
if array_backend in ND_BACKENDS:
# The sig dimension is squeezed out
assert check_shape(frame, frame_slice.shape[1:]) or True
elif array_backend in D2_BACKENDS:
# The matrix backend of the 2D arrays adds a sig dimension of 1
assert check_shape(frame, frame_slice.shape) or True
udf.set_slice(frame_slice)
udf.set_views_for_frame(partition, tile, frame_idx)
udf_frame.process_frame(frame)
elif udf_method == UDFMethod.PARTITION:
# Internal checks for dataset consistency
assert partition.slice.adjust_for_roi(roi) == tile.tile_slice
udf.set_views_for_tile(partition, tile)
udf.set_slice(tile.tile_slice)
udf_partition = typing.cast(UDFPartitionMixin, udf)
udf_partition.process_partition(device_tile)
else: # pragma: no cover
# Should never be reached!
raise UDFException('UDF.get_method() returned an unrecognized '
'method during _run_udfs')
def _wrapup_udfs(
self,
partition: Partition
) -> None:
for udf in self._udfs:
udf.flush(self._debug)
if isinstance(udf, UDFPostprocessMixin):
udf.clear_views()
udf.postprocess()
udf.cleanup()
udf.clear_views()
udf.export_results()
if self._debug:
try:
cloudpickle.loads(cloudpickle.dumps(partition))
except TypeError:
raise TypeError("could not pickle partition")
try:
cloudpickle.loads(cloudpickle.dumps(
[u.results for u in self._udfs]
))
except TypeError:
raise TypeError("could not pickle results")
class UDFRunner:
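"""
Run a set of UDFs on a DataSet via a JobExecutor: prepare tasks and
parameters, dispatch them to the executor and merge the partial results
from each partition into the final result buffers.
"""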
@staticmethod
def _apply_part_result(
udfs: Iterable[UDF],
damage: BufferWrapper,
part_results: tuple[UDFData, ...],
task: TaskProtocol,
):
raw_damage = damage.raw_data
assert raw_damage is not None
for results, udf in zip(part_results, udfs):
udf.meta.set_valid_nav_mask(raw_damage)
udf.set_views_for_partition(task.get_partition())
udf.merge(
dest=udf.results.get_proxy(),
src=results.get_proxy()
)
v = damage.get_view_for_partition(task.get_partition())
v[:] = True
@staticmethod
def _make_udf_result(udfs: Iterable[UDF], damage: BufferWrapper) -> "UDFResults":
raw_damage = damage.raw_data
assert raw_damage is not None
for udf in udfs:
udf.meta.set_valid_nav_mask(raw_damage)
udf.clear_views()
return UDFResults(
buffers=tuple(
udf._do_get_results()
for udf in udfs
),
damage=damage
)
def __init__(self, udfs: list[UDF], debug: bool = False,
progress_reporter: Optional['ProgressReporter'] = None):
self._udfs = udfs
self._debug = debug
self._pool = TracedThreadPoolExecutor(tracer, max_workers=4)
self._progress_reporter = progress_reporter
@classmethod
def inspect_udf(
cls,
udf: UDF,
dataset: DataSet,
roi: Optional[np.ndarray] = None,
) -> dict[str, BufferWrapper]:
"""
Return result buffer declarations for a given UDF/DataSet/roi combination
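
A minimal usage sketch (assumes an already-opened :code:`dataset`;
:class:`~libertem.udf.sum.SumUDF` stands in for any UDF):

.. code-block:: python

    from libertem.udf.sum import SumUDF

    buffers = UDFRunner.inspect_udf(udf=SumUDF(), dataset=dataset)
    # maps buffer names to their BufferWrapper declarations,
    # without actually running the UDF
    for name, buf in buffers.items():
        print(name, buf)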
"""
ds_backend, execution_plan = _execution_plan(
udfs=(udf, ),
ds=dataset,
device_class=None,
# For the sake of dtype determination we'll assume all backends are available.
# On the workers some might be missing. The cupyx.scipy.sparse backends might
# trigger an up-conversion to float that may or may not happen in reality.
# By being inclusive with backends here we make sure that the result buffers
# can hold whatever the workers may be sending.
available_backends=BACKENDS,
)
dtype = _get_dtype(
[udf],
dataset.dtype,
corrections=None,
array_backends=execution_plan.keys(),
)
meta = UDFMeta(
partition_slice=None,
dataset_shape=dataset.shape,
roi=None,
dataset_dtype=dataset.dtype,
input_dtype=dtype,
corrections=None,
)
udf = udf.copy()
udf.set_meta(meta)
buffers = udf.get_result_buffers()
for buf in buffers.values():
buf.set_shape_ds(dataset.shape, roi)
return buffers
@classmethod
def dry_run(cls, udfs, dataset, roi=None):
"""
Return result buffers for a given UDF/DataSet/roi combination,
exactly as running the UDFs would produce them, just skipping the
execution and merging of the processing tasks.
This can be used to create an empty result to initialize live plots
before running a UDF.
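
A minimal usage sketch (assumes an already-opened :code:`dataset`;
:class:`~libertem.udf.sum.SumUDF` stands in for any UDF):

.. code-block:: python

    from libertem.udf.sum import SumUDF

    empty_res = UDFRunner.dry_run([SumUDF()], dataset)
    # empty_res has the same buffer structure as a real run would
    # produce and can be used to initialize a live plot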
"""
with tracer.start_as_current_span("UDFRunner.dry_run"):
runner = cls(udfs)
executor = InlineJobExecutor()
res = runner.run_for_dataset(
dataset=dataset,
executor=executor,
roi=roi,
dry=True
)
return res
def _debug_task_pickling(self, tasks: list[UDFTask]) -> None:
if self._debug:
cloudpickle.loads(cloudpickle.dumps(tasks))
def _check_preconditions(self, dataset: DataSet, roi: Optional[np.ndarray]) -> None:
if roi is not None and prod(roi.shape) != prod(dataset.shape.nav):
raise ValueError(
"roi: incompatible shapes: {} (roi) vs {} (dataset)".format(
roi.shape, dataset.shape.nav
)
)
def _prepare_run_for_dataset(
self,
dataset: DataSet,
executor: JobExecutor,
roi: Optional[np.ndarray],
corrections: Optional[CorrectionSet],
backends: Optional[BackendSpec],
dry: bool,
) -> tuple[list[UDFTask], UDFParams]:
self._check_preconditions(dataset, roi)
if backends is None:
backends = BACKENDS
backends = _get_canonical_backends(backends)
ds_backend, execution_plan = _execution_plan(
udfs=self._udfs,
ds=dataset,
# To include all possible backends without preference for device class
device_class=None,
# For the sake of dtype determination we'll assume all backends are available.
# On the workers some might be missing. The cupyx.scipy.sparse backends might
# trigger an up-conversion to float that may or may not happen in reality.
# By being inclusive with backends here we make sure that the result buffers
# can hold whatever the workers may be sending.
available_backends=backends,
)
dtype = _get_dtype(self._udfs, dataset.dtype, corrections, execution_plan.keys())
meta = UDFMeta(
partition_slice=None,
dataset_shape=dataset.shape,
roi=roi,
dataset_dtype=dataset.dtype,
input_dtype=dtype,
corrections=corrections,
# We don't specify the dataset input backend here since
# that will be determined dynamically on the workers.
)
for udf in self._udfs:
udf.set_meta(meta)
# Ideally the UDF would know the set of workers it might run on here,
# e.g. their backend information, in order to choose the best method;
# as-is, the backend will always be numpy by default.
udf_method = udf.get_method()
if udf_method not in tuple(UDFMethod):
raise UDFException('UDF.get_method() returned unrecognized value')
elif udf_method == UDFMethod.TILE and not isinstance(udf, UDFTileMixin):
raise UDFException(f'UDF declared method {udf_method.value} but '
'does not implement process_tile.')
elif udf_method == UDFMethod.FRAME and not isinstance(udf, UDFFrameMixin):
raise UDFException(f'UDF declared method {udf_method.value} but '
'does not implement process_frame.')
elif udf_method == UDFMethod.PARTITION and not isinstance(udf, UDFPartitionMixin):
raise UDFException(f'UDF declared method {udf_method.value} but '
'does not implement process_partition.')
udf.init_result_buffers(executor=executor)
udf.allocate_for_full(dataset, roi)
if isinstance(udf, UDFPreprocessMixin):
udf.set_views_for_dataset(dataset)
udf.preprocess()
with tracer.start_as_current_span('tileshape negotiation'):
neg = Negotiator()
# FIXME: take the compute backend into consideration as well, plus
# other boundary conditions when moving input data to the device
# FIXME: this uses the first partition's shape as an approximation
partition = next(dataset.get_partitions())
tiling_scheme = neg.get_scheme(
udfs=self._udfs,
approx_partition_shape=partition.shape,
dataset=dataset,
read_dtype=meta.input_dtype,
roi=roi,
corrections=corrections,
)
params = UDFParams.from_udfs(
udfs=self._udfs,
roi=roi,
corrections=corrections,
tiling_scheme=tiling_scheme,
)
if dry:
tasks = []
else:
tasks = list(self._make_udf_tasks(dataset, roi, backends))
return (tasks, params)
def run_for_dataset(
self,
dataset: DataSet,
executor: JobExecutor,
roi: Optional[np.ndarray] = None,
progress: bool = False,
corrections: Optional[CorrectionSet] = None,
backends: Optional[BackendSpec] = None,
dry: bool = False
) -> "UDFResults":
with tracer.start_as_current_span("UDFRunner.run_for_dataset"):
for res in self.run_for_dataset_sync(
dataset=dataset,
executor=executor.ensure_sync(),
roi=roi,
progress=progress,
corrections=corrections,
backends=backends,
dry=dry,
iterate=False
):
pass
return res # type: ignore
def results_for_dataset_sync(
self,
dataset: DataSet,
executor: JobExecutor,
roi: Optional[np.ndarray] = None,
progress: bool = False,
corrections: Optional[CorrectionSet] = None,
backends: Optional[BackendSpec] = None,
dry: bool = False,
) -> tuple[
Iterable[tuple[tuple[UDFData, ...], TaskProtocol]],
Any,
UDFParams
]:
with tracer.start_as_current_span("_prepare_run_for_dataset"):
tasks, params = self._prepare_run_for_dataset(
dataset, executor, roi, corrections, backends, dry
)
with tracer.start_as_current_span("before inner work"):
cancel_id = str(uuid.uuid4())
self._debug_task_pickling(tasks)
executor = executor.ensure_sync()
if dry:
task_comm_handler: TaskCommHandler = NoopCommHandler()
else:
task_comm_handler = dataset.get_task_comm_handler()
def _inner():
pman = None
try:
if progress and tasks:
pman = ProgressManager(tasks, cancel_id, reporter=self._progress_reporter)
pman.connect(task_comm_handler)
for task in tasks:
task.report_progress()
with executor.scatter(params) as params_handle:
# XXX yuck, refactor this?
yield params_handle
if tasks:
for res in executor.run_tasks(
tasks,
params_handle,
cancel_id,
task_comm_handler,
):
if progress:
pman.finalize_task(res[1])
yield res
finally:
if progress and tasks and pman is not None:
pman.close()
it = _inner()
params_handle = next(it)
return it, params_handle, params
def run_for_dataset_sync(
self,
dataset: DataSet,
executor: JobExecutor,
roi: Optional[np.ndarray] = None,
progress: bool = False,
corrections: Optional[CorrectionSet] = None,
backends: Optional[BackendSpec] = None,
dry: bool = False,
iterate: bool = True
) -> ResultsForDataSet:
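"""
Run the UDFs on :code:`dataset` using :code:`executor` and return a
:class:`ResultsForDataSet` generator wrapper. With :code:`iterate=True`
an intermediate :class:`UDFResults` is yielded after each merged
partition; the last item yielded always reflects the complete result.
"""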
executor = executor.ensure_sync()
result_iter, params_handle, params = self.results_for_dataset_sync(
dataset=dataset,
executor=executor,
roi=roi,
progress=progress,
corrections=corrections,
backends=backends,
dry=dry,
)
damage = BufferWrapper(kind='nav', dtype=bool)
damage.set_shape_ds(dataset.shape, roi)
damage.allocate()
def _inner():
num_results = 0
try:
for part_results, task in result_iter:
num_results += 1
with tracer.start_as_current_span("_apply_part_result -> UDF.merge"):
self._apply_part_result(
udfs=self._udfs,
damage=damage,
part_results=part_results,
task=task
)
if iterate:
yield self._make_udf_result(
udfs=self._udfs,
damage=damage
)
if num_results == 0 or not iterate:
yield self._make_udf_result(
udfs=self._udfs,
damage=damage
)
except JobCancelledError:
raise UDFRunCancelled(f"UDF run cancelled after {num_results} partitions")
gen = _inner()
return ResultsForDataSet(
gen=gen,
params_handle=params_handle,
params=params,
executor=executor,
udfs=self._udfs,
)
async def run_for_dataset_async(
self,
dataset: DataSet,
executor: JobExecutor,
cancel_id,
roi: Optional[np.ndarray] = None,
corrections: Optional[CorrectionSet] = None,
backends: Optional[BackendSpec] = None,
progress: bool = False,
dry: bool = False,
iterate: bool = True,
) -> AsyncGenerator["UDFResults", None]:
with tracer.start_as_current_span("UDFRunner.run_for_dataset_async"):
gen = self.run_for_dataset_sync(
dataset=dataset,
executor=executor.ensure_sync(),
roi=roi,
progress=progress,
corrections=corrections,
backends=backends,
dry=dry,
iterate=iterate,
)
async for res in async_generator_eager(gen, pool=self._pool):
yield res
def _roi_for_partition(self, roi: np.ndarray, partition: Partition) -> np.ndarray:
return roi_for_partition(roi, partition)
def _make_udf_tasks(
self,
dataset: DataSet,
roi: Optional[np.ndarray],
backends: Optional[BackendSpec]
) -> Generator[UDFTask, None, None]:
udf_backends = [
udf.get_backends()
for udf in self._udfs
]
span = trace.get_current_span()
span_context = span.get_span_context()
for idx, partition in enumerate(dataset.get_partitions()):
partition.set_idx(idx)
num_frames = partition.get_frame_count(roi)
if not num_frames:
# roi is empty for this partition or zero-length, ignore
continue
udf_classes = [
udf.__class__
for udf in self._udfs
]
task = UDFTask(
partition=partition, idx=idx, udf_classes=udf_classes,
udf_backends=udf_backends,
user_backends=backends,
runner_cls=self.get_part_runner_cls(),
span_context=span_context,
task_frames=num_frames,
)
yield task
@classmethod
def get_part_runner_cls(cls) -> type['UDFPartRunner']:
return UDFPartRunner
UDFResultDict = Mapping[str, BufferWrapper]
class UDFResults:
'''
Container class to combine UDF results with additional information.
This class allows additional information from UDF execution to be
returned together with the UDF result buffers. It is currently used to pass
"damage" information when running UDFs as an iterator using
:meth:`libertem.api.Context.run_udf_iter`. "Damage" is
a map of the nav space that is set to :code:`True`
for all positions that have already been processed.
.. versionadded:: 0.7.0
Parameters
----------
buffers
Iterable containing the result buffer dictionaries for each of the UDFs being executed
damage : BufferWrapper
:class:`libertem.common.buffers.BufferWrapper` of :code:`kind='nav'`, :code:`dtype=bool`.
It is set to :code:`True` for all positions in nav space that have been processed already.
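
A minimal usage sketch (assumes an existing :class:`libertem.api.Context`
:code:`ctx`, an opened :code:`dataset` and a UDF instance :code:`udf`):

.. code-block:: python

    for res in ctx.run_udf_iter(dataset=dataset, udf=udf):
        # res.buffers holds one result dict per UDF,
        # res.damage marks the nav positions processed so far
        valid_positions = res.damage.data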
'''
def __init__(self, buffers: Iterable[UDFResultDict], damage: BufferWrapper):
self.buffers = tuple(buffers)
self.damage = damage