Source code for libertem.io.dataset.base.file

from typing import NamedTuple
import numpy as np

from libertem.common.math import prod


class OffsetsSizes(NamedTuple):
    """
    General slicing for slicing the data out of a file
    (removing file header and per-frame headers)
    """

    # whole-file slicing, in bytes
    file_offset: int  # number of bytes to skip at the beginning of the file
    skip_end: int  # how many bytes to skip at the end

    # per-frame slicing, in number of items in native dtype
    frame_offset: int  # number of items to skip at the beginning of each frame
    frame_size: int  # number of items for each frame


[docs] class File: """ A description of a file that is part of a dataset. Contains information about the internal structure, like sizes of headers, frames, frame headers, frame footers, ... Parameters ---------- path : str The path of the file. Interpretation may be backend-specific start_idx : int Start index of signal elements in this file (inclusive), in the flattened navigation axis end_idx : int End index of signal elements in this file (exclusive), in the flattened navigation axis native_dtype : np.dtype The dtype that is used for reading the data. This may match the "real" dtype of data, or in some cases, when no direct match is possible (decoding is necessary), it falls back to bytes (np.uint8) sig_shape : Shape | Tuple[int, ...] The shape of each signal element file_header: int Number of bytes to ignore at the beginning of the file frame_header: int Number of bytes to ignore before each frame frame_footer: int Number of bytes to ignore after each frame """ def __init__(self, path, start_idx, end_idx, native_dtype, sig_shape, frame_footer=0, frame_header=0, file_header=0): self._start_idx = start_idx self._end_idx = end_idx self._native_dtype = native_dtype self._path = path self._file_header = file_header self._frame_header = frame_header self._frame_footer = frame_footer self._sig_shape = tuple(sig_shape) @property def file_header_bytes(self) -> int: return self._file_header @property def start_idx(self) -> int: return self._start_idx @property def end_idx(self) -> int: return self._end_idx @property def num_frames(self) -> int: return self._end_idx - self._start_idx @property def sig_shape(self) -> tuple[int, ...]: return self._sig_shape @property def native_dtype(self) -> np.dtype: return self._native_dtype @property def path(self) -> str: return self._path
[docs] def get_offsets_sizes(self, size: int) -> OffsetsSizes: """ Get file and frame offsets/sizes Parameters ---------- size : int len(memoryview) for the whole file Returns ------- slicing The file/frame slicing """ itemsize = np.dtype(self._native_dtype).itemsize assert self._frame_header % itemsize == 0 assert self._frame_footer % itemsize == 0 frame_size = int(prod(self._sig_shape)) frame_offset = self._frame_header // itemsize file_offset = self._file_header skip_end = 0 # cut off any extra data at the end of the file: if size % int(prod(self._sig_shape)): new_mmap_size = self.num_frames * ( (itemsize * frame_size) + self._frame_header + self._frame_footer ) skip_end = (size - file_offset) - new_mmap_size assert skip_end >= 0 return OffsetsSizes( file_offset=file_offset, skip_end=skip_end, frame_offset=frame_offset, frame_size=frame_size, )
[docs] def get_array_from_memview(self, mem: memoryview, slicing: OffsetsSizes) -> np.ndarray: """ Convert a memoryview of the file's data into an ndarray, cutting away frame headers and footers as defined by `start` and `stop` parameters. Parameters ---------- mem The input memoryview start Cut off frame headers of this size; usually start = frame_header_bytes // itemsize stop End index; usually stop = start + prod(sig_shape) Returns ------- np.ndarray The output array. Should have shape (num_frames, prod(sig_shape)) and native dtype """ if slicing.skip_end > 0: mem = mem[slicing.file_offset:-slicing.skip_end] else: mem = mem[slicing.file_offset:] itemsize = np.dtype(self._native_dtype).itemsize assert len(mem) % itemsize == 0, \ "len(mem) must fit the dtype" assert len(mem) // itemsize % self.num_frames == 0, \ "len(mem) must fit the number of frames" assert len(mem) // itemsize // self.num_frames % ( slicing.frame_size + (self._frame_header + self._frame_footer) // itemsize ) == 0, "len(mem) must fit the sig shape" arr_uncut = np.frombuffer(mem, dtype=self._native_dtype).reshape( (self.num_frames, -1) ) arr = arr_uncut[:, slicing.frame_offset:slicing.frame_offset + slicing.frame_size] assert arr.shape[1] == slicing.frame_size, \ "array shape must fit the signal shape" assert arr.size > 0 return arr
def __repr__(self): return "<%s %d:%d>" % (self.__class__.__name__, self._start_idx, self._end_idx)