Source code for libertem.udf.stddev

from collections import defaultdict
import functools

import numpy as np
import numba

from libertem.udf import UDF
from libertem.common.buffers import reshaped_view


@numba.njit(fastmath=True, cache=True, nogil=True)
def merge_single(n, n_0, sum_0, varsum_0, n_1, sum_1, varsum_1, mean_1):
    '''
    Basic function to perform numerically stable merge.

    This function is designed to be inlined in a loop over all pixels in a frame
    to merge an individual pixel, with some parts pre-calculated.

    Based on :cite:`Schubert2018`.

    Parameters
    ----------
    n : int
        Total number of frames, assumed n == n_0 + n_1
        Pre-calculated since equal for all pixels.
    n_0 : int
        Number of frames aggregated in sum_0 and varsum_0, n_0 > 0.
        The case n_0 == 0 is handled separately in the calling function.
    sum_0, varsum_0 : float
        Aggregate sum and sum of variances
    n_1 : int
        Number of frames to merge from minibatch, n_1 > 0
    sum_1, varsum_1, mean_1 : float
        Minibatch sum, sum of variances and mean. The mean is
        available from the previous minibatch calculation in the innermost
        aggregation loop as a "hot" value.samples

    Returns
    -------
    sum, varsum : float
        New aggregate sum and sum of variances
    '''
    # compute mean for each partitions
    mean_0 = sum_0 / n_0

    # compute mean for joint samples
    delta = mean_1 - mean_0
    mean = mean_0 + (n_1 * delta) / n

    # compute sum of images for joint samples
    sumsum = sum_0 + sum_1

    # compute sum of variances for joint samples
    partial_delta = mean_1 - mean
    # The original version had delta * partial_delta, which gave wrong results
    # for complex numbers. Taking the absolute seems to fix this for complex.
    # For real values it is equivalent since delta and partial_delta always have
    # the same sign: inserting mean from above yields
    # partial_delta = delta + (n_1 * delta) / n
    # n_1 and n are natural numbers.
    # For complex values it is plausible since the variance
    # sum of the total is a real number. If the mean of both subsets is equal,
    # the total is varsum_0 + varsum_1, since both delta and partial_delta are
    # zero. If the mean of the subsets is different, the total variance sum
    # must always be larger than the variance sums of the
    # subsets. Taking the absolute of both is the only plausible way I can see
    # to make sure the calculation is equivalent for real values and at the same time always
    # a positive real value for complex numbers.
    # FIXME proper validation and proof for complex
    varsum = varsum_0 + varsum_1 + (n_1 * np.abs(delta) * np.abs(partial_delta))
    return sumsum, varsum


@numba.njit(nogil=True)
def merge(dest_n, dest_sum, dest_varsum, src_n, src_sum, src_varsum, src_mean):
    """
    Given two sets of buffers, with sum of frames
    and sum of variances, aggregate joint sum of frames
    and sum of variances in destination buffers using one pass
    algorithm.

    This is ther numerical workhorse for :meth:`StdDevUDF.merge`.

    Based on :cite:`Schubert2018`.

    Parameters
    ----------
    dest_n : int
        Number of frames aggregated in dest_sum and dest_varsum
        The case :code:`dest_N == 0` is handled correctly.
    dest_sum, dest_varsum : one-dimensional numpy.ndarray
        Aggregation buffers, will be updated: sum of pixels, sum of variances
    src_n : int
        Number of frames aggregated in src_sum and src_varsum
    src_sum, src_varsum : one-dimensional numpy.ndarray
        Source buffers to merge: sum of pixels, sum of variances


    Returns
    -------
    N:
        New number of frames aggregated in aggregation buffer
    """
    if dest_n == 0:
        dest_sum[:] = src_sum
        dest_varsum[:] = src_varsum
        return src_n
    elif src_n == 0:
        # Can happen from empty partitions due to sync offset
        return dest_n
    else:
        n = dest_n + src_n

        for pixel in range(len(dest_sum)):
            dest_sum[pixel], dest_varsum[pixel] = merge_single(
                n,
                dest_n, dest_sum[pixel], dest_varsum[pixel],
                src_n, src_sum[pixel], src_varsum[pixel], src_mean[pixel]
            )
    return n


