import re
import os
import platform
from glob import glob, escape
import logging
from typing import TYPE_CHECKING, Optional, Union
from collections.abc import Generator, Sequence
from typing_extensions import Literal, TypedDict
import warnings
from concurrent.futures import ThreadPoolExecutor
from numba.typed import List as NumbaList
import numba
import psutil
import numpy as np
from libertem.common.math import prod, make_2D_square, flat_nonzero
from libertem.common import Shape
from libertem.io.dataset.base.file import OffsetsSizes
from libertem.common.messageconverter import MessageConverter
from .base import (
DataSet, DataSetException, DataSetMeta,
BasePartition, FileSet, File, make_get_read_ranges,
Decoder, TilingScheme, default_get_read_ranges,
DtypeConversionDecoder, IOBackend,
)
log = logging.getLogger(__name__)
if TYPE_CHECKING:
import numpy.typing as nt
class MIBDatasetParams(MessageConverter):
SCHEMA = {
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "http://libertem.org/MIBDatasetParams.schema.json",
"title": "MIBDatasetParams",
"type": "object",
"properties": {
"type": {"const": "MIB"},
"path": {"type": "string"},
"nav_shape": {
"type": "array",
"items": {"type": "number", "minimum": 1},
"minItems": 2,
"maxItems": 2
},
"sig_shape": {
"type": "array",
"items": {"type": "number", "minimum": 1},
"minItems": 2,
"maxItems": 2
},
"sync_offset": {"type": "number"},
"io_backend": {
"enum": IOBackend.get_supported(),
},
},
"required": ["type", "path"],
}
def convert_to_python(self, raw_data):
data = {
k: raw_data[k]
for k in ["path"]
}
if "nav_shape" in raw_data:
data["nav_shape"] = tuple(raw_data["nav_shape"])
if "sig_shape" in raw_data:
data["sig_shape"] = tuple(raw_data["sig_shape"])
if "sync_offset" in raw_data:
data["sync_offset"] = raw_data["sync_offset"]
return data
def read_hdr_file(path):
result = {}
# FIXME: do this open via the io backend!
with open(path, encoding='utf-8', errors='ignore') as f:
for line in f:
if line.startswith("HDR") or line.startswith("End\t"):
continue
k, v = line.split("\t", 1)
k = k.rstrip(':')
v = v.rstrip("\n")
result[k] = v
return result
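# Sketch of the HDR line format assumed by read_hdr_file (hypothetical field
# values): each line is "key:<TAB>value", so a line like
# "Frames per Trigger (Number):\t128" is stored as
# result['Frames per Trigger (Number)'] = '128'.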
def is_valid_hdr(path):
# FIXME: do this open via the io backend!
with open(path, encoding='utf-8', errors='ignore') as f:
line = next(f)
return line.startswith("HDR")
def nav_shape_from_hdr(hdr):
if 'ScanX' in hdr and 'ScanY' in hdr:
return (int(hdr['ScanY']), int(hdr['ScanX']))
num_frames, scan_x = (
int(hdr['Frames in Acquisition (Number)']),
int(hdr['Frames per Trigger (Number)'])
)
nav_shape = (num_frames // scan_x, scan_x)
return nav_shape
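# Worked example for nav_shape_from_hdr, assuming hypothetical header values:
# with 'Frames in Acquisition (Number)' = '16384' and
# 'Frames per Trigger (Number)' = '128', the result is
# (16384 // 128, 128) == (128, 128).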
def _pattern(path: str) -> str:
path, ext = os.path.splitext(path)
ext = ext.lower()
if ext == '.mib':
pattern = "%s*.mib" % (
re.sub(r'[0-9]+$', '', escape(path))
)
elif ext == '.hdr':
pattern = "%s*.mib" % escape(path)
else:
        raise DataSetException(f"unknown extension: {ext!r}")
return pattern
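# Examples for _pattern, with hypothetical paths: '/data/default64.mib' has
# its trailing digits stripped and becomes '/data/default*.mib', while
# '/data/default.hdr' maps to '/data/default*.mib' directly.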
def get_filenames(path: str, disable_glob=False) -> list[str]:
if disable_glob:
return [path]
else:
return glob(_pattern(path))
def _get_sequence(f: "MIBHeaderReader"):
return f.fields['sequence_first_image']
def get_image_count_and_sig_shape(
path: str,
disable_glob: bool = False,
) -> tuple[int, tuple[int, int]]:
fns = get_filenames(path, disable_glob=disable_glob)
headers = MIBDataSet._preread_headers(fns)
count = 0
files = []
for path, fields in headers.items():
f = MIBHeaderReader(path, fields=fields)
count += f.fields['num_images']
files.append(f)
try:
        first_file = sorted(files, key=_get_sequence)[0]
sig_shape = first_file.fields['image_size']
return count, sig_shape
except IndexError:
raise DataSetException("no files found")
# These encoders take 2D input/output data - this means we can use
# strides to do slicing and reversing. 2D input data means one output
# row (of bytes) corresponds to one input row (of pixels).
@numba.njit(cache=True)
def encode_u1(inp, out):
for y in range(out.shape[0]):
out[y] = inp[y]
@numba.njit(cache=True)
def encode_u2(inp, out):
for y in range(out.shape[0]):
row_out = out[y]
row_in = inp[y]
for i in range(row_in.shape[0]):
in_value = row_in[i]
row_out[i * 2] = (0xFF00 & np.uint16(in_value)) >> 8
row_out[i * 2 + 1] = 0xFF & np.uint16(in_value)
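# Byte-order sketch for encode_u2: a pixel value of 0x1234 is written out as
# the byte pair (0x12, 0x34), i.e. most significant byte first, matching the
# big-endian ">u2" on-disk layout.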
@numba.njit(cache=True)
def encode_r1(inp, out):
for y in range(out.shape[0]):
row_out = out[y]
row_in = inp[y]
for stripe in range(row_out.shape[0] // 8):
for byte in range(8):
out_byte = 0
for bitpos in range(8):
value = row_in[64 * stripe + 8 * byte + bitpos] & 1
out_byte |= (np.uint64(value) << bitpos)
row_out[(stripe + 1) * 8 - (byte + 1)] = out_byte
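# Packing sketch for encode_r1: each group of 64 input pixels becomes one
# 8-byte stripe, with the bytes stored in reverse order - input pixels 0..7
# (LSB first) end up in the last byte of the stripe, pixels 56..63 in the
# first byte.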
@numba.njit(cache=True)
def encode_r6(inp, out):
for y in range(out.shape[0]):
row_out = out[y]
row_in = inp[y]
for i in range(row_out.shape[0]):
col = i % 8
pos = i // 8
in_pos = (pos + 1) * 8 - col - 1
row_out[i] = row_in[in_pos]
@numba.njit(cache=True)
def encode_r12(inp, out):
for y in range(out.shape[0]):
row_out = out[y]
row_in = inp[y]
for i in range(row_in.shape[0]):
col = i % 4
pos = i // 4
in_pos = (pos + 1) * 4 - col - 1
in_value = row_in[in_pos]
row_out[i * 2] = (0xFF00 & in_value) >> 8
row_out[i * 2 + 1] = 0xFF & in_value
@numba.njit(inline='always', cache=True)
def _get_row_start_stop(offset_global, offset_local, stride, row_idx, row_length):
start = offset_global + offset_local + row_idx * stride
stop = start + row_length
return start, stop
@numba.njit(inline='always', cache=True)
def _mib_r24_px_to_bytes(
bpp, frame_in_file_idx, slice_sig_size, sig_size, sig_origin,
frame_footer_bytes, frame_header_bytes, file_header_bytes,
file_idx, read_ranges,
):
# we are reading a part of a single frame, so we first need to find
# the offset caused by headers and footers:
footer_offset = frame_footer_bytes * frame_in_file_idx
header_offset = frame_header_bytes * (frame_in_file_idx + 1)
byte_offset = footer_offset + header_offset
# now let's figure in the current frame index:
# (go down into the file by full frames; `sig_size`)
offset = file_header_bytes + byte_offset + frame_in_file_idx * sig_size * bpp // 8
# offset in px in the current frame:
sig_origin_bytes = sig_origin * bpp // 8
start = offset + sig_origin_bytes
# size of the sig part of the slice:
sig_size_bytes = slice_sig_size * bpp // 8
stop = start + sig_size_bytes
read_ranges.append((file_idx, start, stop))
# this is the addition for medipix 24bit raw:
# we read the part from the "second 12bit frame"
second_frame_offset = sig_size * bpp // 8
read_ranges.append((file_idx, start + second_frame_offset, stop + second_frame_offset))
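# Note: for 24bit raw, each requested slice thus yields two read ranges - one
# for the most significant 12bit sub-frame and one for the least significant
# sub-frame, offset by exactly one sub-frame (sig_size * bpp // 8 bytes).
# decode_r24_swap below recombines them.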
mib_r24_get_read_ranges = make_get_read_ranges(px_to_bytes=_mib_r24_px_to_bytes)
@numba.njit(inline="always", cache=True)
def _mib_2x2_tile_block(
slices_arr, fileset_arr, slice_sig_sizes, sig_origins,
inner_indices_start, inner_indices_stop, frame_indices, sig_size,
px_to_bytes, bpp, frame_header_bytes, frame_footer_bytes, file_idxs,
slice_offset, extra, sig_shape,
):
"""
Generate read ranges for 2x2 Merlin Quad raw data.
    The arrangement means that when reading a contiguous block of data from
    the file, we get data from all four quadrants. The arrangement, and thus
    the resulting array, looks like this:
_________
| 1 | 2 |
---------
| 3 | 4 |
---------
    with the original data laid out like this:
[4 | 3 | 2 | 1]
(note that quadrants 3 and 4 are also flipped in x and y direction in the
resulting array, compared to the original data)
    So if we read one row of raw data, we first get the bottom-most rows from
    4 and 3, then the top-most rows from 2 and 1.
    This is similar to how FRMS6 works, and we generate the read ranges in a
    similar way here. In addition to the cut-and-flip from FRMS6, we also have
    the split into quadrants in the x direction.
"""
result = NumbaList()
# positions in the signal dimensions:
for slice_idx in range(slices_arr.shape[0]):
# (offset, size) arrays defining what data to read (in pixels)
# NOTE: assumes contiguous tiling scheme
# (i.e. a shape like (1, 1, ..., 1, X1, ..., XN))
# where X1 is <= the dataset shape at that index, and X2, ..., XN are
# equal to the dataset shape at that index
slice_origin = slices_arr[slice_idx][0]
slice_shape = slices_arr[slice_idx][1]
read_ranges = NumbaList()
x_shape = slice_shape[1]
x_size = x_shape * bpp // 8 # back in bytes
x_size_half = x_size // 2
stride = x_size_half * 4
sig_size_bytes = sig_size * bpp // 8
y_start = slice_origin[0]
y_stop = slice_origin[0] + slice_shape[0]
y_size_half = sig_shape[0] // 2
y_size = sig_shape[0]
# inner "depth" loop along the (flat) navigation axis of a tile:
for i, inner_frame_idx in enumerate(range(inner_indices_start, inner_indices_stop)):
inner_frame = frame_indices[inner_frame_idx]
file_idx = file_idxs[i]
f = fileset_arr[file_idx]
frame_in_file_idx = inner_frame - f[0]
file_header_bytes = f[3]
# we are reading a part of a single frame, so we first need to find
# the offset caused by headers:
header_offset = file_header_bytes + frame_header_bytes * (frame_in_file_idx + 1)
# now let's figure in the current frame index:
# (go down into the file by full frames; `sig_size`)
offset = header_offset + frame_in_file_idx * sig_size_bytes
# in total, we generate depth * 2 * (y_stop - y_start) read ranges per tile
for y in range(y_start, y_stop):
if y < y_size_half:
# top: no y-flip, no x-flip
flip = 0
# quadrant 1, left part of the result: we have the three other blocks
# in the original data in front of us
start, stop = _get_row_start_stop(
offset, 3 * x_size_half, stride, y, x_size_half
)
read_ranges.append((
file_idx,
start,
stop,
flip,
))
# quadrant 2, right part of the result: we have the two other blocks
# in the original data in front of us
start, stop = _get_row_start_stop(
offset, 2 * x_size_half, stride, y, x_size_half
)
read_ranges.append((
file_idx,
start,
stop,
flip,
))
else:
# bottom: both x and y flip
flip = 1
y = y_size - y - 1
# quadrant 3, left part of the result: we have the one other block
# in the original data in front of us
start, stop = _get_row_start_stop(
offset, 1 * x_size_half, stride, y, x_size_half
)
read_ranges.append((
file_idx,
start,
stop,
flip,
))
# quadrant 4, right part of the result: we have the no other blocks
# in the original data in front of us
start, stop = _get_row_start_stop(offset, 0, stride, y, x_size_half)
read_ranges.append((
file_idx,
start,
stop,
flip,
))
# the indices are compressed to the selected frames
compressed_slice = np.array([
[slice_offset + inner_indices_start] + [i for i in slice_origin],
[inner_indices_stop - inner_indices_start] + [i for i in slice_shape],
])
result.append((slice_idx, compressed_slice, read_ranges))
return result
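# Offset sketch for the quad tile block above (one raw row, top half, no
# flip): quadrant 1 data starts 3 * x_size_half bytes into the row and
# quadrant 2 data 2 * x_size_half bytes in, matching the [4 | 3 | 2 | 1]
# on-disk order; each result row thus produces two read ranges of
# x_size_half bytes each.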
@numba.njit(inline='always', cache=True, boundscheck=True)
def decode_r1_swap_2x2(inp, out, idx, native_dtype, rr, origin, shape, ds_shape):
"""
RAW 1bit format: each pixel is actually saved as a single bit. 64 bits
need to be unpacked together. This is the quad variant.
Parameters
==========
inp : np.ndarray
out : np.ndarray
The output buffer, with the signal dimensions flattened
idx : int
The index in the read ranges array
native_dtype : nt.DtypeLike
The "native" dtype (format-specific)
rr : np.ndarray
A single entry from the read ranges array
origin : np.ndarray
The 3D origin of the tile, for example :code:`np.array([2, 0, 0])`
shape : np.ndarray
The 3D tileshape, for example :code:`np.array([2, 512, 512])`
ds_shape : np.ndarray
The complete ND dataset shape, for example :code:`np.array([32, 32, 512, 512])`
"""
# in case of 2x2 quad, the index into `out` is not straight `idx`, but
# we have `2 * shape[1]` read ranges generated for one depth.
num_rows_tile = shape[1]
out_3d = out.reshape(out.shape[0], -1, shape[-1])
# each line in the output array generates two entries in the
# read ranges. so `(idx // 2) % num_rows_tile` is the correct
# out_y:
out_y = (idx // 2) % num_rows_tile
out_x_start = (idx % 2) * (shape[-1] // 2)
depth = idx // (num_rows_tile * 2)
flip = rr[3]
if flip == 0:
for stripe in range(inp.shape[0] // 8):
for byte in range(8):
inp_byte = inp[(stripe + 1) * 8 - (byte + 1)]
for bitpos in range(8):
out_x = 64 * stripe + 8 * byte + bitpos
out_x += out_x_start
out_3d[depth, out_y, out_x] = (inp_byte >> bitpos) & 1
else:
# flip in x direction:
x_shape = shape[2]
x_shape_half = x_shape // 2
for stripe in range(inp.shape[0] // 8):
for byte in range(8):
inp_byte = inp[(stripe + 1) * 8 - (byte + 1)]
for bitpos in range(8):
out_x = out_x_start + x_shape_half - 1 - (64 * stripe + 8 * byte + bitpos)
out_3d[depth, out_y, out_x] = (inp_byte >> bitpos) & 1
@numba.njit(inline='always', cache=True, boundscheck=True)
def decode_r6_swap_2x2(inp, out, idx, native_dtype, rr, origin, shape, ds_shape):
"""
RAW 6bit format: the pixels need to be re-ordered in groups of 8. `inp`
should have dtype uint8. This is the quad variant.
Parameters
==========
inp : np.ndarray
out : np.ndarray
The output buffer, with the signal dimensions flattened
idx : int
The index in the read ranges array
native_dtype : nt.DtypeLike
The "native" dtype (format-specific)
rr : np.ndarray
A single entry from the read ranges array
origin : np.ndarray
The 3D origin of the tile, for example :code:`np.array([2, 0, 0])`
shape : np.ndarray
The 3D tileshape, for example :code:`np.array([2, 512, 512])`
ds_shape : np.ndarray
The complete ND dataset shape, for example :code:`np.array([32, 32, 512, 512])`
"""
# in case of 2x2 quad, the index into `out` is not straight `idx`, but
# we have `2 * shape[1]` read ranges generated for one depth.
num_rows_tile = shape[1]
out_3d = out.reshape(out.shape[0], -1, shape[-1])
# each line in the output array generates two entries in the
# read ranges. so `(idx // 2) % num_rows_tile` is the correct
# out_y:
out_y = (idx // 2) % num_rows_tile
out_x_start = (idx % 2) * (shape[-1] // 2)
depth = idx // (num_rows_tile * 2)
flip = rr[3]
out_cut = out_3d[depth, out_y, out_x_start:out_x_start + out_3d.shape[2] // 2]
if flip == 0:
for i in range(out_cut.shape[0]):
col = i % 8
pos = i // 8
out_pos = (pos + 1) * 8 - col - 1
out_cut[out_pos] = inp[i]
else:
# flip in x direction:
for i in range(out_cut.shape[0]):
col = i % 8
pos = i // 8
out_pos = (pos + 1) * 8 - col - 1
out_cut[out_cut.shape[0] - out_pos - 1] = inp[i]
@numba.njit(inline='always', cache=True, boundscheck=True)
def decode_r12_swap_2x2(inp, out, idx, native_dtype, rr, origin, shape, ds_shape):
"""
    RAW 12bit format: the pixels need to be re-ordered in groups of 4. `inp`
    should be a uint8 view on padded big-endian 12bit data (">u2").
This is the quad variant.
Parameters
==========
inp : np.ndarray
out : np.ndarray
The output buffer, with the signal dimensions flattened
idx : int
The index in the read ranges array
native_dtype : nt.DtypeLike
The "native" dtype (format-specific)
rr : np.ndarray
A single entry from the read ranges array
origin : np.ndarray
The 3D origin of the tile, for example :code:`np.array([2, 0, 0])`
shape : np.ndarray
The 3D tileshape, for example :code:`np.array([2, 512, 512])`
ds_shape : np.ndarray
The complete ND dataset shape, for example :code:`np.array([32, 32, 512, 512])`
"""
# in case of 2x2 quad, the index into `out` is not straight `idx`, but
# we have `2 * shape[1]` read ranges generated for one depth.
num_rows_tile = shape[1]
out_3d = out.reshape(out.shape[0], -1, shape[-1])
# each line in the output array generates two entries in the
# read ranges. so `(idx // 2) % num_rows_tile` is the correct
# out_y:
out_y = (idx // 2) % num_rows_tile
out_x_start = (idx % 2) * (shape[-1] // 2)
depth = idx // (num_rows_tile * 2)
flip = rr[3]
out_cut = out_3d[depth, out_y, out_x_start:out_x_start + out_3d.shape[2] // 2]
if flip == 0:
for i in range(out_cut.shape[0]):
col = i % 4
pos = i // 4
out_pos = (pos + 1) * 4 - col - 1
out_cut[out_pos] = (np.uint16(inp[i * 2]) << 8) + (inp[i * 2 + 1] << 0)
else:
# flip in x direction:
for i in range(out_cut.shape[0]):
col = i % 4
pos = i // 4
out_pos = (pos + 1) * 4 - col - 1
out_cut[out_cut.shape[0] - out_pos - 1] = (
(np.uint16(inp[i * 2]) << 8) + (inp[i * 2 + 1] << 0)
)
# reference non-quad impl:
# for i in range(out.shape[1]):
# col = i % 4
# pos = i // 4
# out_pos = (pos + 1) * 4 - col - 1
# out[idx, out_pos] = (inp[i * 2] << 8) + (inp[i * 2 + 1] << 0)
mib_2x2_get_read_ranges = make_get_read_ranges(
read_ranges_tile_block=_mib_2x2_tile_block
)
@numba.njit(inline='always', cache=True)
def decode_r1_swap(inp, out, idx, native_dtype, rr, origin, shape, ds_shape):
"""
RAW 1bit format: each pixel is actually saved as a single bit. 64 bits
need to be unpacked together.
"""
for stripe in range(inp.shape[0] // 8):
for byte in range(8):
inp_byte = inp[(stripe + 1) * 8 - (byte + 1)]
for bitpos in range(8):
out[idx, 64 * stripe + 8 * byte + bitpos] = (inp_byte >> bitpos) & 1
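# Unpacking sketch for decode_r1_swap (the mirror of encode_r1): the last
# byte of each 8-byte input stripe holds output pixels 0..7 of that stripe,
# with bit 0 (the LSB) mapping to the left-most of those pixels.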
@numba.njit(inline='always', cache=True)
def decode_r6_swap(inp, out, idx, native_dtype, rr, origin, shape, ds_shape):
"""
RAW 6bit format: the pixels need to be re-ordered in groups of 8. `inp`
should have dtype uint8.
"""
for i in range(out.shape[1]):
col = i % 8
pos = i // 8
out_pos = (pos + 1) * 8 - col - 1
out[idx, out_pos] = inp[i]
@numba.njit(inline='always', cache=True)
def decode_r12_swap(inp, out, idx, native_dtype, rr, origin, shape, ds_shape):
"""
    RAW 12bit format: the pixels need to be re-ordered in groups of 4. `inp`
    should be a uint8 view on big-endian 12bit data (">u2").
"""
for i in range(out.shape[1]):
col = i % 4
pos = i // 4
out_pos = (pos + 1) * 4 - col - 1
out[idx, out_pos] = (np.uint16(inp[i * 2]) << 8) + (np.uint16(inp[i * 2 + 1]) << 0)
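# Reordering sketch for decode_r12_swap: within each group of four >u2 words,
# the order is reversed - input words 0, 1, 2, 3 land at output pixels
# 3, 2, 1, 0 - and each word is assembled big-endian as
# (inp[2 * i] << 8) + inp[2 * i + 1].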
@numba.njit(inline='always', cache=True)
def decode_r24_swap(inp, out, idx, native_dtype, rr, origin, shape, ds_shape):
"""
    RAW 24bit format: a single 24bit frame consists of two sub-frames that are
    encoded like the RAW 12bit format; the first contains the most significant
    bits. So after a frame header, there are (512, 256) >u2 values, which then
    need to be shuffled like in `decode_r12_swap`.
    This decoder function only works together with `mib_r24_get_read_ranges`,
    which generates twice as many read ranges as usual.
"""
for i in range(out.shape[1]):
col = i % 4
pos = i // 4
out_pos = (pos + 1) * 4 - col - 1
out_val = np.uint32((np.uint32(inp[i * 2]) << 8) + (np.uint32(inp[i * 2 + 1]) << 0))
if idx % 2 == 0: # from first frame: most significant bits
out_val = out_val << 12
out[idx // 2, out_pos] += out_val
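# Recombination sketch for decode_r24_swap: with msb and lsb denoting a
# pixel's values from the two 12bit sub-frames, the final value accumulated
# into `out` is (msb << 12) + lsb - which is why the output buffer must be
# zeroed first (see MIBDecoder.do_clear below).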
class MIBDecoder(Decoder):
def __init__(self, header: "HeaderDict"):
self._kind = header['mib_kind']
self._dtype = header['dtype']
self._bit_depth = header['bits_per_pixel']
self._header = header
def do_clear(self):
"""
        In the case of 24bit raw data, the output buffer needs to be cleared
        before writing, as we can't decode a whole frame in a single call of
        the decoder: `decode_r24_swap` accumulates into the output buffer,
        so the buffer must start out zeroed.
"""
if self._kind == 'r' and self._bit_depth == 24:
return True
return False
def _get_decode_r(self):
bit_depth = self._bit_depth
layout = self._header['sensor_layout']
num_chips = self._header['num_chips']
if layout == (2, 2) and num_chips == 4:
if bit_depth == 1:
return decode_r1_swap_2x2
elif bit_depth == 6:
return decode_r6_swap_2x2
elif bit_depth == 12:
return decode_r12_swap_2x2
else:
raise NotImplementedError(
f"bit depth {bit_depth} not implemented for layout {layout}"
)
if bit_depth == 1:
return decode_r1_swap
elif bit_depth == 6:
return decode_r6_swap
elif bit_depth == 12:
return decode_r12_swap
elif bit_depth == 24:
return decode_r24_swap
else:
raise ValueError("unknown raw bitdepth")
def get_decode(self, native_dtype, read_dtype):
kind = self._kind
if kind == "u":
return DtypeConversionDecoder().get_decode(
native_dtype=native_dtype,
read_dtype=read_dtype,
)
elif kind == "r":
# FIXME: on big endian systems, these need to be implemented without byteswapping
return self._get_decode_r()
else:
raise RuntimeError("unknown type of MIB file")
def get_native_dtype(self, inp_native_dtype, read_dtype):
if self._kind == "u":
# drop the byteswap from the dtype, if it is there
return inp_native_dtype.newbyteorder('N')
else:
# decode byte-by-byte
return np.dtype("u1")
MIBKind = Literal['u', 'r']
class HeaderDict(TypedDict):
header_size_bytes: int
dtype: "nt.DTypeLike"
mib_dtype: str
mib_kind: MIBKind
bits_per_pixel: int
image_size: tuple[int, int]
image_size_bytes: int
sequence_first_image: int
filesize: int
num_images: int
num_chips: int
sensor_layout: tuple[int, int]
class MIBHeaderReader:
def __init__(
self,
path: str,
fields: Optional[HeaderDict] = None,
sequence_start: int = 0,
):
self.path = path
if fields is None:
self._fields = None
else:
self._fields = fields
self._sequence_start = sequence_start
def __repr__(self) -> str:
return "<MIBHeaderReader: %s>" % self.path
@staticmethod
def _get_np_dtype(dtype: str, bit_depth: int) -> "nt.DTypeLike":
dtype = dtype.lower()
num_bits = int(dtype[1:])
if dtype[0] == "u":
num_bytes = num_bits // 8
return np.dtype(">u%d" % num_bytes)
elif dtype[0] == "r":
if bit_depth == 1:
return np.dtype("uint64")
elif bit_depth == 6:
return np.dtype("uint8")
elif bit_depth in (12, 24): # 24bit raw is two 12bit images after another
return np.dtype("uint16")
else:
raise NotImplementedError("unknown bit depth: %s" % bit_depth)
else:
raise NotImplementedError(f"unknown dtype: {dtype}")
def read_header(self) -> HeaderDict:
header, filesize = self._read_header_bytes(self.path)
self._fields = self._parse_header_bytes(header, filesize)
return self._fields
@staticmethod
def _read_header_bytes(path) -> tuple[bytes, int]:
# FIXME: do this read via the IO backend!
with open(file=path, mode='rb') as f:
filesize = os.fstat(f.fileno()).st_size
return f.read(1024), filesize
@staticmethod
def _parse_header_bytes(header: bytes, filesize: int) -> HeaderDict:
header: str = header.decode(encoding='ascii', errors='ignore')
parts = header.split(",")
header_size_bytes = int(parts[2])
parts = [p
for p in header[:header_size_bytes].split(",")
if '\x00' not in p]
dtype = parts[6].lower()
mib_kind: Literal['u', 'r']
# To make mypy Literal check happy...
if dtype[0] == "r":
mib_kind = "r"
elif dtype[0] == "u":
mib_kind = "u"
else:
raise ValueError("unknown kind: %s" % dtype[0])
image_size = (int(parts[5]), int(parts[4]))
        # FIXME: The header can contain threshold values for all chips, or
        # none at all. For now, we just rely on the bit depth being the last
        # value.
bits_per_pixel_raw = int(parts[-1])
if mib_kind == "u":
bytes_per_pixel = int(parts[6][1:]) // 8
image_size_bytes = image_size[0] * image_size[1] * bytes_per_pixel
num_images = filesize // (
image_size_bytes + header_size_bytes
)
elif mib_kind == "r":
size_factor = {
1: 1/8,
6: 1,
12: 2,
24: 4,
}[bits_per_pixel_raw]
if bits_per_pixel_raw == 24:
image_size = (image_size[0], image_size[1] // 2)
image_size_bytes = int(image_size[0] * image_size[1] * size_factor)
num_images = filesize // (
image_size_bytes + header_size_bytes
)
num_chips = int(parts[3])
# 2x2, Nx1, 2x2G, Nx1G -> strip off the 'G' suffix for now:
# Assumption: layout is width x height
# We currently only support 1x1 and 2x2 layouts, and support for raw
# files with 2x2 layout is limited to 1, 6 or 12 bits. Support for 24bit TBD.
# We don't yet support general Nx1 layouts, or arbitrary "chip select"
# values (2x2 layout, but only a subset of them is enabled).
sensor_layout_str = parts[7].replace('G', '').split('x')
sensor_layout = (
int(sensor_layout_str[0]),
int(sensor_layout_str[1]),
)
if mib_kind == "r":
# in raw data, the layout information is not decoded yet - we just get
# rows from different sensors after one another. The image size is
# accordingly also different.
            # NOTE: this is still a bit hacky - individual sensors in a layout
            # can be enabled or disabled; because we only support 1x1 and 2x2
            # layouts for now, we do the "layout based reshaping" only in the
            # 2x2 case:
if num_chips > 1:
px_length = image_size[0]
image_size_orig = image_size
image_size = (
px_length * sensor_layout[1],
px_length * sensor_layout[0],
)
if prod(image_size_orig) != prod(image_size):
raise ValueError(
f"invalid sensor layout {sensor_layout} "
f"(original image size: {image_size_orig})"
)
return {
'header_size_bytes': header_size_bytes,
'dtype': MIBHeaderReader._get_np_dtype(parts[6], bits_per_pixel_raw),
'mib_dtype': dtype,
'mib_kind': mib_kind,
'bits_per_pixel': bits_per_pixel_raw,
'image_size': image_size,
'image_size_bytes': image_size_bytes,
'sequence_first_image': int(parts[1]),
'filesize': filesize,
'num_images': num_images,
'num_chips': num_chips,
'sensor_layout': sensor_layout,
}
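    # Schematic of the header layout assumed above (illustrative, not a
    # verbatim header): a comma-separated header roughly like
    # "MQ1,<first image>,<header size>,<num chips>,<width>,<height>,<dtype>,
    # <layout>,...,<bit depth>", i.e. parts[1] is the sequence number of the
    # first image, parts[2] the header size in bytes, parts[3] the chip
    # count, parts[4]/parts[5] the width/height, parts[6] the dtype (e.g.
    # 'U16' or 'R64'), parts[7] the sensor layout (e.g. '2x2'), and the bit
    # depth comes last.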
@property
def num_frames(self) -> int:
return self.fields['num_images']
@property
def start_idx(self) -> int:
return self.fields['sequence_first_image'] - self.sequence_start
@property
def sequence_start(self) -> Optional[int]:
return self._sequence_start
@sequence_start.setter
def sequence_start(self, val: int):
self._sequence_start = val
@property
def fields(self) -> HeaderDict:
if self._fields is None:
self._fields = self.read_header()
return self._fields
class MIBFile(File):
def __init__(self, header, *args, **kwargs):
self._header = header
super().__init__(*args, **kwargs)
def get_array_from_memview(self, mem: memoryview, slicing: OffsetsSizes):
mem = mem[slicing.file_offset:-slicing.skip_end]
res = np.frombuffer(mem, dtype="uint8")
cutoff = self._header['num_images'] * (
self._header['image_size_bytes'] + self._header['header_size_bytes']
)
res = res[:cutoff]
return res.view(dtype=self._native_dtype).reshape(
(self.num_frames, -1)
)[:, slicing.frame_offset:slicing.frame_offset + slicing.frame_size]
class MIBFileSet(FileSet):
def __init__(
self,
header: HeaderDict,
*args, **kwargs
):
self._header = header
super().__init__(*args, **kwargs)
def _clone(self, *args, **kwargs):
return self.__class__(header=self._header, *args, **kwargs)
def get_read_ranges(
self, start_at_frame: int, stop_before_frame: int,
dtype, tiling_scheme: TilingScheme, sync_offset: int = 0,
roi: Union[np.ndarray, None] = None,
):
fileset_arr = self.get_as_arr()
bit_depth = self._header['bits_per_pixel']
kind = self._header['mib_kind']
layout = self._header['sensor_layout']
if kind == "r" and bit_depth == 1:
# special case for bit-packed 1-bit format:
bpp = 1
else:
bpp = np.dtype(dtype).itemsize * 8
roi_nonzero = None
if roi is not None:
roi_nonzero = flat_nonzero(roi).astype(np.int64)
kwargs = dict(
start_at_frame=start_at_frame,
stop_before_frame=stop_before_frame,
roi_nonzero=roi_nonzero,
depth=tiling_scheme.depth,
slices_arr=tiling_scheme.slices_array,
fileset_arr=fileset_arr,
sig_shape=tuple(tiling_scheme.dataset_shape.sig),
sync_offset=sync_offset,
bpp=bpp,
frame_header_bytes=self._frame_header_bytes,
frame_footer_bytes=self._frame_footer_bytes,
)
if layout == (1, 1) or self._header['num_chips'] == 1:
if kind == "r" and bit_depth in (24,):
return mib_r24_get_read_ranges(**kwargs)
else:
return default_get_read_ranges(**kwargs)
elif layout == (2, 2):
if kind == "r":
return mib_2x2_get_read_ranges(**kwargs)
else:
return default_get_read_ranges(**kwargs)
else:
raise NotImplementedError(
f"No support for layout {layout} yet - please contact us!"
)
class MIBDataSet(DataSet):
# FIXME include sample file for doctest, see Issue #86
"""
MIB data sets consist of one or more `.mib` files, and optionally
a `.hdr` file. The HDR file is used to automatically set the
`nav_shape` parameter from the fields "Frames per Trigger" and "Frames
in Acquisition." When loading a MIB data set, you can either specify
the path to the HDR file, or choose one of the MIB files. The MIB files
are assumed to follow a naming pattern of some non-numerical prefix,
and a sequential numerical suffix.
Note that if you are using a per-pixel or per-scan trigger setup, LiberTEM
won't be able to deduce the x scanning dimension - in that case, you will
need to specify the `nav_shape` yourself.
    Currently, we support all integer formats, and most RAW formats. In
    particular, the following configurations are not yet supported for RAW
    files:
* Non-2x2 layouts with more than one chip
* 24bit with more than one chip
.. versionadded:: 0.9.0
Support for the raw quad format was added
Examples
--------
>>> # both examples look for files matching /path/to/default*.mib:
>>> ds1 = ctx.load("mib", path="/path/to/default.hdr") # doctest: +SKIP
>>> ds2 = ctx.load("mib", path="/path/to/default64.mib") # doctest: +SKIP
Parameters
----------
path: str
Path to either the .hdr file or one of the .mib files
nav_shape: tuple of int, optional
        An n-tuple that specifies the size of the navigation region (commonly
        (y, x), but it can also be of length 1, e.g. for a line scan, or of
        length 3, e.g. for a data cube)
sig_shape: tuple of int, optional
Common case: (height, width); but can be any dimensionality
sync_offset: int, optional
If positive, number of frames to skip from start
If negative, number of blank frames to insert at start
disable_glob : bool, default False
Usually, MIB data sets are stored as a series of .mib files, and we can reliably
guess the whole set from a single path. If you instead save your data set into
a single .mib file, and have multiple of these in a single directory with the same prefix
(for example, a.mib, a1.mib and a2.mib), loading a.mib would include a1.mib and a2.mib
in the data set. Setting :code:`disable_glob` to :code:`True` will only load the single
.mib file specified as :code:`path`.
num_partitions: int, optional
Override the number of partitions. This is useful if the
default number of partitions, chosen based on common workloads,
creates partitions which are too large (or small) for the UDFs
being run on this dataset.
"""
def __init__(
self,
path,
tileshape=None,
scan_size=None,
disable_glob=False,
nav_shape=None,
sig_shape=None,
sync_offset=0,
io_backend=None,
num_partitions=None,
):
super().__init__(
io_backend=io_backend,
num_partitions=num_partitions,
)
self._sig_dims = 2
self._path = str(path)
self._nav_shape = tuple(nav_shape) if nav_shape else nav_shape
self._sig_shape = tuple(sig_shape) if sig_shape else sig_shape
self._sync_offset = sync_offset
# handle backwards-compatibility:
if tileshape is not None:
warnings.warn(
"tileshape argument is ignored and will be removed after 0.6.0",
FutureWarning
)
if scan_size is not None:
warnings.warn(
"scan_size argument is deprecated. please specify nav_shape instead",
FutureWarning
)
if nav_shape is not None:
raise ValueError("cannot specify both scan_size and nav_shape")
self._nav_shape = tuple(scan_size)
if self._nav_shape is None and not self._path.lower().endswith(".hdr"):
raise ValueError(
"either nav_shape needs to be passed, or path needs to point to a .hdr file"
)
self._filename_cache = None
self._files_sorted: Optional[Sequence[MIBHeaderReader]] = None
        # ._preread_headers() in _do_initialize() fills self._headers
self._headers = {}
self._meta = None
self._total_filesize = None
self._sequence_start = None
self._disable_glob = disable_glob
def _do_initialize(self):
filenames = self._filenames()
self._headers = self._preread_headers(filenames)
        self._files_sorted = sorted(self._files(),
                                    key=lambda f: f.fields['sequence_first_image'])
try:
first_file = self._files_sorted[0]
except IndexError:
raise DataSetException("no files found")
self._sequence_start = first_file.fields['sequence_first_image']
# _files_sorted is initially created with _sequence_start = None
# now that we know the first file we can set the correct value
for f in self._files_sorted:
f.sequence_start = self._sequence_start
if self._nav_shape is None:
hdr = read_hdr_file(self._path)
self._nav_shape = nav_shape_from_hdr(hdr)
if self._sig_shape is None:
self._sig_shape = first_file.fields['image_size']
elif int(prod(self._sig_shape)) != int(prod(first_file.fields['image_size'])):
raise DataSetException(
"sig_shape must be of size: %s" % int(prod(first_file.fields['image_size']))
)
self._sig_dims = len(self._sig_shape)
shape = Shape(self._nav_shape + self._sig_shape, sig_dims=self._sig_dims)
dtype = first_file.fields['dtype']
self._total_filesize = sum(
f.fields['filesize']
for f in self._files_sorted
)
self._image_count = self._num_images()
self._nav_shape_product = int(prod(self._nav_shape))
self._sync_offset_info = self.get_sync_offset_info()
self._meta = DataSetMeta(
shape=shape,
raw_dtype=dtype,
sync_offset=self._sync_offset,
image_count=self._image_count,
)
return self
def initialize(self, executor):
return executor.run_function(self._do_initialize)
def get_diagnostics(self):
assert self._files_sorted is not None
first_file = self._files_sorted[0]
header = first_file.fields
return [
{"name": "Bits per pixel",
"value": str(header['bits_per_pixel'])},
{"name": "Data kind",
"value": str(header['mib_kind'])},
{"name": "Layout",
"value": str(header['sensor_layout'])},
]
@classmethod
def get_msg_converter(cls):
return MIBDatasetParams
@classmethod
def get_supported_extensions(cls):
return {"mib", "hdr"}
@classmethod
def detect_params(cls, path, executor):
pathlow = path.lower()
if pathlow.endswith(".mib"):
image_count, sig_shape = executor.run_function(get_image_count_and_sig_shape, path)
nav_shape = make_2D_square((image_count,))
elif pathlow.endswith(".hdr") and executor.run_function(is_valid_hdr, path):
hdr = executor.run_function(read_hdr_file, path)
image_count, sig_shape = executor.run_function(get_image_count_and_sig_shape, path)
nav_shape = nav_shape_from_hdr(hdr)
else:
return False
return {
"parameters": {
"path": path,
"nav_shape": nav_shape,
"sig_shape": sig_shape
},
"info": {
"image_count": image_count,
"native_sig_shape": sig_shape,
}
}
@staticmethod
def _preread_headers(filenames):
# Avoid overhead of creating the Pool on low-file-count datasets
if len(filenames) > 512:
            # The default ThreadPoolExecutor worker count scales with the
            # number of CPUs on the machine. In testing this was found to be
            # far too many on Linux, and on Windows sometimes a good number,
            # sometimes too many.
max_workers = 2
if platform.system() == "Windows":
                # psutil may return None if the physical core count cannot be
                # determined; fall back to 2 in that case
                num_cores = psutil.cpu_count(logical=False) or 2
                max_workers = max(max_workers, int(num_cores * 0.5))
with ThreadPoolExecutor(max_workers=max_workers) as p:
header_and_size = p.map(MIBHeaderReader._read_header_bytes, filenames)
else:
header_and_size = tuple(map(MIBHeaderReader._read_header_bytes, filenames))
res = {}
for path, (header, filesize) in zip(filenames, header_and_size):
res[path] = MIBHeaderReader._parse_header_bytes(header, filesize)
return res
def _filenames(self):
if self._filename_cache is not None:
return self._filename_cache
fns = get_filenames(self._path, disable_glob=self._disable_glob)
num_files = len(fns)
if num_files > 16384:
warnings.warn(
f"Saving data in many small files (here: {num_files}) is not efficient, "
"please increase the \"Images Per File\" parameter when acquiring data. If "
"this leads to \"Too many open files\" errors, on POSIX systems you can increase "
"the maximum open files limit using the \"ulimit\" shell command.",
RuntimeWarning
)
self._filename_cache = fns
return fns
def _files(self) -> Generator[MIBHeaderReader, None, None]:
for path, fields in self._headers.items():
f = MIBHeaderReader(path, fields=fields,
sequence_start=self._sequence_start)
yield f
def _num_images(self):
return sum(f.fields['num_images'] for f in self._files_sorted)
@property
def dtype(self):
return self._meta.raw_dtype
@property
def shape(self):
"""
        The full dataset shape, with nav_shape either as specified or as
        derived from the HDR file
"""
return self._meta.shape
def check_valid(self):
pass
def get_cache_key(self):
return {
"path": self._path,
# shape is included here because the structure will be invalid if you open
# the same .mib file with a different nav_shape; should be no issue if you
# open via the .hdr file
"shape": tuple(self.shape),
"sync_offset": self._sync_offset,
}
def get_decoder(self) -> Decoder:
assert self._files_sorted is not None
first_file = self._files_sorted[0]
assert self.meta is not None
return MIBDecoder(
header=first_file.fields,
)
def _get_fileset(self):
assert self._sequence_start is not None
first_file = self._files_sorted[0]
header_size = first_file.fields['header_size_bytes']
return MIBFileSet(files=[
MIBFile(
path=f.path,
start_idx=f.start_idx,
end_idx=f.start_idx + f.num_frames,
native_dtype=f.fields['dtype'],
sig_shape=self._meta.shape.sig,
frame_header=f.fields['header_size_bytes'],
file_header=0,
header=f.fields,
)
for f in self._files_sorted
], header=first_file.fields, frame_header_bytes=header_size)
def get_base_shape(self, roi: Optional[np.ndarray]) -> tuple[int, ...]:
# With R-mode files, we are constrained to tile sizes that are a
# multiple of 64px in the fastest dimension!
# If we make sure full "x-lines" are taken, we are fine (this is the
# case by default)
base_shape = super().get_base_shape(roi)
assert self.meta is not None and base_shape[-1] == self.meta.shape[-1]
return base_shape
def get_partitions(self):
first_file = self._files_sorted[0]
fileset = self._get_fileset()
kind = first_file.fields['mib_kind']
for part_slice, start, stop in self.get_slices():
yield MIBPartition(
meta=self._meta,
fileset=fileset,
partition_slice=part_slice,
start_frame=start,
num_frames=stop - start,
kind=kind,
bit_depth=first_file.fields['bits_per_pixel'],
io_backend=self.get_io_backend(),
decoder=self.get_decoder(),
)
def __repr__(self):
return f"<MIBDataSet of {self.dtype} shape={self.shape}>"
class MIBPartition(BasePartition):
def __init__(self, kind, bit_depth, *args, **kwargs):
super().__init__(*args, **kwargs)
self._kind = kind
self._bit_depth = bit_depth