Source code for libertem.io.dataset.raw

import os
import warnings
import numpy as np

from libertem.common.math import prod
from libertem.common import Shape
from libertem.common.messageconverter import MessageConverter
from .base import (
    DataSet, DataSetException, DataSetMeta,
    BasePartition, File, FileSet, DirectBackend, IOBackend,
)


class RAWDatasetParams(MessageConverter):
    SCHEMA = {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "$id": "http://libertem.org/RAWDatasetParams.schema.json",
        "title": "RAWDatasetParams",
        "type": "object",
        "properties": {
            "type": {"const": "RAW"},
            "path": {"type": "string"},
            "dtype": {"type": "string"},
            "nav_shape": {
                "type": "array",
                "items": {"type": "number", "minimum": 1},
                "minItems": 2,
                "maxItems": 2
            },
            "sig_shape": {
                "type": "array",
                "items": {"type": "number", "minimum": 1},
                "minItems": 2,
                "maxItems": 2
            },
            "sync_offset": {"type": "number"},
            "io_backend": {
                "enum": IOBackend.get_supported(),
            },
        },
        "required": ["type", "path", "nav_shape", "sig_shape", "dtype"]
    }

    def convert_to_python(self, raw_data):
        data = {
            k: raw_data[k]
            for k in ["path", "dtype", "nav_shape", "sig_shape"]
        }
        if "sync_offset" in raw_data:
            data["sync_offset"] = raw_data["sync_offset"]
        return data
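
# A minimal sketch of how the converter above is exercised (the payload values
# are illustrative, not from a real dataset): a request body that validates
# against SCHEMA is reduced to the keyword arguments of RawFileDataSet below.
#
#   raw_data = {
#       "type": "RAW", "path": "/data/scan.raw", "dtype": "float32",
#       "nav_shape": [16, 16], "sig_shape": [128, 128], "sync_offset": 0,
#   }
#   RAWDatasetParams().convert_to_python(raw_data)
#   # -> {'path': '/data/scan.raw', 'dtype': 'float32',
#   #     'nav_shape': [16, 16], 'sig_shape': [128, 128], 'sync_offset': 0}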


class RawFile(File):
    pass


class RawFileSet(FileSet):
    pass


class RawFileDataSet(DataSet):
    """
    Read raw data from a single file of raw binary data. This reader assumes
    the following format:

    * only raw data (no file header)
    * frames are stored in C-order without additional frame headers
    * dtype supported by numpy

    Examples
    --------

    >>> ds = ctx.load("raw", path=path_to_raw, nav_shape=(16, 16), sig_shape=(128, 128),
    ...               sync_offset=0, dtype="float32")

    Parameters
    ----------

    path: str
        Path to the file

    nav_shape: tuple of int
        An n-tuple that specifies the size of the navigation region. Commonly
        (y, x), but it can also have length 1 (for example for a line scan)
        or length 3 (for example for a data cube)

    sig_shape: tuple of int
        Common case: (height, width); but can be any dimensionality

    sync_offset: int, optional
        If positive, number of frames to skip from the start;
        if negative, number of blank frames to insert at the start

    dtype: numpy dtype
        The dtype of the data as it is on disk. Can contain an endian
        indicator, for example >u2 for big-endian 16-bit data.
    """
    def __init__(self, path, dtype, scan_size=None, detector_size=None,
                 enable_direct=False, detector_size_raw=None, crop_detector_to=None,
                 tileshape=None, nav_shape=None, sig_shape=None, sync_offset=0,
                 io_backend=None):
        if enable_direct and io_backend is not None:
            raise ValueError("can't specify io_backend and enable_direct at the same time")
        if enable_direct:
            warnings.warn(
                "enable_direct is deprecated; pass "
                "`io_backend=DirectBackend()` instead",
                FutureWarning
            )
            io_backend = DirectBackend()
        super().__init__(io_backend=io_backend)

        # handle backwards-compatibility:
        if tileshape is not None:
            warnings.warn(
                "tileshape argument is ignored and will be removed after 0.6.0",
                FutureWarning
            )

        # FIXME execute deprecation after 0.6.0
        if crop_detector_to is not None:
            warnings.warn(
                "crop_detector_to and detector_size_raw are deprecated, "
                "and will be removed after version 0.6.0. "
                "please specify sig_shape instead or use a more "
                "specific DataSet like EMPAD",
                FutureWarning
            )
            if detector_size is not None:
                raise ValueError("cannot specify both detector_size and crop_detector_to")
            if detector_size_raw != crop_detector_to:
                raise ValueError("RawFileDataSet can't crop detector anymore, "
                                 "please use EMPAD DataSet")
            detector_size = crop_detector_to

        self._nav_shape = tuple(nav_shape) if nav_shape else nav_shape
        self._sig_shape = tuple(sig_shape) if sig_shape else sig_shape
        self._sync_offset = sync_offset

        # handle backwards-compatibility:
        if scan_size is not None:
            warnings.warn(
                "scan_size argument is deprecated. please specify nav_shape instead",
                FutureWarning
            )
            if nav_shape is not None:
                raise ValueError("cannot specify both scan_size and nav_shape")
            self._nav_shape = scan_size
        if detector_size is not None:
            warnings.warn(
                "detector_size argument is deprecated. please specify sig_shape instead",
                FutureWarning
            )
            if sig_shape is not None:
                raise ValueError("cannot specify both detector_size and sig_shape")
            self._sig_shape = detector_size

        if self._nav_shape is None:
            raise TypeError("missing 1 required argument: 'nav_shape'")
        if self._sig_shape is None:
            raise TypeError("missing 1 required argument: 'sig_shape'")

        self._path = path
        self._sig_dims = len(self._sig_shape)
        self._dtype = dtype
        self._filesize = None

    def initialize(self, executor):
        self._filesize = executor.run_function(self._get_filesize)
        if int(prod(self._sig_shape)) > int(self._filesize / np.dtype(self._dtype).itemsize):
            raise DataSetException(
                "sig_shape must be less than size: %s" % (
                    int(self._filesize / np.dtype(self._dtype).itemsize)
                )
            )
        self._image_count = int(
            self._filesize / (
                np.dtype(self._dtype).itemsize * prod(self._sig_shape)
            )
        )
        self._nav_shape_product = int(prod(self._nav_shape))
        self._sync_offset_info = self.get_sync_offset_info()
        shape = Shape(self._nav_shape + self._sig_shape, sig_dims=self._sig_dims)
        self._meta = DataSetMeta(
            shape=shape,
            raw_dtype=np.dtype(self._dtype),
            sync_offset=self._sync_offset,
            image_count=self._image_count,
        )
        return self

    def get_diagnostics(self):
        return [
            {"name": "dtype", "value": str(self._meta.raw_dtype)},
        ]

    def _get_filesize(self):
        return os.stat(self._path).st_size

    @property
    def dtype(self):
        return self._meta.raw_dtype

    @property
    def shape(self):
        return self._meta.shape

    @classmethod
    def get_msg_converter(cls):
        return RAWDatasetParams

    def _get_fileset(self):
        return RawFileSet([
            RawFile(
                path=self._path,
                start_idx=0,
                end_idx=self._image_count,
                sig_shape=self.shape.sig,
                native_dtype=self._meta.raw_dtype,
            )
        ])

    def check_valid(self):
        try:
            fileset = self._get_fileset()
            backend = self.get_io_backend().get_impl()
            with backend.open_files(fileset):
                return True
        except (OSError, ValueError) as e:
            raise DataSetException("invalid dataset: %s" % e)

    def get_cache_key(self):
        return {
            "path": self._path,
            # nav_shape + sig_shape; included because changing nav_shape will change
            # the partition structure and cause errors
            "shape": tuple(self.shape),
            "dtype": str(self.dtype),
            "sync_offset": self._sync_offset,
        }

    def get_partitions(self):
        fileset = self._get_fileset()
        for part_slice, start, stop in self.get_slices():
            yield RawPartition(
                meta=self._meta,
                fileset=fileset,
                partition_slice=part_slice,
                start_frame=start,
                num_frames=stop - start,
                io_backend=self.get_io_backend(),
            )

    def __repr__(self):
        return f"<RawFileDataSet of {self.dtype} shape={self.shape}>"
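

# Worked example (hypothetical numbers) of the frame-count arithmetic in
# initialize() above: image_count = filesize // (itemsize * prod(sig_shape)).
# One 128x128 float32 frame occupies 128 * 128 * 4 = 65536 bytes, so a file
# of 16777216 bytes holds 16777216 // 65536 = 256 frames, which matches a
# (16, 16) nav_shape. A sync_offset of 2 would skip the first two frames on
# disk; a sync_offset of -2 would insert two blank frames at the start.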


class RawPartition(BasePartition):
    pass
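

# Example of producing a file this reader can open and loading it back; a
# sketch assuming `ctx` is a libertem.api.Context and the path is writable
# (both hypothetical here). numpy's ndarray.tofile() writes a plain C-order
# dump without headers, which is exactly the layout this DataSet expects.
#
#   data = np.random.rand(16, 16, 128, 128).astype("float32")
#   data.tofile("/tmp/scan.raw")
#   ds = ctx.load("raw", path="/tmp/scan.raw", dtype="float32",
#                 nav_shape=(16, 16), sig_shape=(128, 128), sync_offset=0)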