@numba.njit(fastmath=True, nogil=True)
def process_tile(tile, n_0, sum_inout, varsum_inout):
    '''
    Compute sum and variance of :code:`tile` along navigation axis
    and merge into aggregation buffers. Numerical "workhorse" for
    :meth:`StdDevUDF.process_tile`.

    Parameters
    ----------
    tile : 2-dimensional numpy.ndarray
        Tile with flattened signal dimension
    n_0 : int
        Number of frames already aggegated in sum_inout, varsum_inout
        Cannot be 0 -- the initial case is handled in :meth:`StdDevUDF.process_tile`
        For unknown reasons, handling the case N0 == 0 in this function degraded performance
        significantly. Re-check with a newer compiler version!
    sum_inout, varsum_inout : numpy.ndarray
        Aggregation buffers (1D) with length matching the flattened signal dimension
        of the tile. The tile will be merged into these buffers (call by reference)

    Returns
    -------
    n : int
        New number of frames in aggregation buffer
    '''
    n_frames = tile.shape[0]
    n_pixels = tile.shape[1]
    n = n_0 + n_frames

    BLOCKSIZE = 256
    n_blocks = n_pixels // BLOCKSIZE

    sumsum = np.zeros(BLOCKSIZE, dtype=sum_inout.dtype)
    varsum = np.zeros(BLOCKSIZE, dtype=varsum_inout.dtype)

    for block in range(n_blocks):
        pixel_offset = block*BLOCKSIZE
        sumsum[:] = 0
        for frame in range(n_frames):
            for i in range(BLOCKSIZE):
                sumsum[i] += tile[frame, pixel_offset + i]
        mean = sumsum / n_frames
        varsum[:] = 0
        for frame in range(n_frames):
            for i in range(BLOCKSIZE):
                varsum[i] += np.abs(tile[frame, pixel_offset + i] - mean[i])**2
        for i in range(BLOCKSIZE):
            sum_inout[pixel_offset + i], varsum_inout[pixel_offset + i] = merge_single(
                n,
                n_0, sum_inout[pixel_offset + i], varsum_inout[pixel_offset + i],
                n_frames, sumsum[i], varsum[i], mean[i]
            )
    for pixel in range(n_blocks*BLOCKSIZE, n_pixels):
        sumsum_rest = 0.
        for frame in range(n_frames):
            sumsum_rest += tile[frame, pixel]
        mean_rest = sumsum_rest / n_frames
        varsum_rest = 0.
        for frame in range(n_frames):
            varsum_rest += np.abs(tile[frame, pixel] - mean_rest)**2
        sum_inout[pixel], varsum_inout[pixel] = merge_single(
            n,
            n_0, sum_inout[pixel], varsum_inout[pixel],
            n_frames, sumsum_rest, varsum_rest, mean_rest
        )
    return n


def merge_ndarray(dest_n, dest_sum, dest_varsum, src_n, src_sum, src_varsum, src_mean, forbuf):
    '''
    Version of :func:`merge` that only uses ndarray routines
    for CuPy and sparse input data
    '''
    if dest_n == 0:
        dest_sum[:] = forbuf(src_sum, dest_sum)
        dest_varsum[:] = forbuf(src_varsum, dest_varsum)
        return src_n
    elif src_n == 0:
        return dest_n
    else:
        n = dest_n + src_n
        # compute mean for each partitions
        mean_0 = dest_sum / dest_n
        mean_1 = src_mean

        # compute mean for joint samples
        delta = mean_1 - mean_0
        mean = mean_0 + (src_n * delta) / n

        # compute sum of images for joint samples
        dest_sum += forbuf(src_sum, dest_sum)

        # compute sum of variances for joint samples
        partial_delta = mean_1 - mean
        dest_varsum += forbuf(
            # Use multiply for compatibility with matrix interface (scipy.sparse)
            src_varsum + (np.multiply(np.multiply(src_n, np.abs(delta)), np.abs(partial_delta))),
            dest_varsum
        )
        return n


def process_tile_ndarray(tile, n_0, sum_inout, varsum_inout, forbuf):
    '''
    Version of :func:`process_tile` that only uses ndarray routines
    for CuPy and sparse input data
    '''
    n_frames = tile.shape[0]
    n = n_0 + n_frames
    sumsum = np.sum(tile, axis=0)
    mean = sumsum / n_frames
    delta = np.abs(tile - mean)
    varsum = np.sum(np.multiply(delta, delta), axis=0)
    merge_ndarray(
        n_0, sum_inout, varsum_inout,
        n_frames, sumsum, varsum, mean,
        forbuf
    )
    return n


# Helper function to make sure the frame count
# is consistent at the merge stage
def _validate_n(num_frame):
    if len(num_frame) == 0:
        return 0
    else:
        values = tuple(num_frame.values())
        assert np.all(np.equal(values, values[0]))
        return values[0]


