Source code for libertem.io.dataset.seq

# Based on the SEQ reader of the PIMS project, with the following license:
#
#    Copyright (c) 2013-2014 PIMS contributors
#    https://github.com/soft-matter/pims
#    All rights reserved
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#     * Redistributions of source code must retain the above copyright
#       notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above copyright
#       notice, this list of conditions and the following disclaimer in the
#       documentation and/or other materials provided with the distribution.
#     * Neither the name of the soft-matter organization nor the
#       names of its contributors may be used to endorse or promote products
#       derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import os
import struct
import warnings
from typing import Tuple

import defusedxml.ElementTree as ET
import numpy as np
import sparse
from ncempy.io.mrc import mrcReader

from libertem.common.math import prod
from libertem.common import Shape
from libertem.common.messageconverter import MessageConverter
from libertem.io.dataset.base import (
    FileSet, DataSet, BasePartition, DataSetMeta, DataSetException,
    File, IOBackend,
)
from libertem.io.corrections import CorrectionSet

DWORD = 'L'
LONG = 'l'
DOUBLE = 'd'
USHORT = 'H'

HEADER_FIELDS = [
    ('magic', DWORD),
    ('name', '24s'),
    ('version', LONG),
    ('header_size', LONG),
    ('description', '512s'),
    ('width', DWORD),
    ('height', DWORD),
    ('bit_depth', DWORD),
    ('bit_depth_real', DWORD),
    ('image_size_bytes', DWORD),
    ('image_format', DWORD),
    ('allocated_frames', DWORD),
    ('origin', DWORD),
    ('true_image_size', DWORD),
    ('suggested_frame_rate', DOUBLE),
    ('description_format', LONG),
    ('reference_frame', DWORD),
    ('fixed_size', DWORD),
    ('flags', DWORD),
    ('bayer_pattern', LONG),
    ('time_offset_us', LONG),
    ('extended_header_size', LONG),
    ('compression_format', DWORD),
    ('reference_time_s', LONG),
    ('reference_time_ms', USHORT),
    ('reference_time_us', USHORT)
    # More header values not implemented
]

HEADER_SIZE = sum(
    struct.Struct('<' + field[1]).size
    for field in HEADER_FIELDS
)


def _read_header(path, fields):
    with open(path, "rb") as fh:
        fh.seek(0)
        return _unpack_header(fh.read(HEADER_SIZE), fields)


def _unpack_header(header_bytes, fields):
    str_fields = {"name", "description"}
    tmp = dict()
    pos = 0
    for name, fmt in fields:
        header_part = header_bytes[pos:]
        val, size = _unpack_header_field(header_part, fmt)
        if name in str_fields:
            val = _decode_str(val)
        tmp[name] = val
        pos += size

    return tmp


def _unpack_header_field(header_part, fs):
    s = struct.Struct('<' + fs)
    vals = s.unpack(header_part[:s.size])
    if len(vals) == 1:
        return vals[0], s.size
    else:
        return vals, s.size


def _decode_str(str_in):
    end_pos = len(str_in)
    end_mark = b'\x00\x00'
    if end_mark in str_in:
        end_pos = str_in.index(end_mark) + 1
    return str_in[:end_pos].decode("utf16")


def _get_image_offset(header):
    if header['version'] >= 5:
        return 8192
    else:
        return 1024


def xml_map_sizes(bad_pixel_maps):
    """
    returns the sizes and binning values of bad_pixel_maps

    Parameters
    ----------
    bad_pixel_maps: node
        all of the bad pixel map nodes from the root
    """
    map_sizes = []

    for size_map in bad_pixel_maps:
        if "Binning" in size_map.attrib:
            map_sizes.append((int(size_map.attrib['Columns']), int(size_map.attrib['Rows']),
                              int(size_map.attrib["Binning"])))
        else:
            map_sizes.append((int(size_map.attrib['Columns']), int(size_map.attrib['Rows']),
                              int(size_map.attrib.get("Binning", 1))))
    map_rearrange = zip(*map_sizes)
    xy_map_sizes = list(map_rearrange)
    return xy_map_sizes, map_sizes


