Source code for libertem.io.corrections.corrset

import functools
from typing import Optional

import numpy as np
import numba
import sparse

from libertem.common import Slice
from libertem.io.corrections.detector import correct, RepairDescriptor


@numba.njit(cache=True)
def disjunct_multiplier(excluded, sig_shape, base_shape=1, target=1):
    '''
    Calculate an integer i close to target which is a multiple of base_shape
    and for which i * n not in "excluded" for any n > 0, i * n < sig_shape.

    Cases with a bad pixel at 0 are handled downstream.

    Parameters
    ----------

    excluded: Sequence
        Forbidden tile boundary positions
    sig_shape: int
        Signal shape of the adjusted dimension
    base_shape: int
        Base shape of the adjusted dimension
    target: int
        Target value for the tile size, find a solution close to this value.

    Returns
    -------

    int
    '''
    approx_multiplier = int(np.round(target / base_shape))
    current_value = base_shape * approx_multiplier
    max_excluded = np.max(excluded)
    excluded_map = np.zeros(max_excluded + 1, dtype=np.uint8)
    excluded_map[excluded] = 1
    if current_value >= target:
        sign = 1
    else:
        sign = -1
    for offset in range(max_excluded // base_shape + 1):
        # plus 0, minus 1, plus 2, ...
        current_value += offset * sign * base_shape
        sign *= -1
        if current_value <= 0:
            continue
        clear = True
        for multiplier in range(1, max_excluded // current_value + 1):
            index = current_value * multiplier
            bad = (
                index >= 0
                and index < sig_shape
                and index <= max_excluded
                and excluded_map[index]
            )
            if bad:
                clear = False
                break
        if clear:
            return current_value
    # target value is max_excluded + 1,
    # i.e. if the modulo is 0 we have to add one more
    multiple = max_excluded // base_shape + 1
    return min(multiple * base_shape, sig_shape)


[docs] class CorrectionSet: """ A set of corrections to apply. .. versionadded:: 0.6.0 Parameters ---------- dark : np.ndarray An array containing a dark frame to substract from all frames, its shape needs to match the signal shape of the dataset. gain : np.ndarray An array containing a gain map to multiply with each frame, its shape needs to match the signal shape of the dataset. excluded_pixels : sparse.COO A "sparse pydata" COO array containing only entries for pixels that should be excluded. The shape needs to match the signal shape of the dataset. Can also be anything that is directly compatible with the :code:`sparse.COO` constructor, for example a "roi-like" NumPy array. A :code:`sparse.COO` array can be directly constructed from a coordinate array, using :code:`sparse.COO(coords=coords, data=1, shape=ds.shape.sig)` allow_empty : bool Do not throw an exception if a repair environment for an excluded pixel is empty. The pixel is left uncorrected in that case. """ def __init__( self, dark: Optional[np.ndarray] = None, gain: Optional[np.ndarray] = None, excluded_pixels: Optional[sparse.COO] = None, allow_empty: bool = False ): self._dark = dark self._gain = gain if excluded_pixels is not None: excluded_pixels = sparse.COO(excluded_pixels, prune=True) self._excluded_pixels = excluded_pixels self._allow_empty = allow_empty if not allow_empty and excluded_pixels is not None: # Construct the environment for checking so that an exception is thrown # when the CorrectionSet is instantiated and not when workers try to apply it. _ = RepairDescriptor( sig_shape=excluded_pixels.shape, excluded_pixels=excluded_pixels.coords, allow_empty=False ) def get_dark_frame(self) -> Optional[np.ndarray]: return self._dark def get_gain_map(self) -> Optional[np.ndarray]: return self._gain def get_excluded_pixels(self) -> Optional[sparse.COO]: return self._excluded_pixels def have_corrections(self) -> bool: corrs = [ self.get_dark_frame(), self.get_gain_map(), self.get_excluded_pixels(), ] return any(c is not None for c in corrs) def apply(self, data: np.ndarray, tile_slice: Slice) -> None: """ Apply corrections in-place to `data`, cropping the correction data to the `tile_slice`. """ dark_frame = self.get_dark_frame() gain_map = self.get_gain_map() if not self.have_corrections(): return sig_slice = tile_slice.get(sig_only=True) if dark_frame is not None: dark_frame = dark_frame[sig_slice] if gain_map is not None: gain_map = gain_map[sig_slice] correct( buffer=data, dark_image=dark_frame, gain_map=gain_map, repair_descriptor=self.repair_descriptor(tile_slice.discard_nav()), inplace=True, sig_shape=tuple(tile_slice.shape.sig), allow_empty=self._allow_empty ) @functools.lru_cache(maxsize=512) def repair_descriptor(self, sig_slice): excluded_pixels = self.get_excluded_pixels() if excluded_pixels is not None: excluded_pixels = excluded_pixels[sig_slice.get(sig_only=True)] excluded_pixels = excluded_pixels.coords return RepairDescriptor( sig_shape=tuple(sig_slice.shape.sig), excluded_pixels=excluded_pixels, allow_empty=self._allow_empty ) def adjust_tileshape(self, tile_shape, sig_shape, base_shape): excluded_pixels = self.get_excluded_pixels() if excluded_pixels is None: return tile_shape if excluded_pixels.nnz == 0: return tile_shape excluded_list = excluded_pixels.coords adjusted_shape = np.array(tile_shape) sig_shape = np.array(sig_shape) base_shape = np.array(base_shape) adjust( adjusted_shape_inout=adjusted_shape, sig_shape=sig_shape, base_shape=base_shape, excluded_list=excluded_list ) invalid = np.logical_or( adjusted_shape <= 0, adjusted_shape > sig_shape ) adjusted_shape[invalid] = sig_shape[invalid] return tuple(adjusted_shape)
def adjust(adjusted_shape_inout, sig_shape, base_shape, excluded_list): ''' Adjust the tile shape to avoid collisions with patched pixels. Find a tile shape that is a multiple of base_shape in such a way that excluded pixels are not touching a tile boundary. The proposed tile shape in adjusted_shape_inout is used as a starting value. If collisions can't be avoided, use sig_shape as tile shape for that dimension. Parameters ---------- adjusted_shape_inout: np.ndarray, size n_dim Shape to adjust, modified in place sig_shape: sequence, size n_dim Signal shape base_shape: sequence, size n_dim The adjusted shape is a multiple of base_shape excluded_list: 2D sequence Coordinates of exluded pixels of shape (n_dim, n_pixels) ''' for dim in range(0, len(adjusted_shape_inout)): # Nothing to adjust, could trip downstream logic if sig_shape[dim] <= 1: continue unique = np.unique(excluded_list[dim]) # Very many pixels in the way, low chances of a solution if len(unique) > sig_shape[dim] / 3: adjusted_shape_inout[dim] = sig_shape[dim] else: stop = sig_shape[dim] # Left and right side of an invalid pixel are forbidden forbidden = np.concatenate((unique, unique + 1)) forbidden = forbidden[forbidden <= stop] # Invalid pixel at zero is handled separately nonzero_filter = forbidden != 0 m = min( stop, disjunct_multiplier( excluded=forbidden[nonzero_filter], sig_shape=sig_shape[dim], base_shape=base_shape[dim], target=adjusted_shape_inout[dim] ) ) # handle zero if not np.all(nonzero_filter): min_size = max(m, 2) else: min_size = m # Current shape is not a clean multiple # of the ideal shape or too small: adjust if adjusted_shape_inout[dim] < min_size or adjusted_shape_inout[dim] % m != 0: adjusted_shape_inout[dim] = m