Source code for libertem_live.detectors.asi_tpx3.connection

import os
from typing import Optional
import tempfile
import logging

from libertem.common.math import prod

from libertem_live.detectors.common import cleanup_handle_dir
from libertem_live.detectors.base.connection import (
    PendingAcquisition, DetectorConnection,
    AcquisitionProtocol,
)

from libertem_asi_tpx3 import ASITpx3Connection

logger = logging.getLogger(__name__)


class AsiTpx3PendingAcquisition(PendingAcquisition):
    """
    Pending acquisition described by an ASI TPX3 acquisition header.

    The object only stores the header and exposes values derived from it.

    Parameters
    ----------
    acquisition_header
        Header object of the acquisition. It has to provide a
        ``get_nav_shape()`` method, which is used by :attr:`nav_shape`.
    """

    def __init__(self, acquisition_header):
        self._acquisition_header = acquisition_header

    @property
    def header(self):
        """The acquisition header this object was created with."""
        return self._acquisition_header

    @property
    def nimages(self) -> int:
        """Product of all entries of :attr:`nav_shape`."""
        return prod(self.nav_shape)

    @property
    def nav_shape(self) -> tuple[int, ...]:
        """Navigation shape as returned by the header's ``get_nav_shape()``."""
        return self._acquisition_header.get_nav_shape()

    def __repr__(self):
        # Take the name from the class itself; the previous hard-coded
        # "AsiPendingAcquisition" did not match the actual class name.
        return f"<{type(self).__name__} header={self._acquisition_header}>"
class AsiTpx3DetectorConnection(DetectorConnection):
    """
    Connection to the ASI TPX3 software.

    Please see :class:`libertem_live.detectors.asi_tpx3.AsiTpx3ConnectionBuilder`
    for a description of the parameters, and use
    :meth:`libertem_live.api.LiveContext.make_connection` to create a connection.

    The connection is established in the constructor. After :meth:`close`,
    it can be re-established with :meth:`reconnect` or by entering the
    object as a context manager.
    """

    def __init__(
        self,
        data_host: str,
        data_port: int,
        buffer_size: int = 2048,
        bytes_per_chunk: Optional[int] = None,
        chunks_per_stack: int = 24,
        huge_pages: bool = False,
    ):
        """
        Store the connection parameters and connect immediately.

        Raises
        ------
        ValueError
            If :code:`bytes_per_chunk` or :code:`chunks_per_stack` is not
            positive.
        """
        # Define these first so the attributes exist even if validation or
        # connecting fails further down.
        self._conn = None
        self._shm_handle_path = None
        self._passive_started = False

        self._data_host = data_host
        self._data_port = data_port

        if bytes_per_chunk is None:
            bytes_per_chunk = 16 * 1024
        # Both values are divisors of the slot computation below; reject
        # them up front instead of failing with a ZeroDivisionError.
        if bytes_per_chunk <= 0:
            raise ValueError(
                f"bytes_per_chunk must be positive, got {bytes_per_chunk!r}"
            )
        if chunks_per_stack <= 0:
            raise ValueError(
                f"chunks_per_stack must be positive, got {chunks_per_stack!r}"
            )

        # approx: one slot holds one stack of chunks
        slot_size = bytes_per_chunk * chunks_per_stack
        # buffer_size is given in MiB; convert to bytes and count whole slots
        self._num_slots = 1024 * 1024 * buffer_size // slot_size
        self._bytes_per_chunk = bytes_per_chunk
        self._huge_pages = huge_pages
        self._chunks_per_stack = chunks_per_stack

        self._conn = self._connect()

    def _connect(self):
        """
        Create a fresh shared memory handle path and open the connection.

        Returns the new :class:`ASITpx3Connection`. If constructing it fails,
        the temporary handle directory is cleaned up before the exception
        is re-raised.
        """
        self._shm_handle_path = self._make_handle_path()
        uri = f"{self._data_host}:{self._data_port}"
        logger.info(f"connecting to {uri} with shared memory handle {self._shm_handle_path}")
        try:
            return ASITpx3Connection(
                uri=uri,
                chunks_per_stack=self._chunks_per_stack,
                num_slots=self._num_slots,
                bytes_per_chunk=self._bytes_per_chunk,
                huge=self._huge_pages,
                handle_path=self._shm_handle_path,
            )
        except BaseException:
            # Without a connection object, close() will never reach its
            # cleanup, so the directory made by mkdtemp() would be leaked.
            try:
                cleanup_handle_dir(self._shm_handle_path)
            except Exception:
                # Best effort only: never mask the original connect error.
                logger.exception(
                    "failed to clean up shared memory handle directory for %s",
                    self._shm_handle_path,
                )
            raise

    def wait_for_acquisition(
        self,
        timeout: Optional[float] = None,
    ) -> Optional[PendingAcquisition]:
        """
        Wait for the next acquisition header.

        On the first call after (re-)connecting, this calls
        :code:`start_passive()` on the underlying connection. It then calls
        :code:`wait_for_arm(timeout)`.

        Parameters
        ----------
        timeout
            Passed through unchanged to :code:`wait_for_arm`
            (presumably seconds; confirm against libertem_asi_tpx3).

        Returns
        -------
        An :class:`AsiTpx3PendingAcquisition` wrapping the header, or
        :code:`None` if :code:`wait_for_arm` returned :code:`None`.
        """
        if not self._passive_started:
            self._conn.start_passive()
            self._passive_started = True
        header = self._conn.wait_for_arm(timeout)
        if header is None:
            return None
        return AsiTpx3PendingAcquisition(
            acquisition_header=header,
        )

    def get_conn_impl(self):
        """Return the underlying connection object (``None`` once closed)."""
        return self._conn

    @classmethod
    def _make_handle_path(cls):
        """
        Return the path :code:`asi-shm-socket` inside a newly created
        temporary directory.
        """
        temp_path = tempfile.mkdtemp()
        return os.path.join(temp_path, 'asi-shm-socket')

    def stop_series(self):
        """Currently a no-op."""
        pass  # TODO: what to do?

    def close(self):
        """
        Close the underlying connection and clean up the handle directory.

        Does nothing if the connection is already closed.
        """
        if self._conn is not None:
            logger.info(f"closing connection to {self._data_host}:{self._data_port}")
            self._conn.close()
            # A later reconnect has to call start_passive() again.
            self._passive_started = False
            self._conn = None
            cleanup_handle_dir(self._shm_handle_path)

    def __enter__(self):
        """Reconnect if the connection was closed, then return ``self``."""
        if self._conn is None:
            self._conn = self._connect()
        return self

    def reconnect(self):
        """Close the current connection, if any, and open a new one."""
        if self._conn is not None:
            self.close()
        self._conn = self._connect()

    def log_stats(self):
        """Call :code:`log_shm_stats()` on the underlying connection."""
        self._conn.log_shm_stats()

    def get_acquisition_cls(self) -> type[AcquisitionProtocol]:
        """Return the acquisition class that belongs to this connection."""
        # Imported here rather than at module level (presumably to avoid a
        # circular import between the two modules).
        from .acquisition import AsiTpx3Acquisition
        return AsiTpx3Acquisition