def xml_unbinned_map_maker(xy_map_sizes):
    """
    returns two list of unbinned sizes(rows and cols) if the size was binned
    return 0 in the list in its place
    Parameters
    ----------
    xy_map_sizes: list of tuple
        bad pixel maps sizes separated by row and col
    """
    unbinned_x = []
    unbinned_y = []
    for map_ind in range(0, len(xy_map_sizes[0])):
        if xy_map_sizes[2][map_ind] < 2:
            unbinned_y.append(xy_map_sizes[0][map_ind])
            unbinned_x.append(xy_map_sizes[1][map_ind])
        else:
            unbinned_y.append(0)
            unbinned_x.append(0)
    return unbinned_x, unbinned_y


def xml_binned_map_maker(xy_map_sizes):
    """
    returns two list of binned sizes(rows and cols) if the size was not binned
    return 0 in the list in its place
    Parameters
    ----------
    xy_map_sizes: list of tuple
        bad pixel maps sizes separated by row and col
    """
    unbinned_x = []
    unbinned_y = []
    for map_ind in range(0, len(xy_map_sizes[0])):
        if xy_map_sizes[2][map_ind] > 1:
            unbinned_y.append(xy_map_sizes[0][map_ind])
            unbinned_x.append(xy_map_sizes[1][map_ind])
        else:
            unbinned_y.append(0)
            unbinned_x.append(0)
    return unbinned_x, unbinned_y


def xml_map_index_selector(used_y):
    """
    returns the bad pixel maps  index with the largest column count

    Parameters
    ----------
    used_y: list
        list of either binned or unbinned y coordinates
    """
    max_y = max(used_y)

    map_index = used_y.index(max_y)
    return map_index


def xml_defect_coord_extractor(bad_pixel_map, map_index, map_sizes):
    """
    returns the chosen bad pixel map's defects

    Parameters
    ----------
    bad_pixel_map: node
        the xml file's root node
    map_index: int
        the index of the correct bad pixel map
    map_sizes: list of tuples
    """
    excluded_rows = []
    excluded_cols = []
    excluded_pixels = []
    for defect in bad_pixel_map.findall('Defect'):
        if len(defect.attrib) == 1:
            defect_attrib_key = defect.attrib.keys()
            if "Rows" in defect_attrib_key:
                split = defect.attrib["Rows"].split('-')
                excluded_rows.append(split)
            if "Row" in defect_attrib_key:
                excluded_rows.append([defect.attrib["Row"]])

            if "Columns" in defect_attrib_key:
                split = defect.attrib["Columns"].split('-')
                excluded_cols.append(split)
            if "Column" in defect_attrib_key:
                excluded_cols.append([defect.attrib["Column"]])
        else:
            excluded_pixels.append([defect.attrib["Column"], defect.attrib["Row"]])

    return {"rows": excluded_rows,
            "cols": excluded_cols,
            "pixels": excluded_pixels,
            "size": (map_sizes[map_index][0], map_sizes[map_index][1])
            }


def xml_defect_data_extractor(root, metadata):
    """
    Parameters
    ----------
    root: node
        the xml file's root node
    metadata: dictionary
        the content of the metadata file as a dictionary
    """
    bad_pixel_maps = root.findall('.//BadPixelMap')
    xy_size, map_sizes = xml_map_sizes(bad_pixel_maps)
    if (metadata['HardwareBinning']) < 2:
        used_x, used_y = xml_unbinned_map_maker(xy_map_sizes=xy_size)
    else:
        used_x, used_y = xml_binned_map_maker(xy_map_sizes=xy_size)
    map_index = xml_map_index_selector(used_y)
    defect_dict = xml_defect_coord_extractor(bad_pixel_maps[map_index], map_index, map_sizes)
    return defect_dict


