from collections import defaultdict
from contextlib import contextmanager
from typing import (
Any, AsyncGenerator, Dict, Generator, Iterator, Mapping, Optional, List,
Tuple, Type, Iterable, TypeVar, Union, Set, TYPE_CHECKING
)
from typing_extensions import Protocol, runtime_checkable, Literal
import warnings
import logging
import uuid
import cloudpickle
import numpy as np
from opentelemetry import trace, context as opentelemetry_context
from libertem.common.tracing import attach_to_parent, TracedThreadPoolExecutor
from libertem.common.progress import ProgressManager, PartitionProgressTracker, PartitionTrackerNoOp
from libertem.io.dataset.base.tiling import DataTile
from libertem.warnings import UseDiscouragedWarning
from libertem.exceptions import UDFException
from libertem.common.buffers import (
BufferWrapper, AuxBufferWrapper, PlaceholderBufferWrapper,
BufferKind, BufferUse, BufferLocation,
)
from libertem.common import Shape, Slice
from libertem.common.udf import TilingPreferences, UDFProtocol
from libertem.common.math import prod
from libertem.io.dataset.base import (
TilingScheme, Negotiator, Partition, DataSet, get_coordinates
)
from libertem.io.corrections import CorrectionSet
from libertem.io.dataset.base.roi import roi_for_partition
from libertem.common.backend import get_use_cuda, get_device_class
from libertem.common.async_utils import async_generator_eager
from libertem.executor.inline import InlineJobExecutor
from libertem.common.executor import (
Environment, JobExecutor, TaskCommHandler, NoopCommHandler, TaskProtocol,
)
if TYPE_CHECKING:
from typing import OrderedDict
from numpy import typing as nt
from opentelemetry.trace import SpanContext
from libertem.common.progress import ProgressReporter
tracer = trace.get_tracer(__name__)
log = logging.getLogger(__name__)
Backend = Literal['numpy', 'cuda', 'cupy']
BackendSpec = Union[Backend, Iterable[Backend]]
ResourceDef = Dict[
Literal[
'CPU', 'compute', 'ndarray', 'CUDA',
],
int
]
DeviceClass = Literal['cpu', 'cuda']
UDFKwarg = Union[Any, AuxBufferWrapper]
UDFKwargs = Dict[str, UDFKwarg]
def _get_dtype(
udfs: List["UDF"],
dtype: "nt.DTypeLike",
corrections: Optional[CorrectionSet]
) -> "nt.DTypeLike":
tmp_dtype: "nt.DTypeLike"
if corrections is not None and corrections.have_corrections():
tmp_dtype = np.result_type(np.float32, dtype)
else:
tmp_dtype = np.dtype(dtype)
for udf in udfs:
tmp_dtype = np.result_type(
udf.get_preferred_input_dtype(),
tmp_dtype
)
return tmp_dtype
class MergeAttrMapping:
def __init__(self, dict_input: Dict[str, np.ndarray]):
self._dict: Dict[str, np.ndarray] = dict_input
def __iter__(self) -> Iterator[str]:
return iter(self._dict)
def __contains__(self, k: str) -> bool:
return k in self._dict
def __getattr__(self, k: str) -> np.ndarray:
return self._dict[k]
def __setattr__(self, k: str, v: np.ndarray) -> None:
if k in ['_dict']:
super().__setattr__(k, v)
else:
self._dict[k][:] = v
def __getitem__(self, k: str) -> np.ndarray:
warnings.warn(
"dict-like access is discouraged, as it can be "
"confusing vs. using attribute access",
UseDiscouragedWarning,
stacklevel=2,
)
return self._dict[k]
T = TypeVar('T')
class UDFData:
'''
Container for result buffers, return value from running UDFs
'''
def __init__(self, data: Dict[str, BufferWrapper]):
self._data = data
self._views: Dict[str, np.ndarray] = {}
def __repr__(self) -> str:
return "<UDFData: %r>" % (
self._data
)
def __getattr__(self, k: str) -> Union[np.ndarray, BufferWrapper]:
if k.startswith("_"):
raise AttributeError("no such attribute: %s" % k)
try:
return self._get_view_or_data(k)
except KeyError as e:
raise AttributeError(str(e))
def get_buffer(self, name: str) -> BufferWrapper:
"""
Return the `BufferWrapper` for buffer `name`
.. versionadded:: 0.7.0
"""
return self._data[name]
def set_buffer(self, name: str, buffer: BufferWrapper) -> None:
"""
Replace or set the `BufferWrapper` for buffer `name`
"""
assert isinstance(buffer, BufferWrapper)
self._data[name] = buffer
def get(
self, k: str, default: Optional[T] = None
) -> Optional[Union[T, np.ndarray, BufferWrapper]]:
try:
return self.__getattr__(k)
except (KeyError, AttributeError):
return default
def __setattr__(self, k: str, v: "nt.ArrayLike") -> None:
if not k.startswith("_"):
# convert UDFData.some_attr = something to array slice assignment
getattr(self, k)[:] = v
else:
super().__setattr__(k, v)
def _get_view_or_data(self, k: str) -> Union[np.ndarray, BufferWrapper]:
if k in self._views:
return self._views[k]
res = self._data[k]
if isinstance(res, BufferWrapper) and res.raw_data is not None:
return res.raw_data
return res
def __getitem__(self, k: str) -> BufferWrapper:
warnings.warn(
"dict-like access is discouraged, as it can be "
"confusing vs. using attribute access. Please use `get_buffer` instead, "
"if you really need the `BufferWrapper` and not the current view",
UseDiscouragedWarning,
stacklevel=2,
)
return self._data[k]
def __contains__(self, k: str) -> bool:
return k in self._data
def items(self):
return self._data.items()
def keys(self):
return self._data.keys()
def values(self):
return self._data.values()
def as_dict(self) -> Dict[str, BufferWrapper]:
return dict(self.items())
def get_proxy(self) -> MergeAttrMapping:
return MergeAttrMapping({
k: (self._views[k] if k in self._views else self._data[k].raw_data)
for k, v in self.items()
if v and v.has_data()
})
def _get_buffers(
self, filter_allocated: bool = False
) -> Generator[Tuple[str, BufferWrapper], None, None]:
for k, buf in self._data.items():
if not hasattr(buf, 'has_data') or (buf.has_data() and filter_allocated):
continue
yield k, buf
def allocate_for_part(self, partition: Partition, roi: Optional[np.ndarray], lib=None) -> None:
"""
allocate all BufferWrapper instances in this namespace.
for pre-allocated buffers (i.e. aux data), only set shape and roi
"""
for k, buf in self._get_buffers():
buf.set_shape_partition(partition, roi)
for k, buf in self._get_buffers(filter_allocated=True):
buf.allocate(lib=lib)
def allocate_for_full(self, dataset: DataSet, roi: Optional[np.ndarray]) -> None:
ds_partitions = [*dataset.get_partitions()]
for k, buf in self._get_buffers():
buf.set_shape_ds(dataset.shape, roi)
buf.add_partitions(ds_partitions)
for k, buf in self._get_buffers(filter_allocated=True):
buf.allocate()
def set_view_for_dataset(self, dataset: DataSet) -> None:
for k, buf in self._get_buffers():
self._views[k] = buf.get_view_for_dataset(dataset)
def set_view_for_partition(self, partition: Partition) -> None:
for k, buf in self._get_buffers():
self._views[k] = buf.get_view_for_partition(partition)
def set_view_for_tile(self, partition: Partition, tile: DataTile) -> None:
for k, buf in self._get_buffers():
self._views[k] = buf.get_view_for_tile(partition, tile)
def set_contiguous_view_for_tile(self, partition: Partition, tile: DataTile) -> None:
# .. versionadded:: 0.5.0
for k, buf in self._get_buffers():
self._views[k] = buf.get_contiguous_view_for_tile(partition, tile)
def flush(self, debug: bool = False) -> None:
# .. versionadded:: 0.5.0
for k, buf in self._get_buffers():
buf.flush(debug=debug)
def export(self) -> None:
# .. versionadded:: 0.6.0
for k, buf in self._get_buffers():
buf.export()
def set_view_for_frame(self, partition: Partition, tile: DataTile, frame_idx: int) -> None:
for k, buf in self._get_buffers():
self._views[k] = buf.get_view_for_frame(partition, tile, frame_idx)
def clear_views(self) -> None:
self._views = {}
class UDFKwargsWrapper(UDFData):
"""
Wrapper for UDF kwargs, used for slicing/viewing AUX data
"""
def __init__(self, data: UDFKwargs):
super().__init__(data)
self._data = data
self._views = {}
def _get_buffers(
self, filter_allocated: bool = False
) -> Generator[Tuple[str, AuxBufferWrapper], None, None]:
for k, buf in self._data.items():
if isinstance(buf, AuxBufferWrapper):
if buf.has_data() and filter_allocated:
continue
yield k, buf
def new_for_partition(self, partition: Partition, roi: np.ndarray):
for k, buf in self._get_buffers():
self._data[k] = buf.new_for_partition(partition, roi)
@runtime_checkable
class UDFFrameMixin(Protocol):
'''
Implement :code:`process_frame` for per-frame processing.
'''
    def process_frame(self, frame: np.ndarray):
"""
Implement this method to process the data on a frame-by-frame manner.
Data available in this method:
- `self.params` - the parameters of this UDF
- `self.task_data` - task data created by `get_task_data`
- `self.results` - the result buffer instances
- `self.meta` - meta data about the current operation and data set
Parameters
----------
frame : numpy.ndarray or cupy.ndarray
A single frame or signal element from the dataset.
The shape is the same as `dataset.shape.sig`. In case of pixelated
STEM / scanning diffraction data this is 2D, for spectra 1D etc.
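
        Example
        -------
        A minimal sketch of a per-frame reduction: sum up all pixels of each
        frame into a :code:`kind='nav'` buffer (the buffer name
        :code:`pixelsum` is illustrative):

        >>> class PixelsumUDF(UDF):
        ...     def get_result_buffers(self):
        ...         return {'pixelsum': self.buffer(kind='nav', dtype='float32')}
        ...
        ...     def process_frame(self, frame):
        ...         self.results.pixelsum[:] = np.sum(frame)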
"""
raise NotImplementedError()
@runtime_checkable
class UDFTileMixin(Protocol):
'''
Implement :code:`process_tile` for per-tile processing.
'''
    def process_tile(self, tile: np.ndarray):
"""
Implement this method to process the data in a tiled manner.
Data available in this method:
- `self.params` - the parameters of this UDF
- `self.task_data` - task data created by `get_task_data`
- `self.results` - the result buffer instances
- `self.meta` - meta data about the current operation and data set
Parameters
----------
tile : numpy.ndarray or cupy.ndarray
A small number N of frames or signal elements from the dataset.
The shape is (N,) + `dataset.shape.sig`. In case of pixelated
STEM / scanning diffraction data this is 3D, for spectra 2D etc.
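
        Example
        -------
        A minimal sketch of a tiled reduction, assuming a :code:`kind='nav'`
        buffer named :code:`pixelsum` is declared in :meth:`get_result_buffers`.
        Since a frame can be split across several tiles, results are
        accumulated with :code:`+=`:

        >>> class TiledPixelsumUDF(UDF):
        ...     def get_result_buffers(self):
        ...         return {'pixelsum': self.buffer(kind='nav', dtype='float32')}
        ...
        ...     def process_tile(self, tile):
        ...         # sum over the signal axes; the buffer view covers
        ...         # exactly the frames contained in this tile
        ...         sig_axes = tuple(range(1, tile.ndim))
        ...         self.results.pixelsum[:] += np.sum(tile, axis=sig_axes)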
"""
raise NotImplementedError()
@runtime_checkable
class UDFPartitionMixin(Protocol):
'''
Implement :code:`process_partition` for per-partition processing.
'''
    def process_partition(self, partition: np.ndarray) -> None:
"""
Implement this method to process the data partitioned into large
(100s of MiB) partitions.
Data available in this method:
- `self.params` - the parameters of this UDF
- `self.task_data` - task data created by `get_task_data`
- `self.results` - the result buffer instances
- `self.meta` - meta data about the current operation and data set
Note
----
Only use this method if you know what you are doing; especially if
you are running a processing pipeline with multiple steps, or multiple
processing pipelines at the same time, performance may be adversely
impacted.
Parameters
----------
partition : numpy.ndarray or cupy.ndarray
A large number N of frames or signal elements from the dataset.
The shape is (N,) + `dataset.shape.sig`. In case of pixelated
STEM / scanning diffraction data this is 3D, for spectra 2D etc.
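
        Example
        -------
        A minimal sketch summing the signal elements of each frame in the
        partition (the buffer name :code:`pixelsum` is illustrative):

        >>> class PartitionPixelsumUDF(UDF):
        ...     def get_result_buffers(self):
        ...         return {'pixelsum': self.buffer(kind='nav', dtype='float32')}
        ...
        ...     def process_partition(self, partition):
        ...         sig_axes = tuple(range(1, partition.ndim))
        ...         self.results.pixelsum[:] = np.sum(partition, axis=sig_axes)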
"""
raise NotImplementedError()
@runtime_checkable
class UDFPreprocessMixin(Protocol):
'''
Implement :code:`preprocess` to initialize the result buffers of a partition on the worker
before the partition data is processed.
.. versionadded:: 0.3.0
'''
    def preprocess(self) -> None:
"""
Implement this method to preprocess the result data for a partition.
This can be useful to initialize arrays of
:code:`dtype='object'` with the correct container types, for example.
Data available in this method:
- `self.params` - the parameters of this UDF
- `self.task_data` - task data created by `get_task_data`
- `self.results` - the result buffer instances
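
        Example
        -------
        A minimal sketch that initializes each element of a
        :code:`dtype='object'` result buffer with an empty list (the buffer
        name :code:`events` is illustrative):

        >>> class EventListUDF(UDF):
        ...     def get_result_buffers(self):
        ...         return {'events': self.buffer(kind='nav', dtype='object')}
        ...
        ...     def preprocess(self):
        ...         for i in range(self.results.events.shape[0]):
        ...             self.results.events[i] = []
        ...
        ...     def process_frame(self, frame):
        ...         self.results.events[0].append(float(frame.max()))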
"""
raise NotImplementedError()
@runtime_checkable
class UDFPostprocessMixin(Protocol):
'''
    Implement :code:`postprocess` to modify the result buffers of a partition on the worker
after the partition data has been completely processed, but before it is returned to the
main node for the final merging step.
'''
    def postprocess(self) -> None:
"""
Implement this method to postprocess the result data for a partition.
This can be useful in combination with process_tile() to implement
a postprocessing step that requires the reduced results for whole frames.
Data available in this method:
- `self.params` - the parameters of this UDF
- `self.task_data` - task data created by `get_task_data`
- `self.results` - the result buffer instances
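
        Example
        -------
        A minimal sketch that accumulates the squared intensity per frame in
        :meth:`process_tile` and takes the square root once the partition is
        complete, yielding the Euclidean norm of each frame (the buffer name
        :code:`norm` is illustrative):

        >>> class FrameNormUDF(UDF):
        ...     def get_result_buffers(self):
        ...         return {'norm': self.buffer(kind='nav', dtype='float32')}
        ...
        ...     def process_tile(self, tile):
        ...         sig_axes = tuple(range(1, tile.ndim))
        ...         self.results.norm[:] += np.sum(tile*tile, axis=sig_axes)
        ...
        ...     def postprocess(self):
        ...         self.results.norm[:] = np.sqrt(self.results.norm)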
"""
raise NotImplementedError()
@runtime_checkable
class UDFMergeAllMixin(Protocol):
    def merge_all(
self, ordered_results: 'OrderedDict[Slice, MergeAttrMapping]'
) -> Mapping[str, 'nt.ArrayLike']:
"""
        Combine the stack of ordered partial results `ordered_results` to form the complete result.
Combining can be more efficient than direct merging into a result buffer
for cases where the results are not NumPy arrays.
Currently this is only applicable for the
:class:`libertem.executor.delayed.DelayedJobExecutor`
where it provides an efficient pathway to construct Dask arrays from delayed UDF results.
The input and the returned arrays are in flattened navigation dimension with ROI applied.
Data available in this method:
- `self.params` - the parameters of this UDF
For UDFs with only :code:`kind='nav'` result buffers a default implementation
is used automatically.
Parameters
----------
ordered_results
Ordered dict mapping partition slice to UDF partial result
Returns
-------
dict[buffername] -> array_like
Dictionary mapping result buffer name to buffer content
Note
----
This function is running on the main node, which means `self.results`
and `self.task_data` are not available.
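
        Example
        -------
        A sketch for a UDF with a single :code:`kind='single'` sum buffer
        (the buffer name :code:`intensity_sum` is illustrative); the partial
        sums of all partitions are simply added up:

        >>> def merge_all(self, ordered_results):
        ...     chunks = [b.intensity_sum for b in ordered_results.values()]
        ...     return {'intensity_sum': np.stack(chunks).sum(axis=0)}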
"""
raise NotImplementedError()
def _default_merge_all(udf, ordered_results: 'OrderedDict[Slice, MergeAttrMapping]'):
if udf.requires_custom_merge:
raise NotImplementedError(
"Default merging only works for kind='nav' buffers. "
"Please implement a suitable custom merge_all function."
)
result_chunks = defaultdict(lambda: [])
for b in ordered_results.values():
for key in b:
result_chunks[key].append(getattr(b, key))
result = {
# checking above assures that we only have kind='nav'
# where concatenation is the correct method.
k: np.concatenate(val)
for k, val in result_chunks.items()
}
return result
class UDFBase(UDFProtocol):
'''
Base class for UDFs with helper functions.
'''
def __init__(self, *args, **kwargs) -> None:
# this should not execute - it is mostly for getting the typing right:
self.params = UDFKwargsWrapper({})
self.results = UDFData({})
def get_task_data(self) -> Dict[str, Any]:
raise NotImplementedError()
def get_result_buffers(self) -> Dict[str, BufferWrapper]:
raise NotImplementedError()
def allocate_for_part(self, partition: Partition, roi: Optional[np.ndarray]) -> None:
for ns in [self.results]:
ns.allocate_for_part(partition, roi, lib=self.xp)
def allocate_for_full(self, dataset: DataSet, roi: Optional[np.ndarray]) -> None:
for ns in [self.params, self.results]:
ns.allocate_for_full(dataset, roi)
def set_views_for_dataset(self, dataset: DataSet) -> None:
for ns in [self.params]:
ns.set_view_for_dataset(dataset)
def set_views_for_partition(self, partition: Partition) -> None:
for ns in [self.params, self.results]:
ns.set_view_for_partition(partition)
def set_views_for_tile(self, partition: Partition, tile: DataTile) -> None:
for ns in [self.params, self.results]:
ns.set_view_for_tile(partition, tile)
def set_contiguous_views_for_tile(self, partition: Partition, tile: DataTile) -> None:
# .. versionadded:: 0.5.0
for ns in [self.params, self.results]:
ns.set_contiguous_view_for_tile(partition, tile)
def flush(self, debug: bool = False) -> None:
# .. versionadded:: 0.5.0
for ns in [self.params, self.results]:
ns.flush(debug=debug)
def set_views_for_frame(self, partition: Partition, tile: DataTile, frame_idx: int):
for ns in [self.params, self.results]:
ns.set_view_for_frame(partition, tile, frame_idx)
def clear_views(self) -> None:
for ns in [self.params, self.results]:
ns.clear_views()
def init_task_data(self) -> None:
self.task_data = UDFData(self.get_task_data())
def init_result_buffers(self, executor=None) -> None:
"""
Create the UDFData instance containing BufferWrappers
for results of this UDF. Same method used for
Dataset- and Partition-sized buffers.
        The executor argument is provided on the main node
        only to allow modification of Dataset-sized buffers
        into more specialized forms.
        TODO: use another mechanism to distinguish between
        Dataset- and Partition-sized buffers when initializing,
        such that we can modify buffers in both cases.
"""
self.results = UDFData(self.get_result_buffers())
if executor is not None:
for name, buffer in self.results.items():
new_buffer = executor.modify_buffer_type(buffer)
self.results.set_buffer(name, new_buffer)
def export_results(self) -> None:
# .. versionadded:: 0.6.0.dev0
self.results.export()
def set_meta(self, meta: UDFMeta) -> None:
self.meta = meta
def set_slice(self, slice_: Slice) -> None:
self.meta.slice = slice_
def set_tile_idx(self, idx: int) -> None:
self.meta.tiling_scheme_idx = idx
def set_backend(self, backend: Backend) -> None:
assert backend in self.get_backends()
self._backend = backend
def get_backends(self) -> BackendSpec:
raise NotImplementedError() # see impl in UDF.get_backends
@property
def xp(self):
'''
Compute back-end library to use.
Generally, use :code:`self.xp` instead of :code:`np` to use NumPy or CuPy transparently
.. versionadded:: 0.6.0
'''
# Implemented as property and not variable to avoid pickling issues
# tests/udf/test_simple_udf.py::test_udf_pickle
if self._backend == 'numpy' or self._backend == 'cuda':
return np
elif self._backend == 'cupy':
# Re-importing should be fast, right?
# Importing only here to avoid superfluous import
import cupy
# mocking for testing without actual CUDA device
# import numpy as cupy
return cupy
else:
raise ValueError(f"Backend name can be 'numpy', 'cuda' or 'cupy', got {self._backend}")
def get_method(self) -> Literal['tile', 'frame', 'partition']:
if hasattr(self, 'process_tile'):
return 'tile'
elif hasattr(self, 'process_frame'):
return 'frame'
elif hasattr(self, 'process_partition'):
return 'partition'
else:
raise TypeError("UDF should implement one of the `process_*` methods")
def _check_results(self, decl, arr, name):
"""
Check results in `arr` for buffer `name` for consistency with the
declaration in `decl`.
1) All returned buffers need to be declared
2) Private buffers can't be returned from `get_results`
3) The `dtype` of `arr` needs to be compatible with the declared `dtype`
"""
if name not in decl:
raise UDFException(
"buffer '%s' is not declared in `get_result_buffers` "
"(hint: `self.buffer(..., use='result_only')`" % name
)
buf_decl = decl[name]
if buf_decl.use == "private":
raise UDFException("Don't return `use='private'` buffers from `get_results`")
if np.dtype(arr.dtype).kind != np.dtype(buf_decl.dtype).kind:
raise UDFException(
"the returned ndarray '%s' has a different dtype kind (%s) "
"than declared (%s)" % (
name, arr.dtype, buf_decl.dtype
)
)
def get_results(self) -> Dict[str, np.ndarray]:
raise NotImplementedError()
def _do_merge_all(self, ordered_results: 'OrderedDict[Slice, MergeAttrMapping]'):
if isinstance(self, UDFMergeAllMixin):
results_tmp = self.merge_all(ordered_results)
else:
            # This will raise NotImplementedError if preconditions for the default merge
# are not met
results_tmp = _default_merge_all(self, ordered_results)
if not set(results_tmp.keys()).issubset(set(self.results.keys())):
raise ValueError(f'Returned result names from merge_all ({[*results_tmp.keys()]}) '
'are not contained within declared result buffer names '
f'({[*self.results.keys()]})')
for key, value in results_tmp.items():
# This SHOULD throw errors if sth doesn't match up about
# buffer name, shape or dtype
self.results.get_buffer(key).replace_array(value)
def _do_get_results(self) -> Mapping[str, BufferWrapper]:
results_tmp = self.get_results()
decl = self.get_result_buffers()
# include any results that were not explicitly included, but have non-private `use`:
results_tmp.update({
k: getattr(self.results, k)
for k, v in decl.items()
if k not in results_tmp and v.use is None
})
results_buffer_cls = {
k: self.results.get_buffer(k).result_buffer_type()
for k in results_tmp.keys()
}
# wrap numpy results into `ResultBuffer`s:
results = {}
for name, arr in results_tmp.items():
self._check_results(decl, arr, name)
buf_decl = decl[name]
buf = results_buffer_cls[name](
kind=buf_decl.kind, extra_shape=buf_decl.extra_shape,
dtype=buf_decl.dtype,
data=arr,
)
buf.set_shape_ds(self.meta.dataset_shape, self.meta.roi)
results[name] = buf
return results
class UDF(UDFBase):
"""
The main user-defined functions interface. You can implement your functionality
by overriding methods on this class.
If you override `__init__`, please take care,
as it is called multiple times during evaluation of a UDF. You can handle
some pre-conditioning of parameters, but you also have to accept the results
as input again.
Arguments passed as `**kwargs` will be automatically available on `self.params`
when running the UDF.
Example
-------
>>> class MyUDF(UDF):
... def __init__(self, param1, param2="def2", **kwargs):
... param1 = int(param1)
... if "param3" not in kwargs:
... raise TypeError("missing argument param3")
... super().__init__(param1=param1, param2=param2, **kwargs)
Parameters
----------
kwargs
Input parameters. They are scattered to the worker processes and
available as `self.params` from here on.
Values can be `BufferWrapper` instances, which, when accessed via
`self.params.the_key_here`, will automatically return a view corresponding
to the current unit of data (frame, tile, partition).
"""
def __init__(self, **kwargs: UDFKwarg) -> None:
self._backend = 'numpy' # default so that self.xp can always be used
self._kwargs = kwargs
self.params = UDFKwargsWrapper(kwargs)
self.task_data = UDFData({})
self.results = UDFData({})
self._requires_custom_merge = None
def copy(self) -> "UDF":
return self.__class__(**self._kwargs)
@classmethod
def new_for_partition(
cls,
kwargs: Dict[str, Any],
partition: Partition,
roi: np.ndarray,
) -> "UDF":
new_instance = cls(**kwargs)
new_instance.params.new_for_partition(partition, roi)
return new_instance
    def copy_for_partition(self, partition: Partition, roi: np.ndarray) -> "UDF":
"""
        Create a copy of the UDF, specifically slicing aux data to the
        specified partition and roi
"""
return self.__class__.new_for_partition(self._kwargs, partition, roi)
    def get_task_data(self) -> Dict[str, Any]:
"""
Initialize per-task data.
Per-task data can be mutable. Override this function
to allocate temporary buffers, or to initialize
system resources.
If you want to distribute static data, use
parameters instead.
Data available in this method:
- `self.params` - the input parameters of this UDF
- `self.meta` - relevant metadata, see :class:`UDFMeta` documentation.
Returns
-------
dict
Flat dict with string keys. Keys should
be valid python identifiers, which allows
access via `self.task_data.the_key_here`.
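
        Example
        -------
        A sketch that precomputes a kernel once per task instead of once per
        frame (the key :code:`kernel` is illustrative):

        >>> def get_task_data(self):
        ...     sig_shape = tuple(self.meta.dataset_shape.sig)
        ...     return {
        ...         'kernel': np.ones(sig_shape, dtype=np.float32),
        ...     }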
"""
return {}
    def get_result_buffers(self) -> Dict[str, BufferWrapper]:
"""
Return result buffer declaration.
Values of the returned dict should be :class:`~libertem.common.buffers.BufferWrapper`
instances, which, when accessed via :code:`self.results.key`,
will automatically return a view corresponding to the
current unit of data (frame, tile, partition).
The values also need to be serializable via pickle.
Data available in this method:
- :code:`self.params` - the parameters of this UDF
- :code:`self.meta` - relevant metadata, see :class:`UDFMeta` documentation.
Please note that partition metadata will not be set when this method is
executed on the head node.
Returns
-------
dict
Flat dict with string keys. Keys should
be valid python identifiers, which allows
access via `self.results.the_key_here`.
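
        Example
        -------
        A minimal sketch declaring one result value per navigation position
        (the buffer name :code:`intensity` is illustrative):

        >>> def get_result_buffers(self):
        ...     return {
        ...         'intensity': self.buffer(kind='nav', dtype='float32'),
        ...     }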
"""
raise NotImplementedError()
@property
def requires_custom_merge(self):
"""
        Determine whether buffers with :code:`kind != 'nav'` are present,
        for which the default merge doesn't work
.. versionadded:: 0.5.0
"""
if self._requires_custom_merge is None:
buffers = self.get_result_buffers()
self._requires_custom_merge = any(buffer.kind != 'nav' for buffer in buffers.values())
return self._requires_custom_merge
    def merge(self, dest: MergeAttrMapping, src: MergeAttrMapping):
"""
Merge a partial result `src` into the current global result `dest`.
Data available in this method:
- `self.params` - the parameters of this UDF
Parameters
----------
dest
global results; you can access the ndarrays for each
buffer name (from `get_result_buffers`) by attribute access
(:code:`dest.your_buffer_name`)
src
results for a partition; you can access the ndarrays for each
buffer name (from `get_result_buffers`) by attribute access
(:code:`src.your_buffer_name`)
Note
----
This function is running on the main node, which means `self.results`
and `self.task_data` are not available.
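
        Example
        -------
        A minimal sketch of a custom merge for a :code:`kind='single'` sum
        buffer (the buffer name :code:`intensity_sum` is illustrative):

        >>> def merge(self, dest, src):
        ...     dest.intensity_sum += src.intensity_sum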
"""
if self.requires_custom_merge:
raise NotImplementedError(
"Default merging only works for kind='nav' buffers. "
"Please implement a suitable custom merge function."
)
for k in dest:
check_cast(getattr(dest, k), getattr(src, k))
getattr(dest, k)[:] = getattr(src, k)
    def get_results(self) -> Dict[str, np.ndarray]:
"""
Get results, allowing a postprocessing step on the main node after
a result has been merged. See also: :class:`UDFPostprocessMixin`.
.. versionadded:: 0.7.0
Note
----
You should return all values as numpy arrays, they will be wrapped
in `BufferWrapper` instances before they are returned to the user.
See the :ref:`udf final post processing` section in the documentation for
details and examples.
Returns
-------
results : dict
A `dict` containing the final post-processed results.
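
        Example
        -------
        A sketch that computes a derived result on the main node, assuming
        buffers :code:`intensity_sum` and :code:`num_frames` plus a
        :code:`use='result_only'` buffer :code:`average` were declared:

        >>> def get_results(self):
        ...     return {
        ...         'average': self.results.intensity_sum / self.results.num_frames,
        ...     }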
"""
for k in self.results.keys():
buf = self.results.get_buffer(k)
if buf.use == "result_only":
raise UDFException(
"don't know how to set use='result_only' buffer '%s';"
" please implement `get_results`" % k
)
decls = self.get_result_buffers()
return {
k: getattr(self.results, k)
for k in self.results.keys()
if decls[k].use != "private"
}
    def get_tiling_preferences(self) -> TilingPreferences:
"""
Configure tiling preferences. Return a dictionary with the
following keys:
- "depth": number of frames/frame parts to stack on top of each other
- "total_size": total size of a tile in bytes
.. versionadded:: 0.6.0
"""
return {
"depth": UDF.TILE_DEPTH_DEFAULT,
"total_size": UDF.TILE_SIZE_MAX,
}
    def get_backends(self) -> BackendSpec:
# TODO see interaction with C++ CUDA modules requires a different type than CuPy
'''
Signal which computation back-ends the UDF can use.
:code:`numpy` is the default CPU-based computation.
:code:`cuda` is CUDA-based computation without CuPy.
:code:`cupy` is CUDA-based computation through CuPy.
.. versionadded:: 0.6.0
Returns
-------
backend : Iterable[str]
An iterable containing possible values :code:`numpy` (default), :code:`'cuda'` and
:code:`cupy`
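
        Example
        -------
        A sketch of a UDF that can run on both the CPU (NumPy) and on CUDA
        devices through CuPy:

        >>> def get_backends(self):
        ...     return ('numpy', 'cupy')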
'''
return ('numpy', )
def cleanup(self) -> None: # FIXME: name? implement cleanup as context manager somehow?
pass
    def buffer(
self,
kind: BufferKind,
extra_shape: Tuple[int, ...] = (),
dtype: "nt.DTypeLike" = "float32",
where: BufferLocation = None,
use: Optional[BufferUse] = None,
) -> BufferWrapper:
'''
        Use this method to create :class:`~libertem.common.buffers.BufferWrapper` objects
in :meth:`get_result_buffers`.
Parameters
----------
kind : "nav", "sig" or "single"
The abstract shape of the buffer, corresponding either to the navigation
or the signal dimensions of the dataset, or a single value.
extra_shape : optional, tuple of int or a Shape object
You can specify additional dimensions for your data. For example, if
you want to store 2D coords, you would specify :code:`(2,)` here.
If this is specified as a Shape object, it is converted to a tuple first.
dtype : string or numpy dtype
The dtype of this buffer
where : string or None
:code:`None` means NumPy array, specify :code:`'device'` to use a device buffer
(for example on a CUDA device)
.. versionadded:: 0.6.0
use : "private", "result_only" or None
If you specify :code:`"private"` here, the result will only be made available
to internal functions, like :meth:`process_frame`, :meth:`merge` or
:meth:`get_results`. It will not be available to the user of the UDF, which means
you can use this to hide implementation details that are likely to change later.
Specify :code:`"result_only"` here if the buffer is only used in :meth:`get_results`,
this means we don't have to allocate and return it on the workers without actually
needing it.
:code:`None` means the buffer is used both as a final and intermediate result.
.. versionadded:: 0.7.0
'''
if use is not None and use.lower() == "result_only":
return PlaceholderBufferWrapper(kind, extra_shape, dtype, use=use)
return BufferWrapper(kind, extra_shape, dtype, where, use=use)
    @classmethod
def aux_data(cls, data, kind, extra_shape=(), dtype="float32"):
"""
Use this method to create auxiliary data. Auxiliary data should
have a shape like `(dataset.shape.nav, extra_shape)` and on access,
an appropriate view will be created. For example, if you access
aux data in `process_frame`, you will get the auxiliary data for
the current frame you are processing.
Example
-------
We create a UDF to demonstrate the behavior:
>>> class MyUDF(UDF):
... def get_result_buffers(self):
... # Result buffer for debug output
... return {'aux_dump': self.buffer(kind='nav', dtype='object')}
...
... def process_frame(self, frame):
... # Extract value of aux data for demonstration
... self.results.aux_dump[:] = str(self.params.aux_data[:])
...
>>> # for each frame, provide three values from a sequential series:
>>> aux1 = MyUDF.aux_data(
... data=np.arange(np.prod(dataset.shape.nav) * 3, dtype=np.float32),
... kind="nav", extra_shape=(3,), dtype="float32"
... )
>>> udf = MyUDF(aux_data=aux1)
>>> res = ctx.run_udf(dataset=dataset, udf=udf)
process_frame for frame (0, 7) received a view of aux_data with values [21., 22., 23.]:
>>> res['aux_dump'].data[0, 7]
'[21. 22. 23.]'
"""
buf = AuxBufferWrapper(kind, extra_shape, dtype)
buf.set_buffer(data)
return buf
class NoOpUDF(UDF):
'''
A UDF that does nothing and returns nothing.
This is useful for testing.
Parameters
----------
preferred_input_dtype : numpy.dtype
Perform dtype conversion. By default, this is :attr:`UDF.USE_NATIVE_DTYPE`.
'''
def __init__(self, preferred_input_dtype=UDF.USE_NATIVE_DTYPE) -> None:
super().__init__(preferred_input_dtype=preferred_input_dtype)
    def process_tile(self, tile):
'''
Do nothing.
'''
pass
    def get_result_buffers(self):
'''
No result buffers.
'''
return {}
def check_cast(fromvar, tovar):
if not np.can_cast(fromvar.dtype, tovar.dtype, casting='safe'):
# FIXME exception or warning?
raise TypeError(f"Unsafe automatic casting from {fromvar.dtype} to {tovar.dtype}")
class UDFParams:
def __init__(
self,
kwargs: List[dict],
roi: Optional[np.ndarray],
corrections: Optional[CorrectionSet],
tiling_scheme: TilingScheme,
):
"""
Container class for UDF parameters for multiple UDFs
Parameters
----------
kwargs : List[dict]
List of kwargs which can be used to re-create the UDF instances
that were passed to `run_for_dataset`
roi
Boolean array to select parts of the navigation space
corrections
Corrections to apply
"""
self._kwargs = kwargs
self._roi = roi
self._corrections = corrections
self._tiling_scheme = tiling_scheme
@classmethod
def from_udfs(
cls,
udfs: Iterable[UDF],
roi: Optional[np.ndarray],
corrections: Optional[CorrectionSet],
tiling_scheme: TilingScheme,
):
kwargs = [udf._kwargs for udf in udfs]
return cls(kwargs, roi, corrections, tiling_scheme)
@property
def roi(self):
return self._roi
@property
def corrections(self):
return self._corrections
@property
def kwargs(self):
return self._kwargs
@property
def tiling_scheme(self):
return self._tiling_scheme
class Task:
"""
A computation on a partition. Inherit from this class and implement ``__call__``
for your specific computation.
.. versionchanged:: 0.4.0
Moved from libertem.job.base to libertem.udf.base as part of Job API deprecation
"""
def __init__(self, partition: Partition, idx: int, span_context: "SpanContext"):
self.partition = partition
self.idx = idx
self._span_context = span_context
self._progress = False
def get_partition(self):
return self.partition
def get_locations(self):
return self.partition.get_locations()
def get_resources(self) -> ResourceDef:
'''
Specify the resources that a Task will use.
The resources are designed to work with resource tags in Dask clusters:
See https://distributed.dask.org/en/latest/resources.html
The resources allow scheduling of CPU-only compute, CUDA-only compute,
(CPU or CUDA) hybrid compute, and service tasks in such a way that all
resources are used without oversubscription. Furthermore, they
distinguish if the given resource can be accessed with a transparent
NumPy ndarray interface -- namely if CuPy is installed to access CUDA
resources.
Each CPU worker gets one CPU, one compute and one ndarray resource
assigned. Each CUDA worker gets one CUDA and one compute resource
assigned. If CuPy is installed, it additionally gets an ndarray resource
assigned. A service worker doesn't get any resources assigned.
A CPU-only task consumes one CPU, one ndarray and one compute resource,
i.e. it will be scheduled only on CPU workers. A CUDA-only task consumes
one CUDA, one compute and possibly an ndarray resource, i.e. it will
only be scheduled on CUDA workers. A hybrid task that can run on both
CPU or CUDA using a transparent ndarray interface only consumes a
compute and an ndarray resource, i.e. it can be scheduled on CPU workers
or CUDA workers with CuPy. A service task doesn't request any resources
and can therefore run on CPU, CUDA and service workers.
Which device a hybrid task uses is only decided at runtime by the
environment variables that are set on each worker process.
That way, CPU-only and CUDA-only tasks can run in parallel without
oversubscription, and hybrid tasks use whatever compute workers are free
at the time. Furthermore, it allows using CUDA without installing CuPy.
See https://github.com/LiberTEM/LiberTEM/pull/760 for detailed
discussion.
Compute tasks will not run on service workers, so that these can serve
shorter service tasks with lower latency.
At the moment, workers only get a single "item" of each resource
assigned since we run one process per CPU. Requesting more of a resource
than any of the workers has causes a :class:`RuntimeError`, including
requesting a resource that is not available at all.
For that reason one has to make sure that the workers are set up with
the correct resources and matching environment.
:meth:`libertem.utils.devices.detect`,
:meth:`libertem.executor.dask.cluster_spec` and
:meth:`libertem.executor.dask.DaskJobExecutor.make_local` can be used to
set up a local cluster correctly.
.. versionadded:: 0.6.0
'''
# default: run only on CPU and do computation
# No CUDA support for the deprecated Job interface
return {'CPU': 1, 'compute': 1, 'ndarray': 1}
def get_tracing_span_context(self) -> "SpanContext":
return self._span_context
def __call__(self, params: UDFParams, env: Environment):
raise NotImplementedError()
def report_progress(self):
self._progress = True
def _get_canonical_backends(backends: Optional[BackendSpec]) -> Set[Backend]:
"""
    Convert either an iterable of backends or a single backend string into
    the canonical form, a set of backend strings.
"""
if backends is None:
return set()
if isinstance(backends, str):
backends = (backends,)
return set(backends)
def get_resources_for_backends(
udf_backends: List[BackendSpec],
user_backends: Optional[BackendSpec]
) -> ResourceDef:
"""
Find the resource definition that is appropriate for the backends specified
    by the UDFs and the user. This is the intersection of the backends specified
    by each UDF and the backends selected by the user.
Parameters
----------
udf_backends
The backends specified by each UDF that we want to run
user_backends
The backends specified by the user
See :meth:`Task.get_resources` for details.
"""
udf_backends_canonical = [
_get_canonical_backends(bs)
for bs in udf_backends
]
user_backends_canonical = _get_canonical_backends(user_backends)
needs_cuda = 0
needs_cpu = 0
needs_ndarray = 0
# Limit to externally specified backends
for backend_set in udf_backends_canonical:
if user_backends_canonical:
backends = set(user_backends_canonical).intersection(backend_set)
else:
backends = backend_set
needs_cuda += 'numpy' not in backends
needs_cpu += ('cuda' not in backends) and ('cupy' not in backends)
needs_ndarray += 'cuda' not in backends
if needs_cuda and needs_cpu:
raise ValueError(
"There is no common supported UDF backend (have: %r, limited to %r)"
% (udf_backends, user_backends_canonical)
)
result: ResourceDef = {'compute': 1}
if needs_cpu:
result['CPU'] = 1
if needs_cuda:
result['CUDA'] = 1
if needs_ndarray:
result['ndarray'] = 1
return result
class UDFTask(Task):
def __init__(
self,
partition: Partition,
idx: int,
udf_classes: List[Type[UDF]],
udf_backends: List[BackendSpec],
backends: Optional[BackendSpec],
runner_cls: Type['UDFPartRunner'],
span_context: "SpanContext",
task_frames: int,
):
"""
A computation for a single partition. The parameters that stay the same
for the whole dataset are excluded here and supplied by the executor in
__call__.
Parameters
----------
partition : Partition
The partition to work on
idx : int
the index of the task, used to identify results (?)
udf_classes : List[Type[UDF]]
The UDFs to run
backends : List[str]
The specified backends we want to run on
task_frames : int
The number of frames this task must process (ROI included)
"""
super().__init__(partition=partition, idx=idx, span_context=span_context)
self._backends = backends
self._udf_classes = udf_classes
self._udf_backends = udf_backends
self._runner_cls = runner_cls
self._task_frames = task_frames
def __call__(self, params: UDFParams, env: Environment) -> Tuple[UDFData, ...]:
with self._propagate_tracing(), tracer.start_as_current_span("UDFTask.__call__"):
udfs = [
cls.new_for_partition(kwargs, self.partition, params.roi)
for cls, kwargs in zip(self._udf_classes, params.kwargs)
]
return self._runner_cls(udfs, progress=self._progress).run_for_partition(
self.partition, params, env,
)
@contextmanager
def _propagate_tracing(self):
"""
Initialize the current tracing context from the :code:`SpanContext`
given in `self._span_context`, if we don't already have a running
tracing context.
"""
if opentelemetry_context.get_current():
yield
else:
with attach_to_parent(self._span_context):
yield
def get_runner_cls(self) -> Type["UDFPartRunner"]:
return self._runner_cls
def get_udf_classes(self) -> List[Type["UDF"]]:
return self._udf_classes
def get_resources(self) -> ResourceDef:
"""
        Intersection of the resources of all UDFs; raises if there is no
        common backend. See the docstring of the superclass for details.
"""
return get_resources_for_backends(self._udf_backends, user_backends=self._backends)
def __repr__(self):
return f"<UDFTask {self._udf_classes!r}>"
@property
def task_frames(self) -> int:
"""
The number of frames to be processed in this task
(accounting for ROI)
"""
return self._task_frames
class UDFPartRunner:
def __init__(self, udfs: List[UDF], debug: bool = False, progress: bool = False):
self._udfs = udfs
self._debug = debug
self._progress = progress
def run_for_partition(
self,
partition: Partition,
params: UDFParams,
env: Environment,
) -> Tuple[UDFData, ...]:
roi = params.roi
corrections = params.corrections
with env.enter():
try:
previous_id = None
device_class = get_device_class()
# numpy_udfs and cupy_udfs contain references to the objects in
# self._udfs
numpy_udfs, cupy_udfs = self._udf_lists(device_class)
# Will only be populated if actually on CUDA worker
# and any UDF supports 'cupy' (and not 'cuda')
if cupy_udfs:
# Avoid importing if not used
import cupy
device = get_use_cuda()
previous_id = cupy.cuda.Device().id
cupy.cuda.Device(device).use()
(meta, dtype) = self._init_udfs(
numpy_udfs, cupy_udfs, partition, roi, corrections, device_class, env,
params.tiling_scheme,
)
partition.set_corrections(corrections)
if env.worker_context is not None:
partition.set_worker_context(env.worker_context)
self._run_udfs(numpy_udfs, cupy_udfs, partition, params.tiling_scheme, roi, dtype)
self._wrapup_udfs(numpy_udfs, cupy_udfs, partition)
finally:
if previous_id is not None:
cupy.cuda.Device(previous_id).use()
# Make sure results are in the same order as the UDFs
return tuple(udf.results for udf in self._udfs)
def _run_udfs(
self,
numpy_udfs: List[UDF],
cupy_udfs: List[UDF],
partition: Partition,
tiling_scheme: TilingScheme,
roi: Optional[np.ndarray],
dtype,
) -> None:
# FIXME pass information on target location (numpy or cupy)
        # to dataset so that it can already move it there.
# In the future, it might even decode data on the device instead of CPU
tiles = partition.get_tiles(
tiling_scheme=tiling_scheme,
roi=roi, dest_dtype=dtype,
)
if cupy_udfs:
xp = cupy_udfs[0].xp
# type explicitly to help mypy
partition_progress: Union[PartitionProgressTracker, PartitionTrackerNoOp]
if self._progress:
partition_progress = PartitionProgressTracker(partition)
else:
partition_progress = PartitionTrackerNoOp()
partition_progress.signal_start()
for tile in tiles:
self._run_tile(numpy_udfs, partition, tile, tile, roi=roi)
if cupy_udfs:
# Work-around, should come from dataset later
device_tile = xp.asanyarray(tile)
self._run_tile(cupy_udfs, partition, tile, device_tile, roi=roi)
partition_progress.signal_tile_complete(tile)
# We could signal partition completion here, but this is already
# handled on the main node via ProgressManager.finalize_task(task)
# as it is the fallback method for executors that don't support comms
# partition_progress.signal_complete()
def _udf_lists(self, device_class: DeviceClass) -> Tuple[List[UDF], List[UDF]]:
numpy_udfs = []
cupy_udfs = []
if device_class == 'cuda':
for udf in self._udfs:
backends = udf.get_backends()
if 'cuda' in backends:
numpy_udfs.append(udf)
elif 'cupy' in backends:
cupy_udfs.append(udf)
elif 'numpy' in backends:
warnings.warn(f"UDF {udf} backends are {backends}, recommended on CUDA are "
"'cuda' and 'cupy'.", RuntimeWarning)
numpy_udfs.append(udf)
else:
raise RuntimeError(
f"UDF {udf} backends are {backends}, supported on CUDA are "
"'cuda' and 'cupy', as well as 'numpy' as a compatibility fallback."
)
elif device_class == 'cpu':
for udf in self._udfs:
backends = udf.get_backends()
if 'numpy' not in backends:
raise RuntimeError(
f"UDF {udf} backends are {backends}, supported on CPU is 'numpy'."
)
numpy_udfs = self._udfs
else:
raise ValueError(f"Unknown device class {device_class}, "
"supported are 'cpu' and 'cuda'")
return (numpy_udfs, cupy_udfs)
def _init_udfs(
self,
numpy_udfs: List[UDF],
cupy_udfs: List[UDF],
partition: Partition,
roi: Optional[np.ndarray],
corrections: CorrectionSet,
device_class,
env: Environment,
tiling_scheme: TilingScheme,
) -> Tuple[UDFMeta, "nt.DTypeLike"]:
dtype = _get_dtype(self._udfs, partition.dtype, corrections)
meta = UDFMeta(
partition_slice=partition.slice.adjust_for_roi(roi),
dataset_shape=partition.meta.shape,
roi=roi,
dataset_dtype=partition.dtype,
input_dtype=dtype,
tiling_scheme=tiling_scheme,
corrections=corrections,
device_class=device_class,
threads_per_worker=env.threads_per_worker,
)
for udf in numpy_udfs:
backends = udf.get_backends()
if device_class == 'cuda':
# Warnings etc handled in _udf_lists()
if 'cuda' in backends:
udf.set_backend('cuda')
elif 'numpy' in backends: # fallback
udf.set_backend('numpy')
else:
# Should be covered in _udf_lists(), but just to be sure.
raise ValueError("Can't run {udf} with backends {backends} on a CUDA worker.")
else:
udf.set_backend('numpy')
if device_class == 'cpu':
# Should be covered in _udf_lists(), but just to be sure.
assert not cupy_udfs
for udf in cupy_udfs:
udf.set_backend('cupy')
udfs = numpy_udfs + cupy_udfs
for udf in udfs:
udf.get_method() # validate that one of the `process_*` methods is implemented
udf.set_meta(meta)
udf.init_result_buffers()
udf.allocate_for_part(partition, roi)
udf.init_task_data()
# TODO: preprocess doesn't have access to the tiling scheme - is this ok?
if isinstance(udf, UDFPreprocessMixin):
udf.clear_views()
udf.preprocess()
for udf in udfs:
udf.set_meta(meta)
return (meta, dtype)
def _run_tile(
self,
udfs: List[UDF],
partition: Partition,
tile: DataTile,
device_tile: DataTile,
roi: Optional[np.ndarray],
) -> None:
for udf in udfs:
if isinstance(udf, UDFTileMixin):
udf.set_contiguous_views_for_tile(partition, tile)
udf.set_slice(tile.tile_slice)
udf.set_tile_idx(tile.scheme_idx)
udf.process_tile(device_tile)
elif isinstance(udf, UDFFrameMixin):
tile_slice = tile.tile_slice
for frame_idx, frame in enumerate(device_tile):
frame_slice = Slice(
origin=(tile_slice.origin[0] + frame_idx,) + tile_slice.origin[1:],
shape=Shape((1,) + tuple(tile_slice.shape)[1:],
sig_dims=tile_slice.shape.sig.dims),
)
# Internal checks for dataset consistency
assert frame.shape == tuple(partition.shape.sig)
udf.set_slice(frame_slice)
udf.set_views_for_frame(partition, tile, frame_idx)
udf.process_frame(frame)
elif isinstance(udf, UDFPartitionMixin):
# Internal checks for dataset consistency
assert partition.slice.adjust_for_roi(roi) == tile.tile_slice
udf.set_views_for_tile(partition, tile)
udf.set_slice(tile.tile_slice)
udf.process_partition(device_tile)
def _wrapup_udfs(
self,
numpy_udfs: List[UDF],
cupy_udfs: List[UDF],
partition: Partition
) -> None:
udfs = numpy_udfs + cupy_udfs
for udf in udfs:
udf.flush(self._debug)
if isinstance(udf, UDFPostprocessMixin):
udf.clear_views()
udf.postprocess()
udf.cleanup()
udf.clear_views()
udf.export_results()
if self._debug:
try:
cloudpickle.loads(cloudpickle.dumps(partition))
except TypeError:
raise TypeError("could not pickle partition")
try:
cloudpickle.loads(cloudpickle.dumps(
[u._do_get_results() for u in udfs]
))
except TypeError:
raise TypeError("could not pickle results")
class UDFRunner:
@staticmethod
def _apply_part_result(udfs, damage, part_results, task):
for results, udf in zip(part_results, udfs):
udf.set_views_for_partition(task.partition)
udf.merge(
dest=udf.results.get_proxy(),
src=results.get_proxy()
)
v = damage.get_view_for_partition(task.partition)
v[:] = True
@staticmethod
def _make_udf_result(udfs: Iterable[UDF], damage: BufferWrapper) -> "UDFResults":
for udf in udfs:
udf.clear_views()
return UDFResults(
buffers=tuple(
udf._do_get_results()
for udf in udfs
),
damage=damage
)
def __init__(self, udfs: List[UDF], debug: bool = False,
progress_reporter: Optional['ProgressReporter'] = None):
self._udfs = udfs
self._debug = debug
self._pool = TracedThreadPoolExecutor(tracer, max_workers=4)
self._progress_reporter = progress_reporter
@classmethod
def inspect_udf(
cls,
udf: UDF,
dataset: DataSet,
roi: Optional[np.ndarray] = None,
) -> Dict[str, BufferWrapper]:
"""
Return result buffer declarations for a given UDF/DataSet/roi combination
"""
meta = UDFMeta(
partition_slice=None,
dataset_shape=dataset.shape,
roi=None,
dataset_dtype=dataset.dtype,
input_dtype=_get_dtype(
[udf],
dataset.dtype,
corrections=None,
),
corrections=None,
)
udf = udf.copy()
udf.set_meta(meta)
buffers = udf.get_result_buffers()
for buf in buffers.values():
buf.set_shape_ds(dataset.shape, roi)
return buffers
@classmethod
def dry_run(cls, udfs, dataset, roi=None):
"""
Return result buffers for a given UDF/DataSet/roi combination
exactly as running the UDFs would, just skipping execution and
merging of the processing tasks.
This can be used to create an empty result to initialize live plots
        before running a UDF.
"""
with tracer.start_as_current_span("UDFRunner.dry_run"):
runner = cls(udfs)
executor = InlineJobExecutor()
res = runner.run_for_dataset(
dataset=dataset,
executor=executor,
roi=roi,
dry=True
)
return res
def _debug_task_pickling(self, tasks: List[UDFTask]) -> None:
if self._debug:
cloudpickle.loads(cloudpickle.dumps(tasks))
def _check_preconditions(self, dataset: DataSet, roi: Optional[np.ndarray]) -> None:
if roi is not None and prod(roi.shape) != prod(dataset.shape.nav):
raise ValueError(
"roi: incompatible shapes: {} (roi) vs {} (dataset)".format(
roi.shape, dataset.shape.nav
)
)
def _prepare_run_for_dataset(
self,
dataset: DataSet,
executor: JobExecutor,
roi: Optional[np.ndarray],
corrections: Optional[CorrectionSet],
backends: Optional[BackendSpec],
dry: bool,
) -> Tuple[List[UDFTask], UDFParams]:
self._check_preconditions(dataset, roi)
meta = UDFMeta(
partition_slice=None,
dataset_shape=dataset.shape,
roi=roi,
dataset_dtype=dataset.dtype,
input_dtype=_get_dtype(self._udfs, dataset.dtype, corrections),
corrections=corrections,
)
for udf in self._udfs:
udf.set_meta(meta)
udf.init_result_buffers(executor=executor)
udf.allocate_for_full(dataset, roi)
if isinstance(udf, UDFPreprocessMixin):
udf.set_views_for_dataset(dataset)
udf.preprocess()
neg = Negotiator()
# FIXME take compute backend into consideration as well
# Other boundary conditions when moving input data to device
# FIXME: approximate partition shape here
partition = next(dataset.get_partitions())
tiling_scheme = neg.get_scheme(
udfs=self._udfs,
approx_partition_shape=partition.shape,
dataset=dataset,
read_dtype=meta.input_dtype,
roi=roi,
corrections=corrections,
)
params = UDFParams.from_udfs(
udfs=self._udfs,
roi=roi,
corrections=corrections,
tiling_scheme=tiling_scheme,
)
if dry:
tasks = []
else:
tasks = list(self._make_udf_tasks(dataset, roi, backends))
return (tasks, params)
def run_for_dataset(
self,
dataset: DataSet,
executor: JobExecutor,
roi: Optional[np.ndarray] = None,
progress: bool = False,
corrections: Optional[CorrectionSet] = None,
backends: Optional[BackendSpec] = None,
dry: bool = False
) -> "UDFResults":
with tracer.start_as_current_span("UDFRunner.run_for_dataset"):
for res in self.run_for_dataset_sync(
dataset=dataset,
executor=executor.ensure_sync(),
roi=roi,
progress=progress,
corrections=corrections,
backends=backends,
dry=dry,
iterate=False
):
pass
return res
def results_for_dataset_sync(
self,
dataset: DataSet,
executor: JobExecutor,
roi: Optional[np.ndarray] = None,
progress: bool = False,
corrections: Optional[CorrectionSet] = None,
backends: Optional[BackendSpec] = None,
dry: bool = False,
) -> Iterable[Tuple[Tuple[UDFData, ...], TaskProtocol]]:
with tracer.start_as_current_span("_prepare_run_for_dataset"):
tasks, params = self._prepare_run_for_dataset(
dataset, executor, roi, corrections, backends, dry
)
cancel_id = str(uuid.uuid4())
self._debug_task_pickling(tasks)
executor = executor.ensure_sync()
if dry:
task_comm_handler: TaskCommHandler = NoopCommHandler()
else:
task_comm_handler = dataset.get_task_comm_handler()
try:
if progress and tasks:
pman = ProgressManager(tasks, reporter=self._progress_reporter)
pman.connect(task_comm_handler)
for task in tasks:
task.report_progress()
with executor.scatter(params) as params_handle:
if tasks:
for res in executor.run_tasks(
tasks,
params_handle,
cancel_id,
task_comm_handler,
):
if progress:
pman.finalize_task(res[1])
yield res
finally:
if progress and tasks:
pman.close()
def run_for_dataset_sync(
self,
dataset: DataSet,
executor: JobExecutor,
roi: Optional[np.ndarray] = None,
progress: bool = False,
corrections: Optional[CorrectionSet] = None,
backends: Optional[BackendSpec] = None,
dry: bool = False,
iterate: bool = True
) -> Generator["UDFResults", None, None]:
executor = executor.ensure_sync()
result_iter = self.results_for_dataset_sync(
dataset=dataset,
executor=executor,
roi=roi,
progress=progress,
corrections=corrections,
backends=backends,
dry=dry,
)
damage = BufferWrapper(kind='nav', dtype=bool)
damage.set_shape_ds(dataset.shape, roi)
damage.allocate()
any_result = False
for part_results, task in result_iter:
any_result = True
with tracer.start_as_current_span("_apply_part_result -> UDF.merge"):
self._apply_part_result(
udfs=self._udfs,
damage=damage,
part_results=part_results,
task=task
)
if iterate:
yield self._make_udf_result(
udfs=self._udfs,
damage=damage
)
if not any_result or not iterate:
yield self._make_udf_result(
udfs=self._udfs,
damage=damage
)
async def run_for_dataset_async(
self,
dataset: DataSet,
executor: JobExecutor,
cancel_id,
roi: Optional[np.ndarray] = None,
corrections: Optional[CorrectionSet] = None,
backends: Optional[BackendSpec] = None,
progress: bool = False,
dry: bool = False,
iterate: bool = True,
) -> AsyncGenerator["UDFResults", None]:
with tracer.start_as_current_span("UDFRunner.run_for_dataset_async"):
gen = self.run_for_dataset_sync(
dataset=dataset,
executor=executor.ensure_sync(),
roi=roi,
progress=progress,
corrections=corrections,
backends=backends,
dry=dry,
iterate=iterate,
)
async for res in async_generator_eager(gen, pool=self._pool):
yield res
def _roi_for_partition(self, roi: np.ndarray, partition: Partition) -> np.ndarray:
return roi_for_partition(roi, partition)
def _make_udf_tasks(
self,
dataset: DataSet,
roi: Optional[np.ndarray],
backends: Optional[BackendSpec]
) -> Generator[UDFTask, None, None]:
udf_backends = [
udf.get_backends()
for udf in self._udfs
]
span = trace.get_current_span()
span_context = span.get_span_context()
for idx, partition in enumerate(dataset.get_partitions()):
partition.set_idx(idx)
num_frames = partition.get_frame_count(roi)
if not num_frames:
# roi is empty for this partition or zero-length, ignore
continue
udf_classes = [
udf.__class__
for udf in self._udfs
]
            task = UDFTask(
partition=partition, idx=idx, udf_classes=udf_classes,
udf_backends=udf_backends,
backends=backends,
runner_cls=self.get_part_runner_cls(),
span_context=span_context,
task_frames=num_frames,
)
            yield task
@classmethod
def get_part_runner_cls(cls) -> Type['UDFPartRunner']:
return UDFPartRunner
UDFResultDict = Mapping[str, BufferWrapper]
class UDFResults:
'''
Container class to combine UDF results with additional information.
This class allows to return additional information from UDF execution
together with UDF result buffers. This is currently used to pass
"damage" information when running UDFs as an iterator using
:meth:`libertem.api.Context.run_udf_iter`. "Damage" is
a map of the nav space that is set to :code:`True`
for all positions that have already been processed.
.. versionadded:: 0.7.0
Parameters
----------
buffers
Iterable containing the result buffer dictionaries for each of the UDFs being executed
damage : BufferWrapper
:class:`libertem.common.buffers.BufferWrapper` of :code:`kind='nav'`, :code:`dtype=bool`.
It is set to :code:`True` for all positions in nav space that have been processed already.
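
    Example
    -------
    A sketch of consuming partial results from
    :meth:`libertem.api.Context.run_udf_iter` (:code:`ctx`, :code:`dataset`
    and :code:`udf` are assumed to exist):

    >>> for udf_results in ctx.run_udf_iter(dataset=dataset, udf=udf):  # doctest: +SKIP
    ...     valid_mask = udf_results.damage.data
    ...     partial_buffers = udf_results.buffers[0]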
'''
def __init__(self, buffers: Iterable[UDFResultDict], damage: BufferWrapper):
self.buffers = tuple(buffers)
self.damage = damage