Source code for libertem_live.api

from typing import TYPE_CHECKING, overload, Union, Optional
from typing_extensions import Literal

from opentelemetry import trace

from libertem.executor.pipelined import PipelinedExecutor
# Avoid having Context in this module to make sure
# it is not imported by accident instead of LiveContext
from libertem.api import Context as LiberTEM_Context

from .detectors.base.connection import DetectorConnection, PendingAcquisition
from .detectors.base.controller import AcquisitionController
from .hooks import Hooks

if TYPE_CHECKING:
    from libertem_live.detectors.dectris import DectrisConnectionBuilder, DectrisAcquisition
    from libertem_live.detectors.dectris.connection import DectrisDetectorConnection
    from libertem_live.detectors.merlin import MerlinConnectionBuilder, MerlinAcquisition
    from libertem_live.detectors.merlin.connection import MerlinDetectorConnection
    from libertem_live.detectors.memory import MemoryConnectionBuilder, MemoryAcquisition
    from libertem_live.detectors.memory.acquisition import MemoryConnection
    from libertem_live.detectors.asi_tpx3 import (
        AsiTpx3DetectorConnection, AsiTpx3ConnectionBuilder, AsiTpx3Acquisition,
    )

tracer = trace.get_tracer(__name__)


class CleanupResultGeneratorProxy:
    def __init__(self, inner, callback):
        self._inner = inner
        self._inner_iter = iter(inner)
        self._callback = callback

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return next(self._inner_iter)
        except Exception:
            self._callback()
            raise

    def close(self):
        try:
            self._inner.close()
        finally:
            self._callback()

    def __getattr__(self, name):
        return getattr(self._inner, name)