def bin_array2d(a, binning):
    """
    Parameters
    ----------
    a: np array
    binning: int
        the times we want to bin the array
    """
    sx, sy = a.shape
    sxc = sx // binning * binning
    syc = sy // binning * binning
    # crop:
    ac = a[:sxc, :syc]
    return ac.reshape(ac.shape[0] // binning, binning,
                      ac.shape[1] // binning, binning).sum(3).sum(1)


def array_cropping(arr, start_size, req_size, offsets):
    """
    returns a crop from the original array
    Parameters
    ----------
    arr: np array
        the array we will make the changes on
    start_size: tuple
        the size of the original array
    req_size: tuple
        the size we want to crop to
    offsets: tuple
        the top left coord of the crop
    """
    ac = arr
    if offsets[0] + req_size[0] <= start_size[0] and offsets[1] + req_size[1] <= start_size[1]:
        req_y = int(req_size[0]) // 2
        req_x = int(req_size[1]) // 2
        a = int(offsets[0]) + req_y
        b = int(offsets[1]) + req_x
        ac = ac[(a - req_y):(a + req_y), (b - req_x):(b + req_x)]
    return ac


def xml_generate_map_size(exc_rows, exc_cols, exc_pix, size, metadata):
    """
    This function will be responsible for generating new size based on the parameters.
    returns an np array with the excluded pixels as True

    Parameters
    ----------
    exc_rows: list of lists
        list with the excluded rows as lists which length is 1 if its a single row
        and two if its an interval of rows
    exc_cols: list of lists
    exc_pix: list of lists
    size: tuple
        selected bad pixel maps size
    metadata: dictionary
        the metadata where we specify the image's parameters
    """
    required_size = (metadata["UnbinnedFrameSizeY"], metadata["UnbinnedFrameSizeX"])
    offsets = (metadata["OffsetY"], metadata["OffsetX"])
    bin_value = metadata["HardwareBinning"]
    if bin_value > 1:
        required_size = (required_size[0]//2, required_size[1]//2)
        offsets = (offsets[0]//2, offsets[1]//2)
    dummy_m = np.zeros(size, dtype=bool)
    for row in exc_rows:

        if len(row) == 1:
            dummy_m[int(row[0])] = 1
        else:
            dummy_m[int(row[0]):int(row[1]) + 1] = 1

    for col in exc_cols:

        if len(col) == 1:
            dummy_m[:, int(col[0])] = 1
        else:
            dummy_m[:, int(col[0]):int(col[1]) + 1] = 1

    for pix in exc_pix:
        dummy_m[int(pix[1]), int(pix[0])] = 1

    cropped = array_cropping(dummy_m, start_size=size, req_size=required_size, offsets=offsets)
    return np.array(cropped, dtype=bool)


def xml_processing(tree, metadata_dict):
    data_dict = xml_defect_data_extractor(tree, metadata_dict)
    coord = xml_generate_map_size(data_dict["rows"], data_dict["cols"], data_dict["pixels"],
                                  data_dict["size"], metadata_dict)
    return sparse.COO(coord)


def _load_xml_from_string(xml, metadata):
    tree = ET.fromstring(xml)
    return xml_processing(tree, metadata)


class SEQDatasetParams(MessageConverter):
    SCHEMA = {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "$id": "http://libertem.org/SEQDatasetParams.schema.json",
        "title": "SEQDatasetParams",
        "type": "object",
        "properties": {
            "type": {"const": "SEQ"},
            "path": {"type": "string"},
            "nav_shape": {
                "type": "array",
                "items": {"type": "number", "minimum": 1},
                "minItems": 2,
                "maxItems": 2
            },
            "sig_shape": {
                "type": "array",
                "items": {"type": "number", "minimum": 1},
                "minItems": 2,
                "maxItems": 2
            },
            "sync_offset": {"type": "number"},
            "io_backend": {
                "enum": IOBackend.get_supported(),
            }
        },
        "required": ["type", "path", "nav_shape"],
    }

    def convert_to_python(self, raw_data):
        data = {
            k: raw_data[k]
            for k in ["path", "nav_shape"]
        }
        if "sig_shape" in raw_data:
            data["sig_shape"] = tuple(raw_data["sig_shape"])
        if "sync_offset" in raw_data:
            data["sync_offset"] = raw_data["sync_offset"]
        return data


class SEQFileSet(FileSet):
    pass


[docs]class SEQDataSet(DataSet): """ Read data from Norpix SEQ files. Examples -------- >>> ds = ctx.load("seq", path="/path/to/file.seq", nav_shape=(1024, 1024)) # doctest: +SKIP Parameters ---------- path Path to the .seq file nav_shape: tuple of int A n-tuple that specifies the size of the navigation region ((y, x), but can also be of length 1 for example for a line scan, or length 3 for a data cube, for example) sig_shape: tuple of int, optional Signal/detector size (height, width) sync_offset: int, optional If positive, number of frames to skip from start. If negative, number of blank frames to insert at start. Note ---- Dark and gain reference are loaded from MRC files with the same root as the SEQ file and the extensions :code:`.dark.mrc` and :code:`.gain.mrc`, i.e. :code:`/path/to/file.dark.mrc` and :code:`/path/to/file.gain.mrc` if they are present. .. versionadded:: 0.8.0 Dead pixels are read from an XML file with the same root as the SEQ file and the extension :code:`.Config.Metadata.xml`, i.e. :code:`/path/to/file.Config.Metadata.xml` in the above example if both this file and :code:`/path/to/file.metadata` are present. See :ref:`corrections` for more information on how to change or disable corrections. FIXME find public documentation of the XML format and dark/gain maps. """ def __init__(self, path: str, scan_size: Tuple[int] = None, nav_shape: Tuple[int] = None, sig_shape: Tuple[int] = None, sync_offset: int = 0, io_backend=None): super().__init__(io_backend=io_backend) self._path = path # There might be '.seq.seq' and '.seq' in the wild # See https://github.com/LiberTEM/LiberTEM/issues/1120 # We first try if '.seq.seq' matches, then '.seq' name, ext = os.path.splitext(path) name2, ext2 = os.path.splitext(name) if ext.lower() == '.seq' and ext2.lower() == '.seq': self._basename = name2 elif ext.lower() == '.seq': self._basename = name else: self._basename = path self._nav_shape = tuple(nav_shape) if nav_shape else nav_shape self._sig_shape = tuple(sig_shape) if sig_shape else sig_shape self._sync_offset = sync_offset # handle backwards-compatability: if scan_size is not None: warnings.warn( "scan_size argument is deprecated. please specify nav_shape instead", FutureWarning ) if nav_shape is not None: raise ValueError("cannot specify both scan_size and nav_shape") self._nav_shape = tuple(scan_size) if self._nav_shape is None: raise TypeError("missing 1 required argument: 'nav_shape'") self._header = None self._meta = None self._footer_size = None self._filesize = None self._dark = None self._gain = None self._excluded_pixels = None def _do_initialize(self): header = self._header = _read_header(self._path, HEADER_FIELDS) self._image_offset = _get_image_offset(header) if header['version'] >= 5: # StreamPix version 6 # Timestamp = 4-byte unsigned long + 2-byte unsigned short (ms) # + 2-byte unsigned short (us) self._timestamp_micro = True else: # Older versions self._timestamp_micro = False try: dtype = np.dtype('uint%i' % header['bit_depth']) except TypeError: raise DataSetException("unsupported bit depth: %s" % header['bit_depth']) frame_size_bytes = header['width'] * header['height'] * dtype.itemsize self._footer_size = header['true_image_size'] - frame_size_bytes self._filesize = os.stat(self._path).st_size self._image_count = int((self._filesize - self._image_offset) / header['true_image_size']) if self._sig_shape is None: self._sig_shape = tuple((header['height'], header['width'])) elif int(prod(self._sig_shape)) != (header['height'] * header['width']): raise DataSetException( "sig_shape must be of size: %s" % (header['height'] * header['width']) ) self._nav_shape_product = int(prod(self._nav_shape)) self._sync_offset_info = self.get_sync_offset_info() shape = Shape(self._nav_shape + self._sig_shape, sig_dims=len(self._sig_shape)) self._meta = DataSetMeta( shape=shape, raw_dtype=dtype, dtype=dtype, metadata=header, sync_offset=self._sync_offset, image_count=self._image_count, ) self._maybe_load_dark_gain() return self def _maybe_load_mrc(self, path): if not os.path.exists(path): return None data_dict = mrcReader(path) return np.squeeze(data_dict['data']) def _load_xml_from_file(self): xml_path = self._basename + ".seq.Config.Metadata.xml" meta_path = self._basename + ".seq.metadata" if not os.path.exists(xml_path): return None if not os.path.exists(meta_path): return None else: tree = ET.parse(xml_path) root = tree.getroot() with open(meta_path, mode="rb") as file: met = file.read() metdata_keys = ['DEMetadataSize', 'DEMetadataVersion', 'UnbinnedFrameSizeX', 'UnbinnedFrameSizeY', 'OffsetX', 'OffsetY', 'HardwareBinning', 'Bitmode', 'FrameRate', 'RotationMode', 'FlipMode', 'OkraMode'] metadata = dict(zip(metdata_keys, struct.unpack_from('iiiiiiiiiii?', met, 282))) return xml_processing(root, metadata) def _maybe_load_dark_gain(self): self._dark = self._maybe_load_mrc(self._basename + ".seq.dark.mrc") self._gain = self._maybe_load_mrc(self._basename + ".seq.gain.mrc") self._excluded_pixels = self._load_xml_from_file() def get_correction_data(self): return CorrectionSet( dark=self._dark, gain=self._gain, excluded_pixels=self._excluded_pixels, ) def initialize(self, executor): return executor.run_function(self._do_initialize) def get_diagnostics(self): header = self._header return [ {"name": k, "value": str(v)} for k, v in header.items() ] + [ {"name": "Footer size", "value": str(self._footer_size)}, {"name": "Dark frame included", "value": str(self._dark is not None)}, {"name": "Gain map included", "value": str(self._gain is not None)}, ] @property def meta(self): return self._meta @classmethod def get_msg_converter(cls): return SEQDatasetParams @classmethod def _do_detect_params(cls, path): try: header = _read_header(path, HEADER_FIELDS) if header['magic'] != 0xFEED: return False image_offset = _get_image_offset(header) filesize = os.stat(path).st_size image_count = int( (filesize - image_offset) / header['true_image_size'] ) sig_shape = tuple((header['height'], header['width'])) return { "parameters": { "path": path, "nav_shape": tuple((image_count,)), "sig_shape": sig_shape, }, "info": { "image_count": image_count, "native_sig_shape": sig_shape, } } except (OSError, UnicodeDecodeError): return False @classmethod def detect_params(cls, path, executor): try: return executor.run_function(cls._do_detect_params, path) except Exception as e: raise DataSetException(repr(e)) from e @classmethod def get_supported_extensions(cls): return {"seq"} @property def dtype(self): return self._meta.raw_dtype @property def shape(self): return self._meta.shape def _get_fileset(self): return SEQFileSet(files=[ File( path=self._path, start_idx=0, end_idx=self._image_count, native_dtype=self._meta.raw_dtype, sig_shape=self._meta.shape.sig, frame_footer=self._footer_size, frame_header=0, file_header=self._image_offset, ) ], frame_footer_bytes=self._footer_size) def check_valid(self): if self._header['magic'] != 0xFEED: raise DataSetException('The format of this .seq file is unrecognized') if self._header['compression_format'] != 0: raise DataSetException('Only uncompressed images are supported in .seq files') if self._header['image_format'] != 100: raise DataSetException('Non-monochrome images are not supported') def get_cache_key(self): return { "path": self._path, "shape": self.shape, "sync_offset": self._sync_offset, } def get_partitions(self): fileset = self._get_fileset() for part_slice, start, stop in self.get_slices(): yield BasePartition( meta=self._meta, fileset=fileset, partition_slice=part_slice, start_frame=start, num_frames=stop - start, io_backend=self.get_io_backend(), ) def __repr__(self): return f"<SEQDataSet of {self.dtype} shape={self.shape}>"