Source code for libertem.common.container

import functools
import logging
from typing import Union, Callable, Optional
from collections.abc import Sequence
from typing_extensions import Literal

import sparse
import scipy.sparse
import numpy as np
import numpy.typing as npt
import cloudpickle
from sparseconverter import (
    CPU_BACKENDS, CUDA, CUPY_BACKENDS, for_backend, ArrayT, ArrayBackend, NUMPY
)
from libertem.io.dataset.base.tiling_scheme import TilingScheme

from libertem.common.sparse import to_dense, to_sparse, is_sparse
from libertem.common import Slice


log = logging.getLogger(__name__)

FactoryT = Callable[[], ArrayT]
SparseSupportedT = Literal[
    'sparse.pydata',
    'sparse.pydata.GCXS',
    'scipy.sparse',
    'scipy.sparse.csc',
    'scipy.sparse.csr',
]


def _build_sparse(
    m, dtype: npt.DTypeLike, sparse_backend: SparseSupportedT, backend: ArrayBackend
):
    if sparse_backend == 'sparse.pydata' and backend == NUMPY:
        # sparse.pydata.org is fastest for masks with few layers
        # and few entries
        return m.astype(dtype)
    elif sparse_backend == 'sparse.pydata.GCXS' and backend == NUMPY:
        # GCXS is the compressed (CSR/CSC-like) format of sparse.pydata.org
        return sparse.GCXS(m.astype(dtype))
    elif 'scipy.sparse' in sparse_backend:
        if backend in CPU_BACKENDS or backend == CUDA:
            lib = scipy.sparse
        elif backend in CUPY_BACKENDS:
            # Avoid import if possible
            import cupyx.scipy.sparse
            lib = cupyx.scipy.sparse
        else:
            raise ValueError(
                f"Backend {backend} not supported for sparse_backend {sparse_backend}."
            )
        iis, jjs = m.coords
        values = m.data
        if sparse_backend == 'scipy.sparse.csc':
            s = scipy.sparse.csc_matrix(
                (values, (iis, jjs)), shape=m.shape, dtype=dtype)
            assert s.has_canonical_format
            return lib.csc_matrix(s)
        elif sparse_backend == 'scipy.sparse' or sparse_backend == 'scipy.sparse.csr':
            s = scipy.sparse.csr_matrix(
                (values, (iis, jjs)), shape=m.shape, dtype=dtype)
            assert s.has_canonical_format
            return lib.csr_matrix(s)
    # Fall through if no return statement was reached
    raise ValueError(
        f"sparse_backend {sparse_backend} not implemented for backend {backend}. "
        "CPU-based backends supports 'sparse.pydata', 'sparse.pydata.GCXS', 'scipy.sparse', "
        "'scipy.sparse.csc' or 'scipy.sparse.csr'. "
        "CUDA-based backends supports 'scipy.sparse', 'scipy.sparse.csc' or 'scipy.sparse.csr'. "
    )

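# Illustrative sketch, not part of the library API: _example_build_sparse_csr is
# a hypothetical helper showing how _build_sparse turns a 2D pydata/sparse COO
# mask stack into a scipy.sparse CSR matrix for the NumPy backend.
def _example_build_sparse_csr():
    # one mask over a 4x4 signal, flattened to (stack_height, prod(sig_shape))
    m = sparse.COO.from_numpy(np.eye(4, dtype=np.float32)).reshape((1, 16))
    s = _build_sparse(m, np.float32, 'scipy.sparse.csr', NUMPY)
    assert s.format == 'csr' and s.shape == (1, 16)
    return s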

def _make_mask_slicer(
    computed_masks: ArrayT,
    dtype: npt.DTypeLike,
    sparse_backend: Union[Literal[False], SparseSupportedT],
    transpose: bool,
    backend: ArrayBackend,
):
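    # Slice objects are hashable, so functools.cache memoizes the reshaped and
    # converted mask per signal slice: repeated tiles with the same signal
    # slice reuse the cached result instead of converting again.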
    @functools.cache
    def _get_masks_for_slice(slice_):
        stack_height = computed_masks.shape[0]
        m = slice_.get(computed_masks, sig_only=True)
        # We need the mask's signal dimension flattened
        m = m.reshape((stack_height, -1))
        if transpose:
            # We need the stack transposed in the next step
            m = m.T
        if sparse_backend is False:
            return for_backend(m, backend).astype(dtype)
        else:
            return _build_sparse(m, dtype, sparse_backend, backend)
    return _get_masks_for_slice


class MaskContainer:
    '''
    Container for mask stacks that are created from factory functions.

    It allows stacking, cached slicing, transposing and conversion to condition
    the masks for high-performance dot products.

    Computation of masks is delayed until as late as possible, but is done
    automatically when necessary. Methods which can trigger mask instantiation
    include:

    - container.use_sparse
    - len(container) [if the count argument is None at __init__]
    - container.dtype [if the dtype argument is None at __init__]
    - any of the get() methods

    use_sparse at __init__ can be None, False, True or any supported sparse
    backend as a string in {'scipy.sparse', 'scipy.sparse.csc',
    'scipy.sparse.csr', 'sparse.pydata', 'sparse.pydata.GCXS'}.

    use_sparse as None means the sparse mode is chosen only after the masks are
    instantiated: if all masks are sparse, sparse processing is activated using
    the backend in default_sparse, otherwise dense processing is used on the
    appropriate backend.
    '''
    def __init__(
        self,
        mask_factories: Union[FactoryT, Sequence[FactoryT]],
        dtype: Optional[npt.DTypeLike] = None,
        use_sparse: Optional[Union[bool, SparseSupportedT]] = None,
        count: Optional[int] = None,
        backend: Optional[ArrayBackend] = None,
        default_sparse: SparseSupportedT = 'scipy.sparse',
    ):
        self.mask_factories = mask_factories
        # If we generate a whole mask stack with one function call,
        # we should know the length without generating the mask stack
        self._length = count
        self._dtype = dtype
        self._mask_cache = {}
        # lazily initialized in the worker process, to keep task size small:
        self._computed_masks = None
        if backend is None:
            backend = 'numpy'
        self.backend = backend
        self._get_masks_for_slice = {}
        # from Python 3.8....
        # assert default_sparse in typing.get_args(SparseSupportedT)
        self._default_sparse = default_sparse
        self._use_sparse: Union[Literal[False], None, SparseSupportedT]
        # Try to resolve upfront whether we are actually using sparse.
        # This is not always possible, as it depends on whether the
        # mask_factories will all return sparse matrices.
        if use_sparse is True:
            self._use_sparse = default_sparse
        elif use_sparse is False:
            self._use_sparse = False
        elif isinstance(use_sparse, str) and (
            # This should be rendered compatible with the SPARSE_BACKENDS frozenset,
            # but there are issues of capitalization and naming
            use_sparse.lower().startswith('scipy.sparse')
            or use_sparse.lower().startswith('sparse.pydata')
        ):
            self._use_sparse = use_sparse
        elif use_sparse is None:
            # The user didn't specify; sparse will be used if the masks
            # are sparse and we are on a compatible backend
            if (
                default_sparse.startswith('sparse.pydata')
                and self.backend in CUPY_BACKENDS
            ):
                # sparse.pydata cannot run on CuPy, so densify to allow calculation
                self._use_sparse = False
            else:
                # We can't determine _use_sparse without creating the masks
                # themselves, and we want to delay this as late as possible.
                # Leave as None for now and resolve on first access to
                # the self.use_sparse property
                self._use_sparse = None
        else:
            raise ValueError(f'use_sparse not an allowed value: {use_sparse}')
        self.validate_mask_functions()

    def __getstate__(self):
        # don't even try to pickle the mask cache
        state = self.__dict__
        state['_get_masks_for_slice'] = {}
        return state

    def validate_mask_functions(self):
        fns = self.mask_factories
        # 1 MB, magic number L3 cache
        limit = 2**20
        if callable(fns):
            fns = [fns]
        for fn in fns:
            s = len(cloudpickle.dumps(fn))
            if s > limit:
                log.warning(
                    'Mask factory size %s larger than warning limit %s, may be inefficient'
                    % (s, limit)
                )

    def __len__(self):
        if self._length is not None:
            return self._length
        elif not callable(self.mask_factories):
            return len(self.mask_factories)
        else:
            return len(self.computed_masks)

    def get_for_idx(self, scheme: TilingScheme, idx: int, *args, **kwargs):
        slice_ = scheme[idx]
        return self._get(slice_, *args, **kwargs)

    def get_for_sig_slice(self, sig_slice: Slice, *args, **kwargs):
        """
        Same as `get`, but without calling `discard_nav()` on the slice
        """
        return self._get(sig_slice, *args, **kwargs)

    def get(self, key: Slice, dtype=None, sparse_backend=None, transpose=True, backend=None):
        if not isinstance(key, Slice):
            raise TypeError(
                "MaskContainer.get() can only be called with "
                "DataTile/Slice/Partition instances"
            )
        return self._get(key.discard_nav(), dtype, sparse_backend, transpose, backend)

    def _get(self, slice_: Slice, dtype=None, sparse_backend=None, transpose=True, backend=None):
        if backend is None:
            backend = self.backend
        return self.get_masks_for_slice(
            slice_,
            dtype=dtype,
            sparse_backend=sparse_backend,
            transpose=transpose,
            backend=backend,
        )

    @property
    def dtype(self):
        if self._dtype is None:
            return self.computed_masks.dtype
        else:
            return self._dtype

    @property
    def use_sparse(self) -> Union[SparseSupportedT, Literal[False]]:
        # As far as possible, use_sparse was resolved at __init__,
        # but if we don't know whether the masks are sparse we may still
        # arrive here with self._use_sparse being None
        if self._use_sparse is None:
            if is_sparse(self.computed_masks):
                # The first time this condition is hit it will trigger
                # mask computation, but on subsequent calls we will
                # fall through to the normal return
                self._use_sparse = self._default_sparse
            else:
                self._use_sparse = False
        return self._use_sparse

    def _compute_masks(self) -> Union[np.ndarray, sparse.COO, sparse.GCXS]:
        """
        Call mask factories and combine into a mask stack

        Uses the internal attribute self._use_sparse, which could be None
        if we were unable to resolve the sparse mode at __init__.
        If self._use_sparse is None, a sparse stack is returned if all masks
        are sparse, else a dense stack. Otherwise, if self._use_sparse is
        simply False, a dense stack is returned; anything else returns a
        sparse stack.

        Returns
        -------
        an array-like mask stack with contents as they were created
        by the factories
        """
        mask_slices = []
        if callable(self.mask_factories):
            raw_masks = self.mask_factories()
            mask_slices.append(raw_masks)
        else:
            for f in self.mask_factories:
                m = f()
                # scipy.sparse is always 2D, so we have to convert here
                # before reshaping
                if scipy.sparse.issparse(m):
                    m = sparse.COO.from_scipy_sparse(m)
                # We reshape to be a stack of 1 so that we can unify code below
                m = m.reshape((1, ) + m.shape)
                mask_slices.append(m)

        # Fully resolve _use_sparse based on the sparsity of the masks.
        # The return type (sparse or dense) of this function
        # is used to resolve _use_sparse permanently in the
        # self.use_sparse property method
        masks_are_sparse = all(is_sparse(m) for m in mask_slices)
        use_sparse = self._use_sparse
        if use_sparse is None:
            if masks_are_sparse:
                use_sparse = self._default_sparse
            else:
                use_sparse = False

        if use_sparse is not False:
            # Conversion to the correct backend will happen later.
            # Use sparse.pydata because it implements the array interface,
            # which makes mask handling easier
            masks = sparse.concatenate(
                [to_sparse(m) for m in mask_slices]
            )
        else:
            masks = np.concatenate(
                [to_dense(m) for m in mask_slices]
            )
        return masks

    def get_masks_for_slice(self, slice_, dtype=None, sparse_backend=None,
                            transpose=True, backend='numpy'):
        if dtype is None:
            dtype = self.dtype
        if sparse_backend is None:
            sparse_backend = self.use_sparse
        if backend is None:
            backend = self.backend
        key = (dtype, sparse_backend, transpose, backend)
        if key not in self._get_masks_for_slice:
            self._get_masks_for_slice[key] = _make_mask_slicer(
                self.computed_masks,
                dtype=dtype,
                sparse_backend=sparse_backend,
                transpose=transpose,
                backend=backend,
            )
        return self._get_masks_for_slice[key](slice_)

    @property
    def computed_masks(self):
        if self._computed_masks is None:
            self._computed_masks = self._compute_masks()
        return self._computed_masks
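# Minimal usage sketch, assuming a single dense mask factory; this block is an
# illustration and not part of the library. With use_sparse left as None and a
# dense mask, the container resolves to dense processing on first access.
if __name__ == '__main__':
    container = MaskContainer(
        mask_factories=[lambda: np.ones((4, 4), dtype=np.float32)],
        dtype=np.float32,
    )
    print(len(container))                   # 1: length of the factory list
    print(container.use_sparse)             # False: the mask is dense
    print(container.computed_masks.shape)   # (1, 4, 4): stack of one 4x4 mask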