Source code for libertem_live.detectors.dectris.acquisition

import base64
import logging
from typing import Optional, TYPE_CHECKING
from collections.abc import Sequence

import numpy as np
from opentelemetry import trace

from libertem.common import Shape
from libertem.common.math import prod
from libertem.common.executor import (
    WorkerContext,
)
from libertem.io.dataset.base import (
    DataSetMeta, BasePartition, Partition, DataSet, TilingScheme,
)
from libertem.corrections.corrset import CorrectionSet
from sparseconverter import ArrayBackend, NUMPY, CUDA, CUPY

from libertem_live.detectors.base.acquisition import (
    AcquisitionMixin, GenericCommHandler, GetFrames
)
from libertem_live.hooks import Hooks, ReadyForDataEnv
from .controller import DectrisActiveController
from .common import AcquisitionParams, DetectorConfig
from .DEigerClient import DEigerClient

import libertem_dectris

if TYPE_CHECKING:
    from .connection import DectrisDetectorConnection
    from .common import DectrisPendingAcquisition

tracer = trace.get_tracer(__name__)
logger = logging.getLogger(__name__)


def get_darray(darray) -> np.ndarray:
    """
    Helper to decode darray-encoded arrays to numpy
    """
    data = darray['value']['data']
    if 'base64' not in darray['value']['filters']:
        raise RuntimeError("don't unterstand this encoding, bailing")
    shape = tuple(reversed(darray['value']['shape']))
    dtype = darray['value']['type']
    data = base64.decodebytes(data.encode("ascii"))
    return np.frombuffer(data, dtype=dtype).reshape(shape)


def dtype_from_frame(frame):
    dtype = np.dtype(frame.get_pixel_type()).newbyteorder(
        frame.get_endianess()
    )
    return dtype


def shape_from_frame(frame):
    return tuple(reversed(frame.get_shape()))


class DectrisGetFrames(GetFrames):
    CAM_CLIENT_CLS = libertem_dectris.CamClient
    FRAME_STACK_CLS = libertem_dectris.FrameStackHandle


class DectrisCommHandler(GenericCommHandler):
    def __init__(
        self,
        params: Optional[AcquisitionParams],
        conn: "DectrisDetectorConnection",
        controller: "Optional[DectrisActiveController]",
    ):
        super().__init__(conn=conn)
        self.params = params
        self.controller = controller

    def get_conn_impl(self):
        return self._conn.get_conn_impl()

    def start(self):
        if self.controller is not None and self.params is not None:
            logger.info(f"arming for acquisition with id={self.params.sequence_id}")
            self.controller.handle_start(self._conn, self.params.sequence_id)

    def done(self):
        if self.controller is not None:
            self.controller.handle_stop(self._conn)


