# Source code for libertem.io.dataset.dm

import os
import typing
import logging
import warnings

from ncempy.io.dm import fileDM
import numpy as np

from libertem.common.math import prod
from libertem.common import Shape
from libertem.io.dataset.base.file import OffsetsSizes
from libertem.common.messageconverter import MessageConverter
from .base import (
    DataSet, FileSet, BasePartition, DataSetException, DataSetMeta, File,
    IOBackend,
)

log = logging.getLogger(__name__)

if typing.TYPE_CHECKING:
    from numpy import typing as nt


class SingleDMDatasetParams(MessageConverter):
    """
    Message converter for the parameters of a single-file DM dataset.

    ``SCHEMA`` is the JSON schema (draft-07) of the accepted message. Only
    ``type`` and ``path`` are mandatory; both shapes, if given, must have
    exactly two entries.
    """

    SCHEMA = {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "$id": "http://libertem.org/DMDatasetParams.schema.json",
        "title": "DMDatasetParams",
        "type": "object",
        "properties": {
            "type": {"const": "DM"},
            "path": {"type": "string"},
            "nav_shape": {
                "type": "array",
                "items": {"type": "number", "minimum": 1},
                "minItems": 2,
                "maxItems": 2,
            },
            "sig_shape": {
                "type": "array",
                "items": {"type": "number", "minimum": 1},
                "minItems": 2,
                "maxItems": 2,
            },
            "sync_offset": {"type": "number"},
            "io_backend": {
                "enum": IOBackend.get_supported(),
            },
            "force_c_order": {"type": "boolean"},
        },
        "required": ["type", "path"],
    }

    def convert_to_python(self, raw_data):
        """
        Reduce a message to the keys that are handed on.

        Parameters
        ----------
        raw_data : dict
            Message contents; must contain ``"path"``.

        Returns
        -------
        dict
            ``"path"`` plus whichever of ``"nav_shape"``, ``"sig_shape"``,
            ``"sync_offset"`` and ``"force_c_order"`` are present in
            :code:`raw_data`. Other keys (such as ``"type"`` and
            ``"io_backend"``) are not copied.

        Raises
        ------
        KeyError
            If ``"path"`` is missing from :code:`raw_data`.
        """
        optional_keys = ("nav_shape", "sig_shape", "sync_offset", "force_c_order")
        converted = {"path": raw_data["path"]}
        converted.update(
            (key, raw_data[key])
            for key in optional_keys
            if key in raw_data
        )
        return converted


class StackedDMDatasetParams(MessageConverter):
    """
    Message converter for stacked DM datasets.

    The schema is empty and both conversion directions are handed to the
    parent class unchanged.
    """

    SCHEMA: dict = {}

    def convert_from_python(self, raw_data):
        """Delegate the conversion of :code:`raw_data` to the parent class."""
        converted = super().convert_from_python(raw_data)
        return converted

    def convert_to_python(self, raw_data):
        """Delegate the conversion of :code:`raw_data` to the parent class."""
        converted = super().convert_to_python(raw_data)
        return converted


def _get_metadata(path):
    """
    Read the data offset and z size of the main data object of a DM file.

    Parameters
    ----------
    path : str
        Path of the DM3/DM4 file to inspect.

    Returns
    -------
    dict
        ``{'offset': ..., 'zsize': ...}``, taken from the ``dataOffset`` and
        ``zSize`` entries of the selected object.
    """
    # Use the context manager (as check_valid() does) so that the handle is
    # released deterministically instead of whenever the object is collected.
    with fileDM(path, on_memory=True) as fh:
        # With a single object in the file use it, otherwise use the second one.
        idx = 0 if fh.numObjects == 1 else 1
        return {
            'offset': fh.dataOffset[idx],
            'zsize': fh.zSize[idx],
        }


