Source code for

import os
import contextlib
import typing
from typing import Optional
import warnings
import logging
import time

import numpy as np
import h5py
from sparseconverter import CUDA, NUMPY, ArrayBackend

from libertem.common.math import prod, flat_nonzero
from libertem.common import Slice, Shape
from libertem.common.buffers import zeros_aligned
from import CorrectionSet
from libertem.common.messageconverter import MessageConverter
from .base import (
    DataSet, Partition, DataTile, DataSetException, DataSetMeta,

# alias for mocking:
current_time = time.time

logger = logging.getLogger(__name__)

class HDF5DatasetParams(MessageConverter):
    SCHEMA = {
        "$schema": "",
        "$id": "",
        "title": "HDF5DatasetParams",
        "type": "object",
        "properties": {
            "type": {"const": "HDF5"},
            "path": {"type": "string"},
            "ds_path": {"type": "string"},
            "nav_shape": {
                "type": "array",
                "items": {"type": "number", "minimum": 1},
                "minItems": 2,
                "maxItems": 2
            "sig_shape": {
                "type": "array",
                "items": {"type": "number", "minimum": 1},
                "minItems": 2,
                "maxItems": 2
            "sync_offset": {"type": "number"},
        "required": ["type", "path", "ds_path"]

    def convert_to_python(self, raw_data):
        data = {
            k: raw_data[k]
            for k in ["path", "ds_path"]
        if "nav_shape" in raw_data:
            data["nav_shape"] = tuple(raw_data["nav_shape"])
        if "sig_shape" in raw_data:
            data["sig_shape"] = tuple(raw_data["sig_shape"])
        if "sync_offset" in raw_data:
            data["sync_offset"] = raw_data["sync_offset"]
        return data

def _ensure_2d_nav(nav_shape: tuple[int, ...]) -> tuple[int, int]:
    # For any iterable shape, reduce or pad it to a 2-tuple
    # with the same prod(shape). Reduction from left to right
    # (final dimension preserved). Special case for empty
    # nav_shape which is converted to (1, 1)
    nav_shape = tuple(nav_shape)
    if len(nav_shape) == 1:
        nav_shape = (1,) + nav_shape
    elif len(nav_shape) >= 2:
        nav_shape = (prod(nav_shape[:-1]),) + nav_shape[-1:]
    elif len(nav_shape) == 0:
        return (1, 1)
        raise ValueError(f'Incompatible nav_shape {nav_shape}')
    return nav_shape

class HDF5ArrayDescriptor(typing.NamedTuple):
    name: str
    shape: tuple[int, ...]
    dtype: np.dtype
    compression: Optional[str]
    chunks: tuple[int, ...]

def _get_datasets(path):
    datasets: list[HDF5ArrayDescriptor] = []

        timeout = int(os.environ.get('LIBERTEM_IO_HDF5_TIMEOUT_DEBUG', 3))
    except ValueError:
        timeout = 3

    t0 = current_time()

    def _make_list(name, obj):
        if current_time() - t0 > timeout:
            raise TimeoutError
        if hasattr(obj, 'size') and hasattr(obj, 'shape'):
            if obj.ndim < 3:
                # Can't process this dataset, skip
                HDF5ArrayDescriptor(name, obj.shape, obj.dtype, obj.compression, obj.chunks)

    with h5py.File(path, 'r') as f:
    return datasets

def _have_contig_chunks(chunks, ds_shape):
    Returns `True` if the `chunks` are contiguous in the navigation axes.


    >>> ds_shape = Shape((64, 64, 128, 128), sig_dims=2)
    >>> _have_contig_chunks((1, 4, 32, 32), ds_shape)
    >>> _have_contig_chunks((2, 4, 32, 32), ds_shape)
    >>> _have_contig_chunks((2, 64, 32, 32), ds_shape)
    >>> _have_contig_chunks((64, 1, 32, 32), ds_shape)
    >>> ds_shape_5d = Shape((16, 64, 64, 128, 128), sig_dims=2)
    >>> _have_contig_chunks((1, 1, 2, 32, 32), ds_shape_5d)
    >>> _have_contig_chunks((1, 2, 1, 32, 32), ds_shape_5d)
    >>> _have_contig_chunks((2, 1, 1, 32, 32), ds_shape_5d)
    # In other terms:
    # There exists an index `i` such that `prod(chunks[:i]) == 1` and
    # chunks[i+1:] == ds_shape[i+1:], limited to the nav part of chunks and ds_shape
    nav_shape = tuple(ds_shape.nav)
    nav_dims = len(nav_shape)
    chunks_nav = chunks[:nav_dims]

    for i in range(nav_dims):
        left = chunks_nav[:i]
        left_prod = prod(left)
        if left_prod == 1 and chunks_nav[i + 1:] == nav_shape[i + 1:]:
            return True
    return False

def _partition_shape_for_chunking(chunks, ds_shape):
    Get the minimum partition shape for that allows us to prevent read amplification
    with chunked HDF5 files.


    >>> ds_shape = Shape((64, 64, 128, 128), sig_dims=2)
    >>> _partition_shape_for_chunking((1, 4, 32, 32), ds_shape)
    (1, 4, 128, 128)
    >>> _partition_shape_for_chunking((2, 4, 32, 32), ds_shape)
    (2, 64, 128, 128)
    >>> _partition_shape_for_chunking((2, 64, 32, 32), ds_shape)
    (2, 64, 128, 128)
    >>> _partition_shape_for_chunking((64, 1, 32, 32), ds_shape)
    (64, 64, 128, 128)
    >>> ds_shape_5d = Shape((16, 64, 64, 128, 128), sig_dims=2)
    >>> _partition_shape_for_chunking((1, 1, 2, 32, 32), ds_shape_5d)
    (1, 1, 2, 128, 128)
    >>> _partition_shape_for_chunking((1, 2, 1, 32, 32), ds_shape_5d)
    (1, 2, 64, 128, 128)
    >>> _partition_shape_for_chunking((2, 1, 1, 32, 32), ds_shape_5d)
    (2, 64, 64, 128, 128)
    first_non_one = [x == 1 for x in chunks].index(False)
    shape_left = chunks[:first_non_one + 1]
    return shape_left + ds_shape[first_non_one + 1:]

def _tileshape_for_chunking(chunks, ds_shape):
    Calculate a tileshape for tiled reading from chunked
    data sets.

    >>> ds_shape = Shape((64, 64, 128, 128), sig_dims=2)
    >>> _tileshape_for_chunking((1, 4, 32, 32), ds_shape)
    (4, 32, 32)
    return chunks[-ds_shape.sig.dims - 1:]

def _get_tileshape_nd(partition_slice, tiling_scheme):
    extra_nav_dims = partition_slice.shape.nav.dims - tiling_scheme.shape.nav.dims
    # keep shape of the rightmost dimension:
    nav_item = min(tiling_scheme.shape[0], partition_slice.shape.nav[-1])
    return extra_nav_dims * (1,) + (nav_item,) + tuple(tiling_scheme.shape.sig)

class H5Reader:
    def __init__(self, path, ds_path):
        self._path = path
        self._ds_path = ds_path

    def get_h5ds(self, cache_size=1024 * 1024):
        logger.debug("H5Reader.get_h5ds: cache_size=%dMiB", cache_size / 1024 / 1024)
        with h5py.File(self._path, 'r', rdcc_nbytes=cache_size, rdcc_nslots=19997) as f:
            yield f[self._ds_path]

[docs] class H5DataSet(DataSet): """ Read data from a HDF5 data set. Examples -------- >>> ds = ctx.load("hdf5", path=path_to_hdf5, ds_path="/data") Parameters ---------- path: str Path to the file ds_path: str Path to the HDF5 data set inside the file nav_shape: tuple of int, optional A n-tuple that specifies the shape of the navigation / scan grid. By default this is inferred from the HDF5 dataset. sig_shape: tuple of int, optional A n-tuple that specifies the shape of the signal / frame grid. This parameter is currently unsupported and will raise an error if provided and not matching the underlying data sig shape. By default the sig_shape is inferred from the HDF5 dataset via the :code:`sig_dims` parameter. sig_dims: int Number of dimensions that should be considered part of the signal (for example 2 when dealing with 2D image data) sync_offset: int, optional, by default 0 If positive, number of frames to skip from start If negative, number of blank frames to insert at start target_size: int Target partition size, in bytes. Usually doesn't need to be changed. min_num_partitions: int Minimum number of partitions, set to number of cores if not specified. Usually doesn't need to be specified. Note ---- If the HDF5 file to be loaded contains compressed data using a custom compression filter (other than GZIP, LZF or SZIP), the associated HDF5 filter library must be imported on the workers before accessing the file. See the `h5py documentation on filter pipelines <>`_ for more information. The library `hdf5plugin <>`_ is preloaded automatically if it is installed. Other filter libraries may have to be specified for preloading by the user. Preloads for a local :class:`~libertem.executor.dask.DaskJobExecutor` can be specified through the :code:`preload` argument of either :meth:`~libertem.executor.dask.DaskJobExecutor.make_local` or :func:`libertem.executor.dask.cluster_spec`. For the :class:`libertem.executor.inline.InlineJobExecutor`, the plugins can simply be imported in the main script. For the web GUI or for running LiberTEM in a cluster with existing workers (e.g. by running :code:`libertem-worker` or :code:`dask-worker` on nodes), necessary imports can be specified as :code:`--preload` arguments to the launch command, for example with :code:`libertem-server --preload hdf5plugin` resp. :code:`libertem-worker --preload hdf5plugin tcp://scheduler_ip:port`. :code:`--preload` can be specified multiple times. """ def __init__(self, path, ds_path=None, tileshape=None, nav_shape=None, sig_shape=None, target_size=None, min_num_partitions=None, sig_dims=2, io_backend=None, sync_offset: int = 0): super().__init__(io_backend=io_backend) if io_backend is not None: raise ValueError("H5DataSet currently doesn't support alternative I/O backends") self.path = path self.ds_path = ds_path self.target_size = target_size self.sig_dims = sig_dims # handle backwards-compatability: if tileshape is not None: warnings.warn( "tileshape argument is ignored and will be removed after 0.6.0", FutureWarning ) # self.min_num_partitions appears to be never used self.min_num_partitions = min_num_partitions self._dtype = None self._shape = None self._nav_shape = nav_shape self._sig_shape = sig_shape self._sync_offset = sync_offset self._chunks = None self._compression = None def get_reader(self): return H5Reader( path=self.path, ds_path=self.ds_path ) def _do_initialize(self): if self.ds_path is None: try: datasets = _get_datasets(self.path) largest_ds = max(datasets, key=lambda x: prod(x.shape)) # FIXME: excepting `SystemError` temporarily # more info: except (ValueError, TimeoutError, SystemError): raise DataSetException(f'Unable to infer dataset from file {self.path}') self.ds_path = with self.get_reader().get_h5ds() as h5ds: self._dtype = h5ds.dtype shape = h5ds.shape if len(shape) == self.sig_dims: # shape = (1,) + shape -> this leads to indexing errors down the line # so we currently don't support opening 2D HDF5 files raise DataSetException("2D HDF5 files are currently not supported") ds_shape = Shape(shape, sig_dims=self.sig_dims) if self._sig_shape is not None and self._sig_shape != ds_shape.sig.to_tuple(): raise DataSetException("sig reshaping currently not supported with HDF5 files") self._image_count = ds_shape.nav.size nav_shape = ds_shape.nav.to_tuple() if self._nav_shape is None else self._nav_shape self._shape = nav_shape + ds_shape.sig self._meta = DataSetMeta( shape=self.shape, raw_dtype=self._dtype, sync_offset=self._sync_offset, image_count=self._image_count, metadata={'ds_raw_shape': ds_shape} ) self._chunks = h5ds.chunks self._compression = h5ds.compression if self._compression is not None: warnings.warn( "Loading compressed HDF5, performance can be worse than with other formats", RuntimeWarning ) self._nav_shape_product = self._shape.nav.size self._sync_offset_info = self.get_sync_offset_info() return self def initialize(self, executor): return executor.run_function(self._do_initialize) @classmethod def get_msg_converter(cls): return HDF5DatasetParams @classmethod def get_supported_extensions(cls): return {"h5", "hdf5", "hspy", "nxs"} @classmethod def get_supported_io_backends(self): return [] @classmethod def _do_detect(cls, path): try: with h5py.File(path, 'r'): pass except OSError as e: raise DataSetException(repr(e)) from e @classmethod def detect_params(cls, path, executor): try: executor.run_function(cls._do_detect, path) except (OSError, KeyError, ValueError, TypeError, DataSetException): # not a h5py file or can't open for some reason: return False # Read the dataset info from the file try: datasets = executor.run_function(_get_datasets, path) if not datasets: raise RuntimeError(f'Found no compatible datasets in the file {path}') # FIXME: excepting `SystemError` temporarily # more info: except (RuntimeError, TimeoutError, SystemError): return { "parameters": { "path": path, }, "info": { "datasets": [], } } # datasets contains at least one HDF5ArrayDescriptor # sig_dims is implicitly two here (for web GUI) sig_dims = 2 full_info = [ { "path":, "shape": ds_item.shape, "compression": ds_item.compression, "chunks": ds_item.chunks, "raw_nav_shape": ds_item.shape[:-sig_dims], "nav_shape": _ensure_2d_nav(ds_item.shape[:-sig_dims]), "sig_shape": ds_item.shape[-sig_dims:], "image_count": prod(ds_item.shape[:-sig_dims]), } for ds_item in datasets ] # use the largest size array as initial hdf5 dataset path # need to get info dict to access unpacked nav/sig shape # next line implements argmax on ds_descriptor.size ds_idx, _ = max(enumerate(datasets), key=lambda idx_x: prod(idx_x[1].shape)) largest_ds = full_info[ds_idx] return { "parameters": { "path": path, "ds_path": largest_ds['path'], "nav_shape": largest_ds['nav_shape'], "sig_shape": largest_ds['sig_shape'], }, "info": { "datasets": full_info } } @property def dtype(self): if self._dtype is None: raise RuntimeError("please call initialize") return self._dtype @property def shape(self): if self._shape is None: raise RuntimeError("please call initialize") return self._shape def check_valid(self): try: with self.get_reader().get_h5ds() as h5ds: h5ds.shape return True except (OSError, KeyError, ValueError) as e: raise DataSetException("invalid dataset: %s" % e) def get_cache_key(self): return { "path": self.path, "ds_path": self.ds_path, } def get_diagnostics(self): with self.get_reader().get_h5ds() as ds: try: datasets = _get_datasets(self.path) # FIXME: excepting `SystemError` temporarily # more info: except (TimeoutError, SystemError): datasets = [] datasets = [ {"name":, "value": [ {"name": "Size", "value": str(prod(descriptor.shape))}, {"name": "Shape", "value": str(descriptor.shape)}, {"name": "Datatype", "value": str(descriptor.dtype)}, ]} for descriptor in sorted( datasets, key=lambda i: prod(i.shape), reverse=True ) ] return [ {"name": "dtype", "value": str(ds.dtype)}, {"name": "chunks", "value": str(ds.chunks)}, {"name": "compression", "value": str(ds.compression)}, {"name": "datasets", "value": datasets}, ] def get_min_sig_size(self): if self._chunks is not None: return 1024 # allow for tiled processing w/ small-ish chunks # un-chunked HDF5 seems to prefer larger signal slices, so we aim for 32 4k blocks: return 32 * 4096 // np.dtype(self.meta.raw_dtype).itemsize def get_max_io_size(self) -> Optional[int]: if self._chunks is not None: # this may result in larger tile depth than necessary, but # it needs to be so big to pass the validation of the Negotiator. The tiles # won't ever be as large as the scheme dictates, anyway. # We limit it here to 256e6 elements, to also keep the chunk cache # usage reasonable: return int(256e6) return None # use default value from Negotiator def get_base_shape(self, roi): if roi is not None: return (1,) + self.shape.sig if self._chunks is not None: sig_chunks = self._chunks[-self.shape.sig.dims:] return (1,) + sig_chunks return (1, 1,) + (self.shape[-1],) def adjust_tileshape(self, tileshape, roi): chunks = self._chunks sig_shape = self.shape.sig if roi is not None: return (1,) + sig_shape if chunks is not None and not _have_contig_chunks(chunks, self.shape): sig_chunks = chunks[-sig_shape.dims:] sig_ts = tileshape[-sig_shape.dims:] # if larger signal chunking is requested in the negotiation, # switch to full frames: if any(t > c for t, c in zip(sig_ts, sig_chunks)): # try to keep total tileshape size: tileshape_size = prod(tileshape) depth = max(1, tileshape_size // sig_shape.size) return (depth,) + sig_shape else: # depth needs to be limited to prod(chunks.nav) return _tileshape_for_chunking(chunks, self.shape) return tileshape def need_decode(self, roi, read_dtype, corrections): return True def get_partitions(self): # ds_shape = Shape(self.shape, sig_dims=self.sig_dims) ds_shape: Shape = self.meta['ds_raw_shape'] ds_slice = Slice(origin=[0] * len(ds_shape), shape=ds_shape) target_size = self.target_size if target_size is None: if self._compression is None: target_size = 512 * 1024 * 1024 else: target_size = 256 * 1024 * 1024 partition_shape = self.partition_shape( target_size=target_size, dtype=self.dtype, containing_shape=ds_shape, ) + tuple(ds_shape.sig) # if the data is chunked in the navigation axes, choose a compatible # partition size (even important for non-compressed data!) chunks = self._chunks if chunks is not None and not _have_contig_chunks(chunks, ds_shape): partition_shape = _partition_shape_for_chunking(chunks, ds_shape) # -ve sync offset insert blank at beginning (skips at end) # +ve sync offset skips frames at beginning (blank at end) sync_offset = self._sync_offset ds_flat_shape = self.shape.flatten_nav() for slice_nd in ds_slice.subslices(partition_shape): raw_frames_slice = slice_nd.flatten_nav(ds_shape) raw_origin = raw_frames_slice.origin if self._sync_offset <= 0: # negative or zero s-o, shift right, clip length at end raw_frames_slice.origin = (raw_origin[0] + abs(sync_offset),) + raw_origin[1:] else: # positive s-o, shift left, clip part length at beginning corrected_nav_origin = raw_origin[0] - sync_offset if corrected_nav_origin < 0: corrected_nav_size = raw_frames_slice.shape[0] + corrected_nav_origin if corrected_nav_size <= 0: # Empty partition, skip continue raw_frames_slice.shape = (corrected_nav_size,) + raw_frames_slice.shape.sig raw_frames_slice.origin = (max(0, corrected_nav_origin),) + raw_origin[1:] # All raw_frames_slice should have non-zero dims here partition_slice = raw_frames_slice.clip_to(ds_flat_shape) if any(v <= 0 for v in partition_slice.shape): # Empty partition after clip to desired shape, skip continue yield H5Partition( meta=self._meta, reader=self.get_reader(), partition_slice=partition_slice, slice_nd=slice_nd, io_backend=self.get_io_backend(), chunks=self._chunks, decoder=None, sync_offset=self._sync_offset, ) def __repr__(self): return f"<H5DataSet of {self._dtype} shape={self._shape}>"
class H5Partition(Partition): def __init__(self, reader: H5Reader, slice_nd: Slice, chunks, sync_offset=0, *args, **kwargs): self.reader = reader self.slice_nd = slice_nd self._corrections = None self._chunks = chunks self._sync_offset = sync_offset super().__init__(*args, **kwargs) def _have_compatible_chunking(self): chunks = self._chunks if chunks is None: return True # all-1 in nav dims works: nav_dims = self.slice_nd.shape.nav.dims chunks_nav = self._chunks[:nav_dims] if all(c == 1 for c in chunks_nav): return True # everything else is problematic and needs special case: return False def _get_subslices_chunked_full_frame(self, scheme_lookup, nav_dims, tileshape_nd): """ chunked full-frame reading. outer loop goes over the navigation coords of each chunk, inner loop is pushed into hdf5 by reading full frames need to order slices in a way that efficiently uses the chunk cache. """ chunks_nav = self._chunks[:nav_dims] chunk_full_frame = chunks_nav + self.slice_nd.shape.sig chunk_slices = self.slice_nd.subslices(shape=chunk_full_frame) logger.debug( "_get_subslices_chunked_full_frame: chunking first by %r, then %r", chunk_full_frame, tileshape_nd, ) for chunk_slice in chunk_slices: subslices = chunk_slice.subslices(shape=tileshape_nd) for subslice in subslices: idx = scheme_lookup[(subslice.origin[nav_dims:], subslice.shape[nav_dims:])] yield idx, subslice def _get_subslices_chunked_tiled(self, tiling_scheme, scheme_lookup, nav_dims, tileshape_nd): """ general tiled reading w/ chunking outer loop is a chunk in signal dimensions, inner loop is over "rows in nav" """ slice_nd_sig = self.slice_nd.sig slice_nd_nav = self.slice_nd.nav chunks_nav = self._chunks[:nav_dims] sig_slices = slice_nd_sig.subslices(tiling_scheme.shape.sig) logger.debug( "_get_subslices_chunked_tiled: chunking first by sig %r, then nav %r, finally %r", tiling_scheme.shape.sig, chunks_nav, tileshape_nd ) for sig_slice in sig_slices: chunk_slices = slice_nd_nav.subslices(shape=chunks_nav) for chunk_slice_nav in chunk_slices: chunk_slice = Slice( origin=chunk_slice_nav.origin + sig_slice.origin, shape=chunk_slice_nav.shape + tuple(sig_slice.shape), ) subslices = chunk_slice.subslices(shape=tileshape_nd) for subslice in subslices: scheme_key = (subslice.origin[nav_dims:], subslice.shape[nav_dims:]) idx = scheme_lookup[scheme_key] yield idx, subslice def _get_subslices(self, tiling_scheme): """ Generate partition subslices for the given tiling scheme for the different cases. """ if tiling_scheme.intent == "partition": tileshape_nd = self.slice_nd.shape else: tileshape_nd = _get_tileshape_nd(self.slice_nd, tiling_scheme) assert all(ts <= ps for (ts, ps) in zip(tileshape_nd, self.slice_nd.shape)) nav_dims = self.slice_nd.shape.nav.dims # Three cases need to be handled: if self._have_compatible_chunking(): # 1) no chunking, or compatible chunking. we are free to use # whatever access pattern we deem efficient: logger.debug("using simple tileshape_nd slicing") subslices = self.slice_nd.subslices(shape=tileshape_nd) scheme_len = len(tiling_scheme) for idx, subslice in enumerate(subslices): scheme_idx = idx % scheme_len yield scheme_idx, subslice else: scheme_lookup = { (s.discard_nav().origin, tuple(s.discard_nav().shape)): idx for idx, s in tiling_scheme.slices } if len(tiling_scheme) == 1: logger.debug("using full-frame subslicing") yield from self._get_subslices_chunked_full_frame( scheme_lookup, nav_dims, tileshape_nd ) else: logger.debug("using chunk-adaptive subslicing") yield from self._get_subslices_chunked_tiled( tiling_scheme, scheme_lookup, nav_dims, tileshape_nd ) def _preprocess(self, tile_data, tile_slice): if self._corrections is None: return self._corrections.apply(tile_data, tile_slice) def _get_read_cache_size(self) -> float: chunks = self._chunks if chunks is None: return 1 * 1024 * 1024 else: # heuristic on maximum chunk cache size based on number of cores # of the node this worker is running on, available memory, ... import psutil mem = psutil.virtual_memory() num_cores = psutil.cpu_count(logical=False) available: int = mem.available if num_cores is None: num_cores = 2 cache_size: float = max(256 * 1024 * 1024, available * 0.8 / num_cores) return cache_size def _get_h5ds(self): cache_size = self._get_read_cache_size() return self.reader.get_h5ds(cache_size=cache_size) def _get_tiles_normal(self, tiling_scheme: TilingScheme, dest_dtype): with self._get_h5ds() as dataset: # because the dtype conversion done by HDF5 itself can be quite slow, # we need to use a buffer for reading in hdf5 native dtype: data_flat = np.zeros(tiling_scheme.shape, dtype=dataset.dtype).reshape((-1,)) # ... and additionally a result buffer, for re-using the array used in the DataTile: data_flat_res = np.zeros(tiling_scheme.shape, dtype=dest_dtype).reshape((-1,)) subslices = self._get_subslices( tiling_scheme=tiling_scheme, ) sync_offset = self._sync_offset ds_num_frames = self.meta.shape.nav.size for scheme_idx, tile_slice in subslices: tile_slice_flat: Slice = tile_slice.flatten_nav(self.meta['ds_raw_shape']) raw_origin = tile_slice_flat.origin raw_shape = tile_slice_flat.shape # The following block translates from tile_slice in the raw array # to the partition coordinate system with sync_offset applied # By doing this before reading we can avoid reading some tiles # at the beginning/end of the dataset # We will still read tiles which partially overlap the nav space # and afterwards we drop the unecessary frames corrected_nav_origin = raw_origin[0] - sync_offset if corrected_nav_origin < 0: # positive sync_offset, drop frames at beginning of DS if abs(corrected_nav_origin) > tile_slice_flat.shape[0]: # tile is completely before the first partition, can skip it continue # Clip at the beginning so adjust the tile shape new_nav_size = tile_slice_flat.shape[0] + corrected_nav_origin tile_slice_flat.shape = (new_nav_size,) + tile_slice_flat.shape.sig # Apply max(0, corrected_nav_origin) so we never provide negative nav coord tile_slice_flat.origin = (max(0, corrected_nav_origin),) + raw_origin[1:] # Now check for clipping at the end of the dataset final_frame_idx = tile_slice_flat.origin[0] + tile_slice_flat.shape[0] frames_beyond_end = final_frame_idx - ds_num_frames # We want to skip any tiles which are completely past the end of the dataset # and clip those which are only partially overlapping the final partition if frames_beyond_end >= tile_slice_flat.shape[0]: # Empty tile after clip, skip continue elif frames_beyond_end > 0: # tile partially overlaps end of dataset, adjust the shape new_nav_size = tile_slice_flat.shape[0] - frames_beyond_end tile_slice_flat.shape = (new_nav_size,) + tile_slice_flat.shape.sig # Read the data in this block # cut buffer into the right size buf_size = tile_slice.shape.size buf = data_flat[:buf_size].reshape(tile_slice.shape) buf_res = data_flat_res[:buf_size].reshape(tile_slice.shape) dataset.read_direct(buf, source_sel=tile_slice.get()) buf_res[:] = buf # extra copy for faster dtype/endianess conversion tile_data = buf_res.reshape(raw_shape) # If the true tile origin is before the start of the dataset, must drop frames # This corresponds to the first raw tile which overlaps the first partition # and can only occur when sync_offset > 0 if corrected_nav_origin < 0: tile_data = tile_data[abs(corrected_nav_origin):, ...] # The final tiles in the dataset can partially overlap the final partition # Drop frames at end to match the partition size # we already verified if any frames will remain # this can occur for both +ve and -ve sync_offset if frames_beyond_end > 0: tile_data = tile_data[:-frames_beyond_end, ...] # NOTE could the above two blocks ever apply simultaneously? # would the two operations conflict ? self._preprocess(tile_data, tile_slice_flat) yield DataTile( tile_data, tile_slice=tile_slice_flat, scheme_idx=scheme_idx, ) def _get_tiles_with_roi(self, roi, dest_dtype, tiling_scheme): # we currently don't chop up the frames when reading with a roi, so # the tiling scheme also must not contain more than one slice: # NOTE Why is this ?? assert len(tiling_scheme) == 1, "incompatible tiling scheme! (%r)" % (tiling_scheme) flat_roi_nonzero = flat_nonzero(roi) start_at_frame = self.slice.origin[0] stop_at_frame = start_at_frame + self.slice.shape[0] part_mask = np.logical_and(flat_roi_nonzero >= start_at_frame, flat_roi_nonzero < stop_at_frame) # Must yield tiles with tile_slice in compressed nav dimension for roi frames_in_c_nav = np.arange(flat_roi_nonzero.size)[part_mask] frames_in_part = flat_roi_nonzero[part_mask] # -ve sync offset insert blank at beginning (skips at end) # +ve sync offset skips frames at beginning (blank at end) frames_in_raw = frames_in_part + self._sync_offset raw_shape = self.meta['ds_raw_shape'].nav.to_tuple() result_shape = Shape((1,) + tuple(self.meta.shape.sig), sig_dims=self.meta.shape.sig.dims) sig_origin = (0,) * self.meta.shape.sig.dims tile_data = np.zeros(result_shape, dtype=dest_dtype) with self._get_h5ds() as h5ds: tile_data_raw = np.zeros(result_shape, dtype=h5ds.dtype) for c_nav_idx, raw_idx in zip(frames_in_c_nav, frames_in_raw): tile_slice = Slice( origin=(c_nav_idx,) + sig_origin, shape=result_shape, ) nav_coord = np.unravel_index(raw_idx, raw_shape) h5ds.read_direct(tile_data_raw, source_sel=nav_coord) tile_data[:] = tile_data_raw # extra copy for dtype/endianess conversion self._preprocess(tile_data, tile_slice) yield DataTile( tile_data, tile_slice=tile_slice, # there is only a single slice in the tiling scheme, so our # scheme_idx is constant 0 scheme_idx=0, ) def set_corrections(self, corrections: CorrectionSet): self._corrections = corrections def get_tiles(self, tiling_scheme: TilingScheme, dest_dtype="float32", roi=None, array_backend: Optional[ArrayBackend] = None): if array_backend is None: array_backend = self.meta.array_backends[0] assert array_backend in (NUMPY, CUDA) tiling_scheme = tiling_scheme.adjust_for_partition(self) if roi is not None: yield from self._get_tiles_with_roi(roi, dest_dtype, tiling_scheme) else: yield from self._get_tiles_normal(tiling_scheme, dest_dtype) def get_locations(self): return None def get_macrotile(self, dest_dtype="float32", roi=None): ''' Return a single tile for the entire partition. This is useful to support process_partiton() in UDFs and to construct dask arrays from datasets. Note ---- This can be inefficient if the dataset is compressed and chunked in the navigation axis, because you can either have forced-large macrotiles, or you can have read amplification effects, where a much larger amount of data is read from the HDF5 file than necessary. For example, if your chunking is :code:`(32, 32, 32, 32)`, in a dataset that is :code:`(128, 128, 256, 256)`, the partition must cover the whole of :code:`(32, 128, 256, 256)` - this is because partitions are contiguous in the navigation axis. The other possibility is to keep the partition smaller, for example only :code:`(3, 128, 256, 256)`. That would mean when reading a chunk from HDF5, we can only use 3*32 frames of the total 32*32 frames, a whopping ~10x read amplification. ''' tileshape = self.shape if self._chunks is not None: tileshape = self._chunks tiling_scheme = TilingScheme.make_for_shape( tileshape=Shape(tileshape, sig_dims=self.slice.shape.sig.dims).flatten_nav(), dataset_shape=self.meta.shape, ) data = zeros_aligned(self.slice.adjust_for_roi(roi).shape, dtype=dest_dtype) for tile in self.get_tiles( tiling_scheme=tiling_scheme, dest_dtype=dest_dtype, roi=roi, ): rel_slice = tile.tile_slice.shift(self.slice) data[rel_slice.get()] = tile_slice = Slice( origin=(self.slice.origin[0], 0, 0), shape=Shape(data.shape, sig_dims=2), ) return DataTile( data, tile_slice=tile_slice, scheme_idx=0, )