# FIXME: naming: native `DetectorConnection` vs this is confusing?
[docs] class DectrisAcquisition(AcquisitionMixin, DataSet): ''' Acquisition from a DECTRIS detector. Use :meth:`libertem_live.api.LiveContext.make_acquisition` to instantiate this class! Examples -------- >>> with ctx.make_connection('dectris').open( ... api_host='127.0.0.1', ... api_port=DCU_API_PORT, ... data_host='127.0.0.1', ... data_port=DCU_DATA_PORT, ... ) as conn: ... aq = ctx.make_acquisition( ... conn=conn, ... nav_shape=(128, 128), ... ) Parameters ---------- conn An existing `DectrisDetectorConnection` instance nav_shape The navigation shape as a tuple, for example :code:`(height, width)` for a 2D STEM scan. frames_per_partition A tunable for configuring the feedback rate - more frames per partition means slower feedback, but less computational overhead. Might need to be tuned to adapt to the dwell time. pending_aq A pending acquisition in passive mode, obtained from :meth:`DectrisDetectorConnection.wait_for_acquisition`. If this is not provided, it's assumed that the detector should be actively armed and triggered. controller A `DectrisActiveController` instance, which can be obtained from :meth:`DectrisDetectorConnection.get_active_controller`. You can pass additional parameters to :meth:`DectrisDetectorConnection.get_active_controller` in order to change detector settings. If no controller is passed in, and `pending_aq` is also not given, then the acquisition will be started in active mode, leaving all detector settings unchanged. hooks Acquisition hooks to react to certain events ''' _conn: "DectrisDetectorConnection" _controller: "DectrisActiveController" _pending_aq: "DectrisPendingAcquisition" def __init__( self, # this replaces the {api,data}_{host,port} parameters: conn: "DectrisDetectorConnection", nav_shape: Optional[tuple[int, ...]] = None, frames_per_partition: int = 128, # in passive mode, we get this: pending_aq: Optional["DectrisPendingAcquisition"] = None, # in passive mode, we don't pass the controller. # (in active mode, you _can_ pass it, but if you don't, a default # controller will be created): controller: Optional[DectrisActiveController] = None, hooks: Optional[Hooks] = None, ): if frames_per_partition is None: frames_per_partition = 512 if controller is None and pending_aq is None: # default to active mode, with no settings to change: controller = conn.get_active_controller() if pending_aq is not None: if controller is not None: raise ValueError( "Got both active `controller` and pending acquisition, please only provide one" ) if hooks is None: hooks = Hooks() super().__init__( conn=conn, nav_shape=nav_shape, frames_per_partition=frames_per_partition, hooks=hooks, controller=controller, pending_aq=pending_aq, ) self._sig_shape: tuple[int, ...] = () self._acq_state: Optional[AcquisitionParams] = None
[docs] def get_api_client(self) -> DEigerClient: ''' Get an API client, which can be used to configure the detector. ''' return self._conn.get_api_client()
[docs] def get_detector_config(self) -> DetectorConfig: ''' Get (very limited subset of) detector configuration ''' ec = self._conn.get_api_client() # FIXME: initialize detector here, if not already initialized? shape_x = ec.detectorConfig("x_pixels_in_detector")['value'] shape_y = ec.detectorConfig("y_pixels_in_detector")['value'] bit_depth = ec.detectorConfig("bit_depth_image")['value'] return DetectorConfig( x_pixels_in_detector=shape_x, y_pixels_in_detector=shape_y, bit_depth=bit_depth, )
def initialize(self, executor) -> "DataSet": '' self._update_meta() return self def _update_meta(self): dc = self.get_detector_config() dtypes = { 8: np.uint8, 16: np.uint16, 32: np.uint32, } try: dtype = dtypes[dc.bit_depth] except KeyError: raise Exception(f"unknown bit depth: {dc.bit_depth}") self._sig_shape = (dc.y_pixels_in_detector, dc.x_pixels_in_detector) self._meta = DataSetMeta( shape=Shape(self._nav_shape + self._sig_shape, sig_dims=2), array_backends=self.array_backends, raw_dtype=dtype, dtype=dtype, ) @property def dtype(self): return self._meta.dtype @property def raw_dtype(self): return self._meta.raw_dtype @property def shape(self): return self._meta.shape @property def meta(self): return self._meta @property def array_backends(self) -> Sequence[ArrayBackend]: return (NUMPY, CUDA, CUPY) def get_correction_data(self): '' if self._controller is None or not self._controller.enable_corrections: return CorrectionSet() ec = self._conn.get_api_client() mask = ec.detectorConfig("pixel_mask") mask_arr = get_darray(mask) excluded_pixels = mask_arr > 0 return CorrectionSet(excluded_pixels=excluded_pixels) def start_acquisition(self): '' with tracer.start_as_current_span('start_acquisition'): if self._controller is not None: self._conn.prepare_for_active() self._controller.apply_file_writing() self._controller.apply_scan_settings(self._nav_shape) self._controller.apply_misc_settings() self._update_meta() sequence_id = self._controller.arm() nimages = prod(self.shape.nav) self._acq_state = AcquisitionParams( sequence_id=sequence_id, nimages=nimages, ) with tracer.start_as_current_span("DectrisAcquisition.trigger"): self._hooks.on_ready_for_data(ReadyForDataEnv(aq=self)) else: nimages = prod(self.shape.nav) sequence_id = self._pending_aq.series self._acq_state = AcquisitionParams( sequence_id=sequence_id, nimages=nimages, ) def end_acquisition(self): return def check_valid(self): '' pass def need_decode(self, read_dtype, roi, corrections): '' return True def get_base_shape(self, roi): '' return (1, *self.meta.shape.sig) @property def acquisition_state(self): '' return self._acq_state def get_partitions(self): '' num_frames = np.prod(self._nav_shape, dtype=np.uint64) num_partitions = int(num_frames // self._frames_per_partition) slices = BasePartition.make_slices(self.shape, num_partitions) for part_slice, start, stop in slices: yield DectrisLivePartition( start_idx=start, end_idx=stop, meta=self._meta, partition_slice=part_slice, ) def get_task_comm_handler(self) -> "DectrisCommHandler": '' return DectrisCommHandler( params=self._acq_state, conn=self._conn, controller=self._controller, )
class DectrisLivePartition(Partition): def __init__( self, start_idx, end_idx, partition_slice, meta, ): super().__init__(meta=meta, partition_slice=partition_slice, io_backend=None, decoder=None) self._start_idx = start_idx self._end_idx = end_idx def shape_for_roi(self, roi): return self.slice.adjust_for_roi(roi).shape @property def shape(self): return self.slice.shape @property def dtype(self): return self.meta.raw_dtype def set_corrections(self, corrections): self._corrections = corrections def set_worker_context(self, worker_context: WorkerContext): self._worker_context = worker_context def _preprocess(self, tile_data, tile_slice): if self._corrections is None: return self._corrections.apply(tile_data, tile_slice) def _get_tiles_fullframe( self, tiling_scheme: TilingScheme, dest_dtype="float32", roi=None, array_backend: Optional["ArrayBackend"] = None ): assert array_backend in (None, NUMPY, CUDA, CUPY) assert len(tiling_scheme) == 1, "only supports full frames tiling scheme for now" logger.debug("reading up to frame idx %d for this partition", self._end_idx) to_read = self._end_idx - self._start_idx with DectrisGetFrames( request_queue=self._worker_context.get_worker_queue(), dtype=dest_dtype, sig_shape=tuple(tiling_scheme[0].shape), ) as frames: yield from frames.get_tiles( to_read=to_read, start_idx=self._start_idx, tiling_scheme=tiling_scheme, roi=roi, array_backend=array_backend, ) def get_tiles(self, tiling_scheme, dest_dtype="float32", roi=None, array_backend: Optional["ArrayBackend"] = None): yield from self._get_tiles_fullframe( tiling_scheme, dest_dtype, roi, array_backend=array_backend ) def __repr__(self): return f"<DectrisLivePartition {self._start_idx}:{self._end_idx}>"