import os
import logging
import tempfile
from typing import Optional, Literal, Union
from collections.abc import Generator
from contextlib import contextmanager
from libertem_live.detectors.common import cleanup_handle_dir
from libertem_live.detectors.base.connection import DetectorConnection, PendingAcquisition
from libertem_live.detectors.base.acquisition import AcquisitionProtocol
from .control import MerlinControl
from opentelemetry import trace
import libertem_qd_mpx
tracer = trace.get_tracer(__name__)
logger = logging.getLogger(__name__)
class MerlinPendingAcquisition(PendingAcquisition):
def __init__(self, header: libertem_qd_mpx.QdAcquisitionConfig):
self._header = header
@property
def header(self):
return self._header
@property
def nimages(self) -> int:
return self.header.frames_in_acquisition()
@property
def nav_shape(self) -> Optional[tuple[int, ...]]:
"""
The concrete `nav_shape`, if it is known by the detector
"""
# from the ScanX and ScanY fields in the acquisition header
return self._header.nav_shape()
[docs]
class MerlinDetectorConnection(DetectorConnection):
"""
This class holds a permanent data connection to the merlin software.
Control connections are also possible to obtain from this class,
but are created on demand and not kept open.
You can use the convenience function
:meth:`libertem_live.api.LiveContext.make_connection` to create an instance,
instead of calling this constructor directly.
Parameters
----------
api_host
Hostname of the Merlin control server, default '127.0.0.1'
Should in most cases be the same as `data_host`.
api_port
Port of the Merlin control server, default 6341
data_host
Hostname of the Merlin data server, default '127.0.0.1'
data_port
Data port of the Merlin data server, default 6342
drain
Drain the socket before triggering. Enable this when
using old versions of the Merlin software, but not when
using an internal trigger.
recovery_strategy
What to do in case of errors - try to drain the socket
or immediately reconnect. If an error on the LiberTEM-live
side makes the Merlin software hang, try to switch to
:code:`"drain_then_reconnect"`.
huge_pages
Set to True to allocate shared memory in huge pages. This can improve performance
by reducing the page fault cost. Currently only available on Linux. Enabling this
requires reserving huge pages, either at system start, or before connecting.
For example, to reserve 10000 huge pages, you can run:
:code:`echo 10000 | sudo tee /proc/sys/vm/nr_hugepages`
See also the :code:`hugeadm` utility, especially :code:`hugeadm --explain`
can be useful to check your configuration.
Examples
--------
Usually, this class is instantiated using
:meth:`libertem_live.api.LiveContext.make_connection`:
>>> with ctx.make_connection('merlin').open(
... api_host='127.0.0.1',
... api_port=MERLIN_API_PORT,
... data_host='127.0.0.1',
... data_port=MERLIN_DATA_PORT,
... ) as conn:
... aq = ctx.make_acquisition(conn=conn, nav_shape=(32, 32))
... ctx.run_udf(dataset=aq, udf=SumUDF())
{'intensity': ...}
"""
def __init__(
self,
*,
api_host: str = '127.0.0.1',
api_port: int = 6341,
data_host: str = '127.0.0.1',
data_port: int = 6342,
drain: bool = False,
recovery_strategy: Union[
Literal['immediate_reconnect'],
Literal['drain_then_reconnect']
] = "immediate_reconnect",
huge_pages: bool = False,
):
self._api_host = api_host
self._api_port = api_port
self._data_host = data_host
self._data_port = data_port
self._data_socket: Optional[libertem_qd_mpx.QdConnection] = None
self._drain = drain
self._recovery_strategy = recovery_strategy
self._huge_pages = huge_pages
self._shm_handle_path = None
self.connect()
def connect(self):
if self._data_socket is not None:
return self._data_socket # already connected
self._shm_handle_path = self._make_socket_path()
self._data_socket = libertem_qd_mpx.QdConnection(
data_host=self._data_host,
data_port=self._data_port,
frame_stack_size=16, # FIXME! make configurable or determine automatically
shm_handle_path=self._shm_handle_path,
drain=self._drain,
recovery_strategy=self._recovery_strategy,
huge=self._huge_pages,
)
self._data_socket.start_passive()
return self._data_socket
@classmethod
def _make_socket_path(cls):
temp_path = tempfile.mkdtemp()
return os.path.join(temp_path, 'qd-shm-socket')
[docs]
def wait_for_acquisition(self, timeout: Optional[float] = None) -> Optional[PendingAcquisition]:
"""
Wait for at most `timeout` seconds for an acquisition to start. This
does not perform any triggering itself and expects something external
to arm and trigger the acquisition.
Once the detector is armed, this function returns a `PendingAcquisition`,
which can be converted to a full `Acquisition` object using
:meth:`libertem_live.api.LiveContext.make_acquisition`.
The function returns `None` on timeout.
Parameters
----------
timeout
Timeout in seconds. If `None`, wait indefinitely.
Examples
--------
>>> with ctx.make_connection('merlin').open(
... api_host='127.0.0.1',
... api_port=MERLIN_API_PORT,
... data_host='127.0.0.1',
... data_port=MERLIN_DATA_PORT,
... ) as conn:
... pending_aq = conn.wait_for_acquisition(timeout=1)
... # at this point, something else is arming and triggering the
... # detector:
... aq = ctx.make_acquisition(
... conn=conn,
... nav_shape=(32, 32),
... pending_aq=pending_aq,
... )
... ctx.run_udf(dataset=aq, udf=SumUDF())
{'intensity': ...}
"""
if self._data_socket is not None and self._data_socket.is_running():
self.close()
if self._data_socket is None:
self.connect()
assert self._data_socket is not None
acq_header = self._data_socket.wait_for_arm(timeout=timeout)
if acq_header is None:
return None
return MerlinPendingAcquisition(header=acq_header)
def get_acquisition_cls(self) -> type[AcquisitionProtocol]:
from .acquisition import MerlinAcquisition
return MerlinAcquisition
def __enter__(self):
if self._data_socket is None:
self.connect()
return self
[docs]
def close(self):
# implementing "close" and __enter__ above gives us contextmanager API
# for free:
if self._data_socket is not None:
self._data_socket.close()
self._data_socket = None
cleanup_handle_dir(self._shm_handle_path)
def get_conn_impl(self):
return self._data_socket # maybe remove `get_data_socket` function?
def get_data_socket(self) -> libertem_qd_mpx.QdConnection:
assert self._data_socket is not None, "need to be connected to call this"
return self._data_socket
@contextmanager
def control(self) -> Generator[MerlinControl, None, None]:
with MerlinControl(host=self._api_host, port=self._api_port) as c:
yield c
def read_sig_shape(self) -> tuple[int, int]:
with self.control() as c:
width = int(c.get('IMAGEX'))
height = int(c.get('IMAGEY'))
return (height, width)
def read_bitdepth(self) -> int:
with self.control() as c:
return int(c.get('COUNTERDEPTH'))
def get_active_controller(self):
from .controller import MerlinActiveController
return MerlinActiveController()
[docs]
class MerlinConnectionBuilder:
"""
Builder class that can construct :class:`MerlinDetectorConnection` instances.
Use the :meth:`open` method to create a connection.
"""
[docs]
def open(
self,
*,
api_host: str = '127.0.0.1',
api_port: int = 6341,
data_host: str = '127.0.0.1',
data_port: int = 6342,
drain: bool = False,
recovery_strategy: Union[
Literal['immediate_reconnect'],
Literal['drain_then_reconnect']
] = "immediate_reconnect",
huge_pages: bool = False,
) -> MerlinDetectorConnection:
"""
Connect to a Merlin Medipix detector system.
Parameters
----------
api_host
Hostname of the Merlin control server, default '127.0.0.1'
Should in most cases be the same as `data_host`.
api_port
Port of the Merlin control server, default 6341
data_host
Hostname of the Merlin data server, default '127.0.0.1'
data_port
Data port of the Merlin data server, default 6342
drain
Drain the socket before triggering. Enable this when
using old versions of the Merlin software, but not when
using an internal trigger.
recovery_strategy
What to do in case of errors - try to drain the socket
or immediately reconnect.
huge_pages
Set to True to allocate shared memory in huge pages. This can
improve performance by reducing the page fault cost. Currently only
available on Linux. Enabling this requires reserving huge pages,
either at system start, or before connecting.
For example, to reserve 10000 huge pages, you can run:
:code:`echo 10000 | sudo tee /proc/sys/vm/nr_hugepages`
See also the :code:`hugeadm` utility, especially :code:`hugeadm --explain`
can be useful to check your configuration.
Examples
--------
Usually, this method is directly used together with
:meth:`libertem_live.api.LiveContext.make_connection`:
>>> with ctx.make_connection('merlin').open(
... api_host='127.0.0.1',
... api_port=MERLIN_API_PORT,
... data_host='127.0.0.1',
... data_port=MERLIN_DATA_PORT,
... ) as conn:
... aq = ctx.make_acquisition(conn=conn, nav_shape=(32, 32))
... ctx.run_udf(dataset=aq, udf=SumUDF())
{'intensity': ...}
"""
return MerlinDetectorConnection(
api_host=api_host,
api_port=api_port,
data_host=data_host,
data_port=data_port,
drain=drain,
recovery_strategy=recovery_strategy,
huge_pages=huge_pages,
)