[docs] class StdDevUDF(UDF): """ Compute sum of variances and sum of pixels from the given dataset The one-pass algorithm used in this code is taken from the following paper: :cite:`Schubert2018`. .. versionchanged:: 0.5.0 Result buffers have been renamed .. versionchanged:: 0.7.0 :code:`var`, :code:`mean`, and :code:`std` are now returned directly from the UDF via :code:`get_results`. .. versionchanged:: 0.11 added dtype and use_numba parameters, added support for sparse input. Parameters ---------- dtype Base dtype to determine numerical precision. use_numba : bool Use the Numba-accelerated version if input array format and backend allow. Examples -------- >>> udf = StdDevUDF() >>> result = ctx.run_udf(dataset=dataset, udf=udf) >>> np.array(result["varsum"]) # variance times number of frames array(...) >>> np.array(result["num_frames"]) # number of frames for each tile array(...) >>> np.array(result["sum"]) # sum of all frames array(...) >>> np.array(result["var"]) array(...) >>> np.array(result["mean"]) array(...) >>> np.array(result["std"]) array(...) """ def __init__(self, dtype=None, use_numba=True) -> None: super().__init__(dtype=dtype, use_numba=use_numba)
[docs] def get_backends(self): # TODO supporting sparse backends efficiently requires a dedicated implementation # since the generic one calculates the difference from a mean value, # which can densify a sparse array. return tuple(b for b in self.BACKEND_ALL if b not in self.SPARSE_BACKENDS)
def get_result_buffers(self): '' base_dtype = self.params.dtype if base_dtype is None: base_dtype = np.float64 dtype = np.result_type(self.meta.input_dtype, base_dtype) return { 'varsum': self.buffer( kind='sig', dtype=base_dtype, where='device' ), 'num_frames': self.buffer( kind='single', dtype='int64' ), 'sum': self.buffer( kind='sig', dtype=dtype, where='device' ), 'var': self.buffer( kind='sig', dtype=base_dtype, use='result_only', ), 'std': self.buffer( kind='sig', dtype=base_dtype, use='result_only', ), 'mean': self.buffer( kind='sig', dtype=dtype, use='result_only', ), }
[docs] def get_task_data(self): return { 'num_frames': defaultdict(int) }
def postprocess(self): self.results.num_frames[:] = _validate_n(self.task_data.num_frames) def merge(self, dest, src): '' # Given destination and source buffers that contain sum of variances, sum of frames, # and the number of frames used in each of the buffers, merge the source # buffers into the destination buffers by computing the joint sum of variances and # sum of frames over all frames used # Parameters # ---------- # dest # Aggregation bufer that contains sum of variances, sum of frames, and the # number of frames # src # Partial results that contains sum of variances, sum of frames, and the # number of frames of a partition to be merged into the aggregation buffers dest_n = dest.num_frames[0] src_n = src.num_frames[0] if self.params.use_numba: my_merge = merge else: my_merge = functools.partial(merge_ndarray, forbuf=self.forbuf) n = my_merge( dest_n=dest_n, dest_sum=reshaped_view(dest.sum, (-1,)), dest_varsum=reshaped_view(dest.varsum, (-1,)), src_n=src_n, src_sum=reshaped_view(src.sum, (-1,)), src_varsum=reshaped_view(src.varsum, (-1,)), src_mean=reshaped_view(src.sum, (-1,)) / src_n, ) dest.num_frames[:] = n def merge_all(self, ordered_results): '' n_frames = np.stack([b.num_frames[0] for b in ordered_results.values()]) pixel_sums = np.stack([b.sum for b in ordered_results.values()]) pixel_varsums = np.stack([b.varsum for b in ordered_results.values()]) # Expand n_frames to be broadcastable extra_dims = pixel_sums.ndim - n_frames.ndim n_frames = n_frames.reshape(n_frames.shape + (1,) * extra_dims) cumulative_frames = np.cumsum(n_frames, axis=0) cumulative_sum = np.cumsum(pixel_sums, axis=0) sumsum = cumulative_sum[-1, ...] total_frames = cumulative_frames[-1, 0] mean_0 = cumulative_sum / cumulative_frames # Handle the fact that mean_0 is indexed to results from # up-to the partition before. We shift everything one to # the right, and we don't care about result 0 because it # is by definiition replaced with varsum[0, ...] mean_0 = np.roll(mean_0, 1, axis=0) mean_1 = pixel_sums / n_frames delta = mean_1 - mean_0 mean = mean_0 + (n_frames * delta) / cumulative_frames partial_delta = mean_1 - mean varsum = pixel_varsums + (n_frames * np.abs(delta) * np.abs(partial_delta)) varsum_total = np.sum(varsum, axis=0) return { 'sum': sumsum, 'varsum': varsum_total, 'num_frames': total_frames } def _adjust_dtype(self, input): base_dtype = self.params.dtype if base_dtype is None: base_dtype = np.float64 dtype = np.result_type(input.dtype, base_dtype) if input.dtype != dtype: return input.astype(dtype) else: return input def process_tile(self, tile): # Calculate a sum and variance minibatch for the tile and update partition buffers # with it. key = self.meta.tiling_scheme_idx n_0 = self.task_data.num_frames[key] n_1 = tile.shape[0] if n_0 == 0: # Make sure we calculate with full precision tile = self._adjust_dtype(tile) self.results.sum[:] = self.forbuf(tile.sum(axis=0), self.results.sum) # Done this way to support the various array libraries delta = np.abs(tile - tile.mean(axis=0)) self.results.varsum[:] = self.forbuf( np.sum(np.multiply(delta, delta), axis=0).real, self.results.varsum ) self.task_data.num_frames[key] = n_1 else: if isinstance(tile, np.ndarray) and self.params.use_numba: my_process_tile = process_tile else: # Make sure we calculate with full precision tile = self._adjust_dtype(tile) my_process_tile = functools.partial(process_tile_ndarray, forbuf=self.forbuf) self.task_data.num_frames[key] = my_process_tile( tile=tile.reshape((n_1, -1)), n_0=n_0, sum_inout=reshaped_view(self.results.sum, (-1, )), varsum_inout=reshaped_view(self.results.varsum, (-1, )), ) def get_results(self): '' # Calculate variance, mean and standard deviation from raw UDF results num_frames = self.results.num_frames[0] var = self.results.varsum / num_frames return { 'var': var, 'std': np.sqrt(var), 'mean': self.results.sum / num_frames, }
[docs] def consolidate_result(udf_result): ''' Calculate variance, mean and standard deviation from raw UDF results and consolidate the per-tile frame counter into a single value. Convert all result arrays to `ndarray`. Note ---- This is mostly here for backwards-compatability - nowadays, 'var', 'std', and 'mean' are already calculated in :meth:`StdDevUDF.get_results`. Parameters ---------- udf_result : Dict[str, BufferWrapper] UDF result with keys 'sum', 'varsum', 'num_frames', 'var', 'std', 'mean' Returns ------- pass_results : Dict[str, Union[numpy.ndarray, int]] Result dictionary with keys :code:`'sum', 'varsum', 'var', 'std', 'mean'` as :class:`numpy.ndarray`, and :code:`'num_frames'` as :code:`int` ''' return { 'num_frames': udf_result['num_frames'].data[0], 'varsum': udf_result['varsum'].data, 'sum': udf_result['sum'].data, 'var': udf_result['var'].data, 'std': udf_result['std'].data, 'mean': udf_result['mean'].data, }
[docs] def run_stddev(ctx, dataset, roi=None, progress=False, use_numba=True): """ Compute sum of variances and sum of pixels from the given dataset One-pass algorithm used in this code is taken from the following paper: :cite:`Schubert2018`. .. versionchanged:: 0.5.0 Result buffers have been renamed .. versionchanged:: 0.5.0 Added :code:`progress` parameter for progress bar Parameters ---------- ctx : libertem.api.Context dataset : libertem.io.dataset.base.DataSet dataset to work on roi : numpy.ndarray Region of interest, see :ref:`udf roi` for more information. progress : bool, optional Show progress bar. Default is :code:`False`. use_numba : bool Use the Numba backend if available, otherwise use ndarray based implementation Returns ------- pass_results A dictionary of narrays that contains sum of variances, sum of pixels, and number of frames used to compute the above statistic To retrieve statistic, using the following commands: variance : :code:`pass_results['var']` standard deviation : :code:`pass_results['std']` sum of pixels : :code:`pass_results['sum']` mean : :code:`pass_results['mean']` number of frames : :code:`pass_results['num_frames']` """ stddev_udf = StdDevUDF(use_numba=use_numba) pass_results = ctx.run_udf( dataset=dataset, udf=stddev_udf, roi=roi, progress=progress ) return consolidate_result(pass_results)