class StackedDMFile(File):
    """
    One file of a stack of DM3/DM4 files.
    """

    def get_array_from_memview(self, mem: memoryview, slicing: OffsetsSizes):
        """
        Interpret the bytes of :code:`mem` as a 2D array of frames.

        Parameters
        ----------
        mem : memoryview
            Buffer to read from. The first ``slicing.file_offset`` bytes and
            the last ``slicing.skip_end`` bytes are ignored.
        slicing : OffsetsSizes
            Offsets and sizes to apply; ``file_offset`` and ``skip_end`` are
            used as byte counts on the buffer, ``frame_offset`` and
            ``frame_size`` as item counts within each row of the result.

        Returns
        -------
        numpy.ndarray
            Array of dtype ``self._native_dtype`` with ``self.num_frames``
            rows, restricted to the columns
            ``frame_offset:frame_offset + frame_size``. It is built with
            ``np.frombuffer`` and views, so it refers to the memory of
            :code:`mem`.
        """
        # ``mem[a:-0]`` is ``mem[a:0]`` and therefore empty, so a `skip_end`
        # of zero has to become an open-ended slice instead.
        stop = -slicing.skip_end or None
        raw_bytes = np.frombuffer(mem[slicing.file_offset:stop], dtype="uint8")

        itemsize = np.dtype(self._native_dtype).itemsize
        sigsize = int(prod(self._sig_shape))
        # Keep exactly num_frames frames worth of bytes; whatever follows in
        # the buffer is cut off before reinterpreting as the native dtype.
        cutoff = self.num_frames * itemsize * sigsize

        frames = raw_bytes[:cutoff].view(dtype=self._native_dtype).reshape(
            (self.num_frames, -1)
        )
        return frames[:, slicing.frame_offset:slicing.frame_offset + slicing.frame_size]


class DMFileSet(FileSet):
    """File set for DM files; adds nothing to :class:`FileSet`."""


class DMDataSet(DataSet):
    """
    Factory class for DigitalMicrograph file datasets

     - A :code:`files` kwarg, or a tuple/list as first positional argument,
       yields an instance of :class:`StackedDMDataSet`
     - A :code:`path` kwarg, or anything else as first positional argument,
       yields a :class:`SingleDMDataSet`

    The factory exists because the two DM dataset implementations differ in
    signature and behaviour. They may be merged later if a markup format for
    multi-file datasets is implemented.

    The methods required to expose a DMDataSet in the web GUI are implemented
    here by deferring to SingleDMDataSet. At this time multi-file datasets
    are not supported in the UI.

    NOTE creating the instances this way breaks deeper subclassing:
    this __new__ method always instantiates SingleDMDataSet or
    StackedDMDataSet themselves, never a subclass of either. This could
    potentially be improved by using the :code:`__init_subclass__` hook to
    register the subclasses and what they inherit from.
    """

    def __new__(cls, *args, **kwargs):
        """
        Create an instance of the implementation matching the arguments.

        A :code:`path` kwarg takes precedence over a :code:`files` kwarg,
        which in turn takes precedence over the type of the first positional
        argument.
        """
        # delayed here to avoid circular reference
        from .dm_single import SingleDMDataSet

        first_arg_is_sequence = bool(args) and isinstance(args[0], (list, tuple))
        wants_stack = 'path' not in kwargs and (
            'files' in kwargs or first_arg_is_sequence
        )
        target = StackedDMDataSet if wants_stack else SingleDMDataSet
        return super().__new__(target)

    @classmethod
    def get_supported_extensions(cls):
        """Return the set of file extensions handled by this dataset."""
        extensions = {"dm3", "dm4"}
        return extensions

    @classmethod
    def get_msg_converter(cls):
        """Return the message converter class, which is the single-file one."""
        return SingleDMDatasetParams

    @classmethod
    def detect_params(cls, path, executor):
        """Defer parameter detection to :class:`SingleDMDataSet`."""
        # delayed here to avoid circular reference
        from .dm_single import SingleDMDataSet

        detected = SingleDMDataSet.detect_params(path, executor)
        return detected


