# Source code for the single-file DM3/DM4 dataset reader (SingleDMDataSet).

import os
import warnings
import typing
from typing import Dict, Any, Tuple, Optional
import logging

import numpy as np

from libertem.common.math import prod
from libertem.common import Shape
from .base import BasePartition, DataSetException, DataSetMeta, File, IOBackend, DataSet
from .dm import DMDataSet, SingleDMDatasetParams, DMFileSet

log = logging.getLogger(__name__)

if typing.TYPE_CHECKING:
    from numpy import typing as nt
    from libertem.common.executor import JobExecutor

class SingleDMDataSet(DMDataSet):
    """
    Reader for a single DM3/DM4 file. Handles 4D-STEM, 3D-Spectrum Images,
    and TEM image stacks stored in a single-file format. Where possible
    the structure will be inferred from the file metadata.

    .. versionadded:: 0.11.0

    Note
    ----
    Single-file DM data can be stored on disk using either normal C-ordering,
    which is an option in recent versions of GMS, or an alternative F/C-hybrid
    ordering depending on the imaging mode and dimensionality. The reading of
    F/C-hybrid files is currently not supported for performance reasons.

    The DataSet will try to infer the ordering from the file metadata and
    read accordingly. If the file uses the older hybrid F/C-ordering
    :code:`(flat_sig, flat_nav)` then the dataset will raise an exception
    unless the `force_c_order` argument is set to true.

    A converter for F/C-hybrid files is provided as
    :meth:`~libertem.contrib.convert_transposed.convert_dm4_transposed`.

    Note
    ----
    In the Web-GUI a 2D-image or 3D-stack/spectrum image will have extra
    singleton navigation dimensions prepended to allow them to display.
    DM files containing multiple datasets are supported via the
    `dataset_index` argument.

    While capable of reading 2D/3D files, LiberTEM is not particularly
    well-adapted to processing these data and the user should consider
    other tools. Individual spectra or vectors (1D data) are not supported.

    Parameters
    ----------
    path : PathLike
        The path to the .dm3/.dm4 file
    nav_shape : Tuple[int, ...], optional
        Over-ride the nav_shape provided by the file metadata.
        This can be used to adjust the total number of frames.
    sig_shape: Tuple[int, ...], optional
        Over-ride the sig_shape provided by the file metadata.
        Data are read sequentially in all cases, therefore this
        is typically only interesting if the total number of
        sig pixels remains constant.
    sync_offset: int, optional, by default 0
        If positive, number of frames to skip from start
        If negative, number of blank frames to insert at start
    io_backend: IOBackend, optional
        A specific IOBackend implementation to over-ride the
        platform default.
    force_c_order: bool, optional, by default False
        Force the data to be interpreted as a C-ordered
        array regardless of the tag information. This will lead
        to incorrect results on a hybrid C/F-ordered file.
    dataset_index: int, optional
        In the case of a multi-dataset DM file this can
        be used to open a specific dataset index. Note that
        the datasets in a DM-file often begin with a thumbnail
        which occupies the 0 dataset index. If not provided the
        first compatible dataset found in the file is used.
    """

    def __init__(
        self,
        path: os.PathLike,
        nav_shape: Optional[Tuple[int, ...]] = None,
        sig_shape: Optional[Tuple[int, ...]] = None,
        sync_offset: int = 0,
        io_backend: Optional[IOBackend] = None,
        force_c_order: bool = False,
        dataset_index: Optional[int] = None
    ):
        super().__init__(io_backend=io_backend)
        self._filesize = None
        self._path = path
        # Empty / None user shapes mean "take the shape from the file metadata"
        self._nav_shape = tuple(nav_shape) if nav_shape else None
        self._sig_shape = tuple(sig_shape) if sig_shape else None
        self._sync_offset = sync_offset
        self._force_c_order = force_c_order
        self._dm_ds_index = dataset_index

    def __new__(cls, *args, **kwargs):
        '''
        Skip the superclass's :code:`__new__()` method.

        Instead, go straight to the grandparent. That disables the
        :class:`DMDataSet` type determination magic. Otherwise unpickling
        will always yield a :class:`SingleDMDataSet` since this class
        inherits the parent's :code:`__new__()` method and unpickling calls
        it without parameters, making it select :class:`SingleDMDataSet`.

        It mimics calling the superclass :code:`__new__(cls)` without
        additional parameters, just like the parent's method.
        '''
        return DataSet.__new__(cls)

    def __repr__(self):
        # The shape is only available once the metadata has been set
        try:
            shape = f' - {self.shape}'
        except AttributeError:
            shape = ''
        return f"<DMFileDataset {self._path}>" + shape

    @property
    def dtype(self) -> "nt.DTypeLike":
        """The raw dtype of the data, as recorded in the dataset metadata."""
        return self.meta.raw_dtype

    @property
    def shape(self):
        """The dataset shape, as recorded in the dataset metadata."""
        return self.meta.shape

    @classmethod
    def get_supported_extensions(cls):
        """Return the set of file extensions handled by this dataset."""
        return {"dm3", "dm4"}

    def _get_filesize(self):
        """Return the on-disk size in bytes of the file at ``self._path``."""
        return os.stat(self._path).st_size

    @classmethod
    def get_msg_converter(cls):
        """Return the parameter converter class for this dataset."""
        return SingleDMDatasetParams

    @classmethod
    def detect_params(cls, path: str, executor):
        """
        Try to auto-detect dataset parameters for the file at `path`.

        Returns ``False`` if the extension is not .dm3/.dm4 or the selected
        dataset has a 1D signal (spectrum image); otherwise a dict with
        ``"parameters"`` and ``"info"`` entries.
        """
        if not path.lower().endswith((".dm3", ".dm4")):
            return False

        array_meta = executor.run_function(cls._read_metadata, path)
        sig_dims = array_meta['sig_dims']
        if sig_dims == 1:
            return False
        sync_offset = 0
        nav_shape, sig_shape = cls._modify_shape(array_meta['shape'], sig_dims=sig_dims)
        if len(nav_shape) == 1:
            # Prepend a singleton nav dimension so the result is always >= 2D in nav
            nav_shape = (1,) + nav_shape
        image_count = prod(nav_shape)

        return {
            "parameters": {
                "path": path,
                "nav_shape": nav_shape,
                "sig_shape": sig_shape,
                "sync_offset": sync_offset,
            },
            "info": {
                "image_count": image_count,
                "native_sig_shape": sig_shape,
            }
        }

    @staticmethod
    def _open_dm(path):
        """Open `path` with the DM file parser (header/tags read via ``on_memory=True``)."""
        # NOTE(review): the opener expression was missing from the reviewed copy of
        # this file (only ``with, on_memory=True)`` remained). It is restored here as
        # ncempy's fileDM, inferred from the attributes used on the handle
        # (allTags, thumbnail, numObjects, _DM2NPDataType, ...) - confirm.
        # Imported locally so this class does not rely on a module-level import.
        from ncempy.io.dm import fileDM
        return fileDM(path, on_memory=True)

    def check_valid(self):
        """
        Check that the file can be opened as a DM file.

        Returns
        -------
        bool
            ``True`` if the file could be opened.

        Raises
        ------
        DataSetException
            If opening the file raises :class:`OSError`.
        """
        try:
            with self._open_dm(self._path):
                pass
            return True
        except OSError as e:
            raise DataSetException("invalid dataset: %s" % e) from e

    @classmethod
    def _read_metadata(cls, path, use_ds=None):
        """
        Read shape / offset / dtype / ordering information for one dataset
        stored in the DM file at `path`.

        Parameters
        ----------
        path
            Path to the DM file.
        use_ds : int, optional
            Dataset index to use. If ``None`` the first 2/3/4D dataset found
            (after any thumbnail) is used, with a warning if there are several.

        Returns
        -------
        dict
            Keys ``shape``, ``ds_idx``, ``offset``, ``dtype``, ``format``,
            ``sig_dims`` and ``c_order``.

        Raises
        ------
        DataSetException
            If no 2/3/4D dataset is found, if `use_ds` is not a usable
            dataset index, or if a spectrum image is not 2- or 3-D.
        """
        with cls._open_dm(path) as fp:
            tags = fp.allTags
            array_map = {}
            # Index 0 is skipped when the file reports a thumbnail
            start_from = 1 if fp.thumbnail else 0
            for ds_idx in range(start_from, fp.numObjects):
                dims = fp.dataShape[ds_idx]
                if dims < 2:
                    # Spectrum-only ?
                    continue
                shape = (fp.xSize[ds_idx], fp.ySize[ds_idx])
                if dims > 2:
                    shape = shape + (fp.zSize[ds_idx],)
                if dims > 3:
                    shape = shape + (fp.zSize2[ds_idx],)
                entry = {
                    'shape': shape,
                    'ds_idx': ds_idx,
                    'offset': fp.dataOffset[ds_idx],
                }
                try:
                    entry['dtype'] = fp._DM2NPDataType(fp.dataType[ds_idx])
                except OSError:
                    # unconvertible DM data type, keep the raw DM type code
                    entry['dtype'] = fp.dataType[ds_idx]
                array_map[ds_idx] = entry

        if not array_map:
            raise DataSetException('Unable to find any 2/3/4D datasets in DM file')

        if use_ds is not None:
            if use_ds not in array_map:
                raise DataSetException(f'Specified dataset idx {use_ds} not found in file')
            ds_idx = use_ds
        else:
            # Use first dataset index we loaded
            ds_idx = next(iter(array_map))
            if len(array_map) > 1:
                warnings.warn(
                    "Found multiple datasets in DM file, using first dataset",
                    RuntimeWarning
                )

        array_meta = array_map[ds_idx]
        ndims = len(array_meta['shape'])

        # Set default metadata in case tags are incomplete
        array_meta['format'] = 'Unknown'
        array_meta['sig_dims'] = 2
        # Assume C-ordering for 2D images and 3D image stacks
        # Assume F-ordering for STEM data unless tagged otherwise
        # Spectrum images are also F-ordered but these data must
        # be recognized from the tags (they can be 2- or 3-D)
        array_meta['c_order'] = ndims in (2, 3)

        # Infer array ordering
        nest = cls._tags_to_nest(tags)
        # Must + 1 because DM uses 1-based-indexing in its tags
        dm_data_key = str(array_meta['ds_idx'] + 1)
        try:
            data_tags = nest['ImageList'][dm_data_key]['ImageTags']
        except KeyError:
            # unrecognized / invalid tag structure, return defaults
            return array_meta

        if 'Meta Data' in data_tags:
            meta_data = data_tags['Meta Data']
            array_meta['format'] = meta_data.get('Format', 'Unknown')
            if str(array_meta['format']).strip().lower() == 'spectrum image':
                # Raise instead of assert: this validates file content and
                # must not disappear when Python runs with -O
                if ndims not in (2, 3):
                    raise DataSetException(
                        f'Spectrum image must be 2- or 3-dimensional, got {ndims} dimensions'
                    )
                array_meta['sig_dims'] = 1
                if ndims == 3:
                    # 3-D spectrum images seem to be F-ordered
                    # 2-D SI are seemingly C-ordered (value set above)
                    array_meta['c_order'] = False
            if 'Data Order Swapped' in meta_data:
                # Always defer to tag for ordering if available
                # This line handles the new-style STEM datasets
                # The bool(int()) is just-in-case for string tags
                array_meta['c_order'] = bool(int(meta_data['Data Order Swapped']))
            # Need to find a 3D image stack with the 'Meta Data' + 'Format' tags

        # NOTE(review): placed at function level so that the 'Unknown' default
        # also triggers the warning; the nesting could not be recovered from
        # the reviewed copy - confirm.
        # Normalised the same way as the spectrum-image check above so that
        # case / whitespace variants of a known format do not warn.
        known_formats = ('spectrum image', 'image', 'diffraction image')
        if str(array_meta['format']).strip().lower() not in known_formats:
            warnings.warn(
                f"Unrecognized image format {array_meta['format']}, "
                "DM tags may be parsed incorrectly",
                RuntimeWarning
            )

        return array_meta

    @staticmethod
    def _tags_to_nest(tags: Dict[str, Any]):
        """
        Convert a flat mapping with dot-separated keys into nested dicts.

        Leading / trailing dots are stripped from each key, e.g.
        ``{'.a.b': 1}`` becomes ``{'a': {'b': 1}}``.
        """
        tags_nest = {}
        for tag, element in tags.items():
            *parents, leaf = tag.strip('.').split('.')
            _insert_to = tags_nest
            for tag_el in parents:
                _insert_to = _insert_to.setdefault(tag_el, {})
            _insert_to[leaf] = element
        return tags_nest

    @staticmethod
    def _modify_shape(shape: Tuple[int, ...], sig_dims: int = 2):
        """
        Reverse the file-order `shape` and split it into ``(nav_shape, sig_shape)``,
        where the last `sig_dims` entries of the reversed shape form the sig shape.
        """
        # The shape reversal to read in C-ordering applies to DM4/STEM files
        # saved in the new style as well as DM3, 3D image stacks saved
        # in older versions of GMS. Must check whether newer image stacks
        # are saved in C-ordering as well (despite the metadata order)
        shape = tuple(int(s) for s in reversed(shape))
        nav_shape = shape[:-sig_dims]
        sig_shape = shape[-sig_dims:]
        if not nav_shape:
            # Special case for 2D image data, LT always requires a nav dim
            nav_shape = (1,)
        return nav_shape, sig_shape

    def initialize(self, executor: 'JobExecutor'):
        """
        Read the file metadata via `executor` and set up the dataset metadata.

        Returns
        -------
        SingleDMDataSet
            ``self``, to allow chaining.

        Raises
        ------
        DataSetException
            If the DM data type cannot be converted to a numpy dtype, if the
            file is not identified as C-ordered and `force_c_order` is not
            set, or if a user-supplied nav/sig shape exceeds the data in
            the file.
        """
        self._filesize = executor.run_function(self._get_filesize)
        array_meta = executor.run_function(self._read_metadata,
                                           self._path,
                                           use_ds=self._dm_ds_index)
        sig_dims = array_meta['sig_dims']
        self._array_offset = array_meta['offset']
        self._raw_dtype = array_meta['dtype']
        # _read_metadata stores the raw DM type code when it cannot be converted.
        # Raise instead of assert so the check survives running with -O.
        if self._raw_dtype is None or isinstance(self._raw_dtype, int):
            raise DataSetException(
                f'Unsupported DM data type {self._raw_dtype!r}, '
                'cannot convert to a numpy dtype'
            )

        array_c_ordered = self._force_c_order or array_meta['c_order']
        if not array_c_ordered:
            raise DataSetException('Cannot identify DM file as C-ordered from metadata, '
                                   'use force_c_order=True to force behaviour.')

        nav_shape, sig_shape = self._modify_shape(array_meta['shape'], sig_dims=sig_dims)

        # Image count is true number of frames in file (?)
        self._image_count = int(prod(nav_shape))
        if self._nav_shape is None:
            self._nav_shape = nav_shape
        elif prod(self._nav_shape) > self._image_count:
            raise DataSetException('Specified nav_shape greater than file nav size')
        # nav_shape product is either manual nav_shape if supplied or metadata nav_shape (?)
        self._nav_shape_product = int(prod(self._nav_shape))

        sig_size = int(prod(sig_shape))
        if self._sig_shape is None:
            self._sig_shape = sig_shape
        else:
            manual_sig_size = int(prod(self._sig_shape))
            # Total pixels requested must not exceed total pixels in the file
            if (manual_sig_size * self._nav_shape_product) > (self._image_count * sig_size):
                raise DataSetException('Specified sig_shape and nav size '
                                       'too large for data in file')

        # regardless of file order the Dataset shape property is 'standard'
        shape = Shape(self._nav_shape + self._sig_shape, sig_dims=sig_dims)
        self._sync_offset_info = self.get_sync_offset_info()
        self._meta = DataSetMeta(
            shape=shape,
            raw_dtype=np.dtype(self._raw_dtype),
            sync_offset=self._sync_offset,
            image_count=self._image_count,
        )
        return self

    def _get_fileset(self):
        """Build a one-file :class:`DMFileSet` covering all frames in the file."""
        return DMFileSet([
            DMFile(
                path=self._path,
                start_idx=0,
                end_idx=self._image_count,
                sig_shape=self.shape.sig,
                native_dtype=self.meta.raw_dtype,
                file_header=self._array_offset,
            )
        ])

    def get_partitions(self):
        """Yield one :class:`DMPartition` per slice returned by ``get_slices()``."""
        fileset = self._get_fileset()
        for part_slice, start, stop in self.get_slices():
            yield DMPartition(
                meta=self.meta,
                partition_slice=part_slice,
                fileset=fileset,
                start_frame=start,
                num_frames=stop - start,
                io_backend=self.get_io_backend(),
            )


class DMFile(File):
    """
    File type used by :class:`SingleDMDataSet` for its single DM3/DM4 file.

    Adds no behaviour of its own on top of :class:`File`.
    """
    pass


class DMPartition(BasePartition):
    """
    Partition type yielded by :meth:`SingleDMDataSet.get_partitions`.

    Adds no behaviour of its own on top of :class:`BasePartition`.
    """
    pass