class AsiTpx3ConnectionBuilder:
    """
    Factory for :class:`AsiTpx3DetectorConnection` objects.

    Call :meth:`open` to obtain a connection.
    """

    def open(
        self,
        data_host: str = "127.0.0.1",
        data_port: int = 8283,
        buffer_size: int = 2048,
        bytes_per_chunk: Optional[int] = None,
        chunks_per_stack: int = 24,
        huge_pages: bool = False,
    ) -> AsiTpx3DetectorConnection:
        """
        Open a connection to the ASI TPX3 detector software (Accos).

        Parameters
        ----------
        data_host
            Hostname or IP address of the machine on which Accos runs
        data_port
            TCP port to connect to
        buffer_size
            Total size, in MiB, of the receive buffer used for streaming
            data to the worker processes.
        bytes_per_chunk
            Size of a single chunk in bytes. This is an approximate value,
            because the real size depends on the number of events per
            scan position.
        chunks_per_stack
            Number of chunks that are assembled into one chunk stack
        huge_pages
            If True, the shared memory is allocated in huge pages, which
            can improve performance by lowering the cost of page faults.
            Currently only available on Linux.

            Huge pages have to be reserved for this to work, either at
            system start or before connecting. For example, to reserve
            10000 huge pages, you can run:

            :code:`echo 10000 | sudo tee /proc/sys/vm/nr_hugepages`

            See also the :code:`hugeadm` utility; in particular,
            :code:`hugeadm --explain` can be useful to check your
            configuration.

        Returns
        -------
        The newly created :class:`AsiTpx3DetectorConnection`.
        """
        # Collect the settings once and forward them by keyword.
        connection_kwargs = {
            "data_host": data_host,
            "data_port": data_port,
            "buffer_size": buffer_size,
            "bytes_per_chunk": bytes_per_chunk,
            "chunks_per_stack": chunks_per_stack,
            "huge_pages": huge_pages,
        }
        return AsiTpx3DetectorConnection(**connection_kwargs)