[docs] class LiveContext(LiberTEM_Context): ''' :class:`LiveContext` handles the computational resources needed to run UDFs on live data streams. It is the entry point to most interactions with the LiberTEM-live API. A :class:`LiveContext` behaves like a :class:`libertem.api.Context` in most circumstances. Notable differences are that it currently starts an :class:`~libertem.executor.pipelined.PipelinedExecutor` instead of a :class:`~libertem.executor.dask.DaskJobExecutor` if no executor is passed in the constructor, and that it can prepare and run acquisitions on top of loading offline datasets. The docstrings for most functions are inherited from the base class. Most methods, in particular :meth:`run_udf` and :meth:`run_udf_iter`, now accept both an acquisition object and a dataset as the :code:`dataset` parameter. ''' def _create_local_executor(self): ''' Live acquisition currently requires a suitable executor, for example :class:`~libertem.executor.pipelined.PipelinedExecutor`. ''' return PipelinedExecutor() def _start_acquisition(self, acquisition, udf): if hasattr(acquisition, 'start_acquisition'): acquisition.start_acquisition() def _end_acquisition(self, acquisition, udf): if hasattr(acquisition, 'end_acquisition'): acquisition.end_acquisition() @overload def make_connection( self, detector_type: Literal['asi_tpx3'], ) -> "AsiTpx3ConnectionBuilder": ... @overload def make_connection( self, detector_type: Literal['dectris'], ) -> "DectrisConnectionBuilder": ... @overload def make_connection( self, detector_type: Literal['merlin'], ) -> "MerlinConnectionBuilder": ... @overload def make_connection( self, detector_type: Literal['memory'], ) -> "MemoryConnectionBuilder": ...
[docs] def make_connection( self, detector_type: Union[ Literal['asi_tpx3'], Literal['dectris'], Literal['merlin'], Literal['memory'] ], ) -> Union[ "AsiTpx3ConnectionBuilder", "DectrisConnectionBuilder", "MerlinConnectionBuilder", "MemoryConnectionBuilder", ]: """ Connect to a detector system. Parameters ---------- detector_type The detector type as a string. Further connection parameters are passed to the :code:`open` method of the returned builder object. Examples -------- >>> data = np.random.random((23, 42, 51, 67)) >>> ctx = LiveContext() # doctest: +SKIP >>> with ctx.make_connection('memory').open(data=data) as conn: ... print("connected!") connected! """ # FIXME implement similar to LiberTEM datasets once # we have more detector types to support if detector_type == 'dectris': from libertem_live.detectors.dectris import DectrisConnectionBuilder return DectrisConnectionBuilder() if detector_type == 'asi_tpx3': from libertem_live.detectors.asi_tpx3 import AsiTpx3ConnectionBuilder return AsiTpx3ConnectionBuilder() elif detector_type == 'merlin': from libertem_live.detectors.merlin import MerlinConnectionBuilder return MerlinConnectionBuilder() elif detector_type == 'memory': from libertem_live.detectors.memory import MemoryConnectionBuilder return MemoryConnectionBuilder() else: raise NotImplementedError( f"Detector type {detector_type} doesn't support this API" )
@overload def make_acquisition( self, *, conn: "AsiTpx3DetectorConnection", nav_shape: Optional[tuple[int, ...]] = None, frames_per_partition: Optional[int] = None, controller: Optional[AcquisitionController] = None, pending_aq: Optional[PendingAcquisition] = None, hooks: Optional[Hooks] = None, ) -> "AsiTpx3Acquisition": ... @overload def make_acquisition( self, *, conn: "DectrisDetectorConnection", nav_shape: Optional[tuple[int, ...]] = None, frames_per_partition: Optional[int] = None, controller: Optional[AcquisitionController] = None, pending_aq: Optional[PendingAcquisition] = None, hooks: Optional[Hooks] = None, ) -> "DectrisAcquisition": ... @overload def make_acquisition( self, *, conn: "MerlinDetectorConnection", nav_shape: Optional[tuple[int, ...]] = None, frames_per_partition: Optional[int] = None, controller: Optional[AcquisitionController] = None, pending_aq: Optional[PendingAcquisition] = None, hooks: Optional[Hooks] = None, ) -> "MerlinAcquisition": ... @overload def make_acquisition( self, *, conn: "MemoryConnection", nav_shape: Optional[tuple[int, ...]] = None, frames_per_partition: Optional[int] = None, controller: Optional[AcquisitionController] = None, pending_aq: Optional[PendingAcquisition] = None, hooks: Optional[Hooks] = None, ) -> "MemoryAcquisition": ...
[docs] def make_acquisition( self, *, conn: DetectorConnection, nav_shape: Optional[tuple[int, ...]] = None, frames_per_partition: Optional[int] = None, controller: Optional[AcquisitionController] = None, pending_aq: Optional[PendingAcquisition] = None, hooks: Optional[Hooks] = None, ) -> Union[ "AsiTpx3Acquisition", "DectrisAcquisition", "MerlinAcquisition", "MemoryAcquisition", ]: """ Create an acquisition object. Examples -------- >>> data = np.random.random((23, 42, 51, 67)) >>> ctx = LiveContext() # doctest: +SKIP >>> with ctx.make_connection('memory').open(data=data) as conn: ... aq = ctx.make_acquisition(conn=conn) ... ctx.run_udf(dataset=aq, udf=SumUDF()) {'intensity': ...} """ cls = conn.get_acquisition_cls() instance = cls( conn=conn, nav_shape=nav_shape, frames_per_partition=frames_per_partition, controller=controller, pending_aq=pending_aq, hooks=hooks, ) return instance.initialize(self.executor)
[docs] def prepare_acquisition(self, detector_type, *args, trigger=None, **kwargs): ''' This method has been removed, please use `make_connection` and `make_acquisition`. ''' raise RuntimeError( "`LiveContext.prepare_acquisition` has been replaced with " # FIXME: text, docstring "`LiveContext.make_connection` and `LiveContext.make_acquisition`." )
def _run_sync(self, dataset, udf, iterate=False, *args, **kwargs): def _run_sync_iterate(): self._start_acquisition(dataset, udf) res = super(LiveContext, self)._run_sync( dataset=dataset, udf=udf, iterate=iterate, *args, **kwargs ) return CleanupResultGeneratorProxy( res, callback=lambda: self._end_acquisition(dataset, udf) ) if iterate: return _run_sync_iterate() else: self._start_acquisition(dataset, udf) try: result = super()._run_sync( dataset=dataset, udf=udf, iterate=iterate, *args, **kwargs ) return result finally: self._end_acquisition(dataset, udf)