import os
import warnings
import typing
from typing import Any, Optional
import logging
import ncempy.io.dm
import numpy as np
from libertem.common.math import prod
from libertem.common import Shape
from .base import BasePartition, DataSetException, DataSetMeta, File, IOBackend, DataSet
from .dm import DMDataSet, SingleDMDatasetParams, DMFileSet
log = logging.getLogger(__name__)
if typing.TYPE_CHECKING:
from numpy import typing as nt
from libertem.common.executor import JobExecutor
# [docs]  (Sphinx "viewcode" link residue from documentation extraction)
class SingleDMDataSet(DMDataSet):
    """
    Reader for a single DM3/DM4 file. Handles 4D-STEM, 3D-Spectrum Images,
    and TEM image stacks stored in a single-file format. Where possible
    the structure will be inferred from the file metadata.

    .. versionadded:: 0.11.0

    Note
    ----
    Single-file DM data can be stored on disk using either normal C-ordering,
    which is an option in recent versions of GMS, or an alternative F/C-hybrid
    ordering depending on the imaging mode and dimensionality. The reading of
    F/C-hybrid files is currently not supported for performance reasons.

    The DataSet will try to infer the ordering from the file metadata and
    read accordingly. If the file uses the older hybrid F/C-ordering
    :code:`(flat_sig, flat_nav)` then the dataset will raise an exception
    unless the `force_c_order` argument is set to true.

    A converter for F/C-hybrid files is provided as
    :meth:`~libertem.contrib.convert_transposed.convert_dm4_transposed`.

    Note
    ----
    In the Web-GUI a 2D-image or 3D-stack/spectrum image will have extra
    singleton navigation dimensions prepended to allow them to display.
    DM files containing multiple datasets are supported via the
    `dataset_index` argument.

    While capable of reading 2D/3D files, LiberTEM is not particularly
    well-adapted to processing these data and the user should consider
    other tools. Individual spectra or vectors (1D data) are not supported.

    Parameters
    ----------

    path : PathLike
        The path to the .dm3/.dm4 file

    nav_shape : Tuple[int, ...], optional
        Over-ride the nav_shape provided by the file metadata.
        This can be used to adjust the total
        number of frames.

    sig_shape: Tuple[int, ...], optional
        Over-ride the sig_shape provided by the file metadata.
        Data are read sequentially in all cases, therefore this
        is typically only interesting if the total number of
        sig pixels remains constant.

    sync_offset: int, optional, by default 0
        If positive, number of frames to skip from start
        If negative, number of blank frames to insert at start

    io_backend: IOBackend, optional
        A specific IOBackend implementation to over-ride the
        platform default.

    force_c_order: bool, optional, by default False
        Force the data to be interpreted as a C-ordered
        array regardless of the tag information. This will lead
        to incorrect results on a hybrid C/F-ordered file.

    dataset_index: int, optional
        In the case of a multi-dataset DM file this can
        be used to open a specific dataset index. Note that
        the datasets in a DM-file often begin with a thumbnail
        which occupies the 0 dataset index. If not provided the
        first compatible dataset found in the file is used.
    """

    def __init__(
        self,
        path: os.PathLike,
        nav_shape: Optional[tuple[int, ...]] = None,
        sig_shape: Optional[tuple[int, ...]] = None,
        sync_offset: int = 0,
        io_backend: Optional[IOBackend] = None,
        force_c_order: bool = False,
        dataset_index: Optional[int] = None
    ):
        super().__init__(io_backend=io_backend)
        self._filesize = None
        self._path = path
        # Falsy shapes (None or empty) mean "take the shape from the file"
        self._nav_shape = tuple(nav_shape) if nav_shape else None
        self._sig_shape = tuple(sig_shape) if sig_shape else None
        self._sync_offset = sync_offset
        self._force_c_order = force_c_order
        self._dm_ds_index = dataset_index

    def __new__(cls, *args, **kwargs):
        '''
        Skip the superclass's :code:`__new__()` method.

        Instead, go straight to the grandparent. That disables the
        :class:`DMDataSet` type determination magic. Otherwise unpickling will
        always yield a :class:`SingleDMDataSet` since this class inherits the
        parent's :code:`__new__()` method and unpickling calls it without
        parameters, making it select :class:`SingleDMDataSet`.

        It mimics calling the superclass :code:`__new__(cls)` without additional
        parameters, just like the parent's method.
        '''
        return DataSet.__new__(cls)

    def __repr__(self):
        # The shape is only appended when it can be read from the metadata
        try:
            shape = f' - {self.shape}'
        except AttributeError:
            shape = ''
        return f"<DMFileDataset {self._path}>" + shape

    @property
    def dtype(self) -> "nt.DTypeLike":
        """The raw dtype of the data, as stored in the dataset metadata."""
        return self.meta.raw_dtype

    @property
    def shape(self):
        """The dataset shape, as stored in the dataset metadata."""
        return self.meta.shape

    @classmethod
    def get_supported_extensions(cls):
        """Return the set of file extensions (without dot) handled by this class."""
        return {"dm3", "dm4"}

    def _get_filesize(self):
        """Return the size in bytes of the file at :code:`self._path`."""
        return os.stat(self._path).st_size

    @classmethod
    def get_msg_converter(cls):
        """Return the message converter class for this dataset's parameters."""
        return SingleDMDatasetParams

    @classmethod
    def detect_params(cls, path: str, executor):
        """
        Try to derive dataset parameters for the file at :code:`path`.

        Parameters
        ----------
        path : str or PathLike
            Candidate file; only names ending in .dm3/.dm4
            (case-insensitive) are considered.
        executor
            Object providing :code:`run_function`, used to read the metadata.

        Returns
        -------
        dict or False
            :code:`False` if the extension does not match or the data
            has a 1D signal (spectrum image), otherwise a dictionary with
            the keys :code:`"parameters"` and :code:`"info"`.
        """
        # os.fspath() lets PathLike objects through; str input is unchanged
        pathlow = os.fspath(path).lower()
        if not pathlow.endswith((".dm3", ".dm4")):
            return False

        array_meta = executor.run_function(cls._read_metadata, path)
        sig_dims = array_meta['sig_dims']
        if sig_dims == 1:
            return False
        sync_offset = 0
        nav_shape, sig_shape = cls._modify_shape(array_meta['shape'],
                                                 sig_dims=sig_dims)
        if len(nav_shape) == 1:
            # Prepend a singleton dimension so that nav is at least 2D
            nav_shape = (1,) + nav_shape
        image_count = prod(nav_shape)

        return {
            "parameters": {
                "path": path,
                "nav_shape": nav_shape,
                "sig_shape": sig_shape,
                "sync_offset": sync_offset,
            },
            "info": {
                "image_count": image_count,
                "native_sig_shape": sig_shape,
            }
        }

    def check_valid(self):
        """
        Check that the file can be opened as a DM file.

        Returns
        -------
        bool
            :code:`True` if the file was opened successfully.

        Raises
        ------
        DataSetException
            If opening the file raises :class:`OSError`.
        """
        try:
            with ncempy.io.dm.fileDM(self._path, on_memory=True):
                pass
        except OSError as e:
            # Chain the original error so that the root cause stays visible
            raise DataSetException(f"invalid dataset: {e}") from e
        return True

    @classmethod
    def _read_metadata(cls, path, use_ds=None):
        """
        Read shape, offset, dtype and ordering information for one
        dataset in the DM file at :code:`path`.

        Parameters
        ----------
        path
            Path of the DM file.
        use_ds : int, optional
            Index of the dataset to use. If not given, the first dataset
            with at least two dimensions is used and a warning is emitted
            if more than one such dataset exists.

        Returns
        -------
        dict
            With the keys :code:`'shape'`, :code:`'ds_idx'`, :code:`'offset'`,
            :code:`'dtype'`, :code:`'format'`, :code:`'sig_dims'`
            and :code:`'c_order'`.

        Raises
        ------
        DataSetException
            If no 2/3/4D dataset is found, if :code:`use_ds` is not one of
            the usable datasets, or if data tagged as a spectrum image
            is not 2D or 3D.
        """
        with ncempy.io.dm.fileDM(path, on_memory=True) as fp:
            tags = fp.allTags
            array_map = {}
            # A leading thumbnail occupies index 0 and is never a candidate
            start_from = 1 if fp.thumbnail else 0

            for ds_idx in range(start_from, fp.numObjects):
                dims = fp.dataShape[ds_idx]
                if dims < 2:
                    # Spectrum-only ?
                    continue
                shape = (fp.xSize[ds_idx], fp.ySize[ds_idx])
                if dims > 2:
                    shape = shape + (fp.zSize[ds_idx],)
                if dims > 3:
                    shape = shape + (fp.zSize2[ds_idx],)
                array_map[ds_idx] = {'shape': shape, 'ds_idx': ds_idx}
                array_map[ds_idx]['offset'] = fp.dataOffset[ds_idx]
                try:
                    array_map[ds_idx]['dtype'] = fp._DM2NPDataType(fp.dataType[ds_idx])
                except OSError:
                    # unconvertible DM data type, keep the raw DM type code
                    array_map[ds_idx]['dtype'] = fp.dataType[ds_idx]

        if not array_map:
            raise DataSetException('Unable to find any 2/3/4D datasets in DM file')

        if use_ds is not None:
            if use_ds not in array_map:
                raise DataSetException(f'Specified dataset idx {use_ds} not found in file')
            ds_idx = use_ds
        else:
            # Use first dataset index we loaded
            ds_idx = next(iter(array_map))
            if len(array_map) > 1:
                warnings.warn(
                    "Found multiple datasets in DM file, using first dataset",
                    RuntimeWarning
                )

        array_meta = array_map[ds_idx]
        ndims = len(array_meta['shape'])

        # Set default metadata in case tags are incomplete
        array_meta['format'] = 'Unknown'
        array_meta['sig_dims'] = 2
        # Assume C-ordering for 2D images and 3D image stacks
        # Assume F-ordering for STEM data unless tagged otherwise
        # Spectrum images are also F-ordered but these data must
        # be recognized from the tags (they can be 2- or 3-D)
        array_meta['c_order'] = ndims in (2, 3)

        # Infer array ordering
        nest = cls._tags_to_nest(tags)
        # Must + 1 because DM uses 1-based-indexing in its tags
        dm_data_key = str(array_meta['ds_idx'] + 1)
        try:
            data_tags = nest['ImageList'][dm_data_key]['ImageTags']
        except KeyError:
            # unrecognized / invalid tag structure, return defaults
            return array_meta

        if 'Meta Data' in data_tags:
            meta_data = data_tags['Meta Data']
            array_meta['format'] = meta_data.get('Format', 'Unknown')
            if str(array_meta['format']).strip().lower() == 'spectrum image':
                # Raise instead of assert: this validates file contents and
                # must not disappear when Python runs with -O
                if ndims not in (2, 3):
                    raise DataSetException(
                        'Spectrum image data must be 2D or 3D, '
                        f'found {ndims}D data'
                    )
                array_meta['sig_dims'] = 1
                if ndims == 3:
                    # 3-D spectrum images seem to be F-ordered
                    # 2-D SI are seemingly C-ordered (value set above)
                    array_meta['c_order'] = False
            if 'Data Order Swapped' in meta_data:
                # Always defer to tag for ordering if available
                # This line handles the new-style STEM datasets
                # The bool(int()) is just-in-case for string tags
                array_meta['c_order'] = bool(int(meta_data['Data Order Swapped']))

        # Need to find a 3D image stack with the 'Meta Data' + 'Format' tags

        if array_meta['format'] not in ('Spectrum image',
                                        'Image',
                                        'Diffraction image'):
            warnings.warn(
                f"Unrecognized image format {array_meta['format']}, "
                "DM tags may be parsed incorrectly",
                RuntimeWarning
            )

        return array_meta

    @staticmethod
    def _tags_to_nest(tags: dict[str, Any]):
        """
        Convert a flat mapping with dot-separated keys into nested dicts.

        For example :code:`{'.a.b.c': 1}` becomes :code:`{'a': {'b': {'c': 1}}}`.
        Leading and trailing dots in a key are ignored.
        """
        tags_nest = {}
        for tag, element in tags.items():
            *parents, leaf = tag.strip('.').split('.')
            _insert_to = tags_nest
            for tag_el in parents:
                # Descend, creating intermediate levels on demand
                _insert_to = _insert_to.setdefault(tag_el, {})
            _insert_to[leaf] = element
        return tags_nest

    @staticmethod
    def _modify_shape(shape: tuple[int, ...], sig_dims: int = 2):
        """
        Reverse :code:`shape` and split it into :code:`(nav_shape, sig_shape)`.

        The last :code:`sig_dims` entries of the reversed shape form the
        sig shape, the remainder the nav shape. If no nav dimension is
        left, :code:`(1,)` is used for the nav shape.
        """
        # The shape reversal to read in C-ordering applies to DM4/STEM files
        # saved in the new style as well as DM3, 3D image stacks saved
        # in older versions of GMS. Must check whether newer image stacks
        # are saved in C-ordering as well (despite the metadata order)
        shape = tuple(reversed(shape))
        shape = tuple(map(int, shape))
        nav_shape = shape[:-sig_dims]
        sig_shape = shape[-sig_dims:]
        if not nav_shape:
            # Special case for 2D image data, LT always requires a nav dim
            nav_shape = (1,)
        return nav_shape, sig_shape

    def initialize(self, executor: 'JobExecutor'):
        """
        Read the file metadata through :code:`executor` and set up
        the shape and metadata of this dataset.

        Returns
        -------
        SingleDMDataSet
            This instance.

        Raises
        ------
        DataSetException
            If the data type of the file cannot be converted to a numpy
            dtype, if the file is not identified as C-ordered and
            `force_c_order` is not set, or if a user-supplied `nav_shape` /
            `sig_shape` requires more data than the file metadata describes.
        """
        self._filesize = executor.run_function(self._get_filesize)
        array_meta = executor.run_function(self._read_metadata,
                                           self._path,
                                           use_ds=self._dm_ds_index)
        sig_dims = array_meta['sig_dims']
        self._array_offset = array_meta['offset']
        self._raw_dtype = array_meta['dtype']
        # _read_metadata leaves the raw integer DM type code in place when
        # it cannot be converted; raise instead of assert so that the check
        # is still performed when Python runs with -O
        if self._raw_dtype is None or isinstance(self._raw_dtype, int):
            raise DataSetException(
                f'Unsupported DM data type {self._raw_dtype!r}, '
                'unable to convert to a numpy dtype'
            )
        array_c_ordered = self._force_c_order or array_meta['c_order']
        if not array_c_ordered:
            raise DataSetException('Cannot identify DM file as C-ordered from metadata, '
                                   'use force_c_order=True to force behaviour.')

        nav_shape, sig_shape = self._modify_shape(array_meta['shape'],
                                                  sig_dims=sig_dims)

        # Image count is true number of frames in file (?)
        self._image_count = int(prod(nav_shape))
        if self._nav_shape is not None:
            manual_nav_shape_product = prod(self._nav_shape)
            if manual_nav_shape_product > self._image_count:
                raise DataSetException('Specified nav_shape greater than file nav size')
        else:
            self._nav_shape = nav_shape
        # nav_shape product is either manual nav_shape if supplied or metadata nav_shape (?)
        self._nav_shape_product = int(prod(self._nav_shape))

        sig_size = int(prod(sig_shape))
        if self._sig_shape is not None:
            manual_sig_size = int(prod(self._sig_shape))
            # The total number of values requested may not exceed what
            # the file metadata describes
            if (manual_sig_size * self._nav_shape_product) > (self._image_count * sig_size):
                raise DataSetException('Specified sig_shape and nav size '
                                       'too large for data in file')
        else:
            self._sig_shape = sig_shape

        # regardless of file order the Dataset shape property is 'standard'
        shape = Shape(self._nav_shape + self._sig_shape, sig_dims=sig_dims)
        self._sync_offset_info = self.get_sync_offset_info()
        self._meta = DataSetMeta(
            shape=shape,
            raw_dtype=np.dtype(self._raw_dtype),
            sync_offset=self._sync_offset,
            image_count=self._image_count,
        )
        return self

    def _get_fileset(self):
        """Return a :class:`DMFileSet` holding the single file of this dataset."""
        return DMFileSet([
            DMFile(
                path=self._path,
                start_idx=0,
                end_idx=self._image_count,
                sig_shape=self.shape.sig,
                native_dtype=self.meta.raw_dtype,
                # The array data begins at this byte offset in the file
                file_header=self._array_offset,
            )
        ])

    def get_partitions(self):
        """Yield one :class:`DMPartition` for each slice from :code:`get_slices()`."""
        fileset = self._get_fileset()
        for part_slice, start, stop in self.get_slices():
            yield DMPartition(
                meta=self.meta,
                partition_slice=part_slice,
                fileset=fileset,
                start_frame=start,
                num_frames=stop - start,
                io_backend=self.get_io_backend(),
            )
class DMFile(File):
    """
    File type for single-file DM data.

    Adds nothing to :class:`File`; it is the type instantiated
    by :code:`SingleDMDataSet._get_fileset`.
    """
class DMPartition(BasePartition):
    """
    Partition type for single-file DM data.

    Adds nothing to :class:`BasePartition`; it is the type yielded
    by :code:`SingleDMDataSet.get_partitions`.
    """