class StackedDMDataSet(DMDataSet):
    """
    Reader for stacks of DM3/DM4 files.

    Note
    ----
    This DataSet is not supported in the GUI yet, as the file dialog needs to be
    updated to `properly handle opening series
    <https://github.com/LiberTEM/LiberTEM/issues/498>`_.

    Note
    ----
    Single-file 3/4D DM datasets are supported through the
    :class:`~libertem.io.dataset.dm_single.SingleDMDataSet` class.

    Note
    ----
    You can use the PyPI package `natsort <https://pypi.org/project/natsort/>`_
    to sort the filenames by their numerical components, this is especially useful
    for filenames without leading zeros.

    Parameters
    ----------

    files : List[str]
        List of paths to the files that should be loaded. The order is important,
        as it determines the order in the navigation axis. Must be a non-empty
        list or tuple, otherwise a `DataSetException` is raised.

    scan_size : Tuple[int, ...], optional
        Deprecated alias of :code:`nav_shape`; emits a :class:`FutureWarning`.
        Specifying both raises a :class:`ValueError`.

    nav_shape : Tuple[int, ...] or None
        By default, the files are loaded as a 3D stack. You can change this
        by specifying the nav_shape, which reshapes the navigation dimensions.
        Raises a `DataSetException` if the shape is incompatible with the data
        that is loaded.

    sig_shape: Tuple[int, ...], optional
        Signal/detector size (height, width)

    sync_offset: int, optional
        If positive, number of frames to skip from start
        If negative, number of blank frames to insert at start

    same_offset : bool
        When reading a stack of dm3/dm4 files, it can be expensive to read in
        all the metadata from all files, which we currently only use for
        getting the offsets and sizes of the main data in each file. If you
        absolutely know that the offsets and sizes are the same for all files,
        you can set this parameter and we will skip reading all metadata but
        the one from the first file.

    io_backend : optional
        Passed on unchanged to the parent class constructor.
    """

    def __init__(self, files=None, scan_size=None, same_offset=False, nav_shape=None,
                 sig_shape=None, sync_offset=0, io_backend=None):
        super().__init__(io_backend=io_backend)
        self._meta = None
        self._same_offset = same_offset
        self._nav_shape = tuple(nav_shape) if nav_shape else nav_shape
        self._sig_shape = tuple(sig_shape) if sig_shape else sig_shape
        self._sync_offset = sync_offset
        # handle backwards-compatibility:
        if scan_size is not None:
            warnings.warn(
                "scan_size argument is deprecated. please specify nav_shape instead",
                FutureWarning,
                # attribute the warning to the code that passed scan_size
                stacklevel=2,
            )
            if nav_shape is not None:
                raise ValueError("cannot specify both scan_size and nav_shape")
            self._nav_shape = tuple(scan_size)
        self._filesize = None
        self._files = files
        if not isinstance(files, (list, tuple)):
            raise DataSetException(
                "files argument must be a list or tuple of file paths, "
                f"received {type(files)}"
            )
        if not files:
            raise DataSetException("need at least one file as input!")
        self._fileset = None
        # per-file cached attributes, filled in by initialize():
        self._z_sizes = {}
        self._offsets = {}

    def __new__(cls, *args, **kwargs):
        '''
        Skip the superclass's :code:`__new__()` method.

        Instead, go straight to the grandparent. That disables the
        :class:`DMDataSet` type determination magic. Otherwise unpickling
        will always yield a :class:`SingleDMDataSet` since this class inherits
        the parent's :code:`__new__()` method and unpickling calls it without
        parameters, making it select :class:`SingleDMDataSet`.

        It mimics calling the superclass :code:`__new__(cls)` without
        additional parameters, just like the parent's method.
        '''
        return DataSet.__new__(cls)

    def _get_sig_shape_and_native_dtype(self):
        """
        Determine signal shape and dtype from the first file of the stack.

        Returns
        -------
        tuple
            ``((ySize, xSize), dtype)`` of the selected object in the file.

        Raises
        ------
        DataSetException
            If the file has no entry at the selected object index.
        """
        first_fn = self._get_files()[0]
        # Context manager, as in check_valid(), so the handle is released
        # deterministically.
        with fileDM(first_fn, on_memory=True) as first_file:
            # With a single object in the file use it, otherwise the second one.
            idx = 0 if first_file.numObjects == 1 else 1
            try:
                raw_dtype = first_file._DM2NPDataType(first_file.dataType[idx])
                native_sig_shape = (first_file.ySize[idx], first_file.xSize[idx])
            except IndexError as e:
                raise DataSetException("could not determine dtype or signal shape") from e
        return native_sig_shape, raw_dtype

    def _get_fileset(self):
        """
        Build the :class:`DMFileSet` from the per-file sizes and offsets.

        Files are laid out back to back along the frame index: each file
        starts where the previous one ended and spans its z size.
        """
        start_idx = 0
        files = []
        for fn in self._get_files():
            z_size = self._z_sizes[fn]
            files.append(StackedDMFile(
                path=fn,
                start_idx=start_idx,
                end_idx=start_idx + z_size,
                sig_shape=self._meta.shape.sig,
                native_dtype=self._meta.raw_dtype,
                file_header=self._offsets[fn],
            ))
            start_idx += z_size
        return DMFileSet(files)

    def _get_files(self):
        """Return the files as passed to the constructor."""
        return self._files

    def _get_filesize(self):
        """Return the summed on-disk size in bytes of all files."""
        return sum(
            os.stat(p).st_size
            for p in self._get_files()
        )

    def initialize(self, executor):
        """
        Read the metadata through :code:`executor` and set up the dataset.

        Fills the per-file offsets and z sizes (from the first file only if
        :code:`same_offset` was set), derives the default navigation and
        signal shapes, and creates the dataset meta information and file set.

        Returns
        -------
        StackedDMDataSet
            This instance.

        Raises
        ------
        DataSetException
            If a user-supplied :code:`sig_shape` does not have as many
            elements as the signal shape found in the first file.
        """
        files = self._get_files()
        self._filesize = executor.run_function(self._get_filesize)
        if self._same_offset:
            # Trust the caller: metadata of the first file applies to all files.
            metadata = executor.run_function(_get_metadata, files[0])
            self._offsets = {fn: metadata['offset'] for fn in files}
            self._z_sizes = {fn: metadata['zsize'] for fn in files}
        else:
            metadata = dict(zip(
                files,
                executor.map(_get_metadata, files),
            ))
            self._offsets = {fn: metadata[fn]['offset'] for fn in files}
            self._z_sizes = {fn: metadata[fn]['zsize'] for fn in files}
        self._image_count = sum(self._z_sizes.values())
        if self._nav_shape is None:
            # default: a flat stack of all frames
            self._nav_shape = (sum(self._z_sizes.values()),)
        native_sig_shape, native_dtype = executor.run_function(
            self._get_sig_shape_and_native_dtype
        )
        if self._sig_shape is None:
            self._sig_shape = tuple(native_sig_shape)
        elif int(prod(self._sig_shape)) != int(prod(native_sig_shape)):
            raise DataSetException(
                "sig_shape must be of size: %s" % int(prod(native_sig_shape))
            )
        shape = self._nav_shape + self._sig_shape
        self._nav_shape_product = int(prod(self._nav_shape))
        self._sync_offset_info = self.get_sync_offset_info()
        self._meta = DataSetMeta(
            shape=Shape(shape, sig_dims=len(self._sig_shape)),
            raw_dtype=native_dtype,
            sync_offset=self._sync_offset,
            image_count=self._image_count,
        )
        # needs self._meta, so this has to come last
        self._fileset = executor.run_function(self._get_fileset)
        return self

    @classmethod
    def get_supported_extensions(cls):
        """Return the set of file extensions handled by this dataset."""
        return {"dm3", "dm4"}

    @classmethod
    def get_msg_converter(cls) -> type[MessageConverter]:
        """Return the message converter class for stacked DM datasets."""
        return StackedDMDatasetParams

    @classmethod
    def detect_params(cls, path, executor):
        """
        Always return :code:`False`: no parameters are detected.

        FIXME: detecting parameters from one path doesn't really make sense
        for file series, so detection is disabled.
        """
        return False

    @property
    def dtype(self) -> "nt.DTypeLike":
        """The raw dtype stored in the dataset meta information."""
        return self._meta.raw_dtype

    @property
    def shape(self):
        """The shape stored in the dataset meta information."""
        return self._meta.shape

    def check_valid(self):
        """
        Check that the first file of the stack can be opened as a DM file.

        Returns
        -------
        bool
            :code:`True` if opening succeeded.

        Raises
        ------
        DataSetException
            If opening the file raised an :class:`OSError`.
        """
        first_fn = self._get_files()[0]
        try:
            with fileDM(first_fn, on_memory=True):
                pass
            return True
        except OSError as e:
            raise DataSetException("invalid dataset: %s" % e) from e

    def get_partitions(self):
        """Yield one :class:`BasePartition` per slice from :code:`get_slices()`."""
        for part_slice, start, stop in self.get_slices():
            yield BasePartition(
                meta=self._meta,
                partition_slice=part_slice,
                fileset=self._fileset,
                start_frame=start,
                num_frames=stop - start,
                io_backend=self.get_io_backend(),
            )

    def __repr__(self):
        return "<DMDataSet for a stack of %d files>" % (len(self._get_files()),)