LiberTEM executors

All access to data and processing is done by an executor that implements the JobExecutor interface to run functions and tasks. The executor specifies where and how processing is done, including running on a cluster or in a single thread, while being independent of other parts of LiberTEM. See Executors for an overview from a user’s perspective.

New in version 0.9.0: The executor API is internal. Since choice and parameters of executors are important for integration with Dask and other frameworks, they are now documented. Only the names and creation methods for executors are reasonably stable. The rest of the API is subject to change without notice.

Base classes

class libertem.common.executor.AsyncJobExecutor[source]

Async version of JobExecutor.

async cancel(cancel_id)[source]

cancel execution identified by cancel_id

async close()[source]

Cleanup resources used by this executor, if any.

ensure_async(pool=None)[source]

Returns an asynchronous executor; by default just return self.

async map(fn, iterable)[source]

Run a callable fn for each item in iterable, on arbitrary worker nodes

Parameters
  • fn (callable) – Function to call. Should accept exactly one parameter.

  • iterable (Iterable) – Which elements to call the function on.

async run_each_worker(fn, *args, **kwargs)[source]

Run fn on each worker process, and pass *args, **kwargs to it.

Useful, for example, if you need to prepare the environment of each Python interpreter, warm up per-process caches etc.

Parameters
  • fn (callable) – Function to call

  • *args – Arguments for fn

  • **kwargs – Keyword arguments for fn

async run_function(fn: Callable[[...], T], *args, **kwargs) T[source]

Run a callable fn on any worker

async run_tasks(tasks, params_handle, cancel_id)[source]

Run a number of Tasks, yielding (result, task) tuples

class libertem.common.executor.JobExecutor[source]

Interface to execute functions on workers.

close()[source]

cleanup resources used by this executor, if any

ensure_async(pool=None)[source]

Returns an asynchronous executor; by default just wrap into AsyncAdapter.

ensure_sync()[source]

Returns a synchronous executor. In case of a JobExecutor we just return self; in case of AsyncJobExecutor below more work is needed!

get_available_workers() WorkerSet[source]

Returns a WorkerSet that contains the available workers

Each worker should correspond to a “worker process”, so if the executor is using multiple processes or threads, each process/thread should be included in this list.

get_resource_details()[source]

Returns a list of dicts with cluster details

key of the dictionary:

host: ip address or hostname where the worker is running

map(fn: Callable[[V], T], iterable: Iterable[V]) Iterable[T][source]

Run a callable fn for each element in iterable, on arbitrary worker nodes.

Parameters
  • fn (callable) – Function to call. Should accept exactly one parameter.

  • iterable (Iterable) – Which elements to call the function on.

modify_buffer_type(buf)[source]

Allow the executor to modify result buffers if necessary

Currently only called for buffers on the main node

run_each_host(fn, *args, **kwargs)[source]

Run a callable fn once on each host, gathering all results into a dict host -> result

Parameters
  • fn (callable) – Function to call

  • *args – Arguments for fn

  • **kwargs – Keyword arguments for fn

run_each_partition(partitions, fn, all_nodes=False)[source]

Run fn for all partitions. Yields results in order of completion.

Parameters
  • partitions (List[Partition]) – List of relevant partitions.

  • fn (callable) – Function to call, will get the partition as first and only argument.

  • all_nodes (bool) – If all_nodes is True, run the function on all nodes that have this partition, otherwise run on any node that has the partition. If a partition has no location, the function will not be run for that partition if all_nodes is True, otherwise it will be run on any node.

run_each_worker(fn, *args, **kwargs)[source]

Run fn on each worker process, and pass *args, **kwargs to it.

Useful, for example, if you need to prepare the environment of each Python interpreter, warm up per-process caches etc.

Parameters
  • fn (callable) – Function to call

  • *args – Arguments for fn

  • **kwargs – Keyword arguments for fn

Returns

Return values keyed by worker name (executor-specific)

Return type

dict

run_function(fn: Callable[[...], T], *args, **kwargs) T[source]

run a callable fn on any worker

run_tasks(tasks: Iterable[TaskProtocol], params_handle: Any, cancel_id: Any, task_comm_handler: TaskCommHandler)[source]

Run the tasks with the given parameters

Parameters
  • tasks – The tasks to be run

  • params_handle ([type]) – A handle for the task parameters, as returned from scatter()

  • cancel_id – An identifier which can be used for cancelling all tasks together. The same identifier should be passed to AsyncJobExecutor.cancel()

scatter(obj)[source]

Scatter obj throughout the cluster

Parameters

obj – Some kind of Python object or variable

Returns

Handle for the scattered obj

Return type

handle

Dask.Distributed

The DaskJobExecutor is the default executor when creating a Context with no parameters.

class libertem.executor.dask.AsyncDaskJobExecutor(wrapped=None, *args, **kwargs)[source]
class libertem.executor.dask.DaskJobExecutor(client: Client, is_local: bool = False, lt_resources: Optional[bool] = None)[source]

Default LiberTEM executor that uses Dask futures.

Parameters
  • client (distributed.Client) –

  • is_local (bool) – Close the Client and cluster when the executor is closed.

  • lt_resources (bool) – Specify if the cluster has LiberTEM resource tags and environment variables for GPU processing. Autodetected by default.

close()[source]

cleanup resources used by this executor, if any

classmethod connect(scheduler_uri, *args, client_kwargs: Optional[dict] = None, **kwargs)[source]

Connect to a remote dask scheduler.

Parameters
  • scheduler_uri (str) – Compatible with the address parameter of distributed.Client.

  • client_kwargs (dict or None) – Passed as kwargs to distributed.Client. client_kwargs['set_as_default'] is set to False unless specified otherwise to avoid interference with Dask-based workflows. Pass client_kwargs={'set_as_default': True} to set the Client as the default Dask scheduler and keep it running when the Context closes.

  • *args (Passed to DaskJobExecutor.) –

  • **kwargs (Passed to DaskJobExecutor.) –

Returns

the connected JobExecutor

Return type

DaskJobExecutor

classmethod make_local(spec: Optional[dict] = None, cluster_kwargs: Optional[dict] = None, client_kwargs: Optional[dict] = None, preload: Optional[Tuple[str]] = None)[source]

Spin up a local dask cluster

Parameters
Returns

the connected JobExecutor

Return type

DaskJobExecutor

map(fn, iterable)[source]

Run a callable fn for each element in iterable, on arbitrary worker nodes.

Parameters
  • fn (callable) – Function to call. Should accept exactly one parameter.

  • iterable (Iterable) – Which elements to call the function on.

run_each_host(fn, *args, **kwargs)[source]

Run a callable fn once on each host, gathering all results into a dict host -> result

run_each_partition(partitions, fn, all_nodes=False)[source]

Run fn for all partitions. Yields results in order of completion.

Parameters
  • partitions (List[Partition]) – List of relevant partitions.

  • fn (callable) – Function to call, will get the partition as first and only argument.

  • all_nodes (bool) – If all_nodes is True, run the function on all nodes that have this partition, otherwise run on any node that has the partition. If a partition has no location, the function will not be run for that partition if all_nodes is True, otherwise it will be run on any node.

run_each_worker(fn, *args, **kwargs)[source]

Run fn on each worker process, and pass *args, **kwargs to it.

Useful, for example, if you need to prepare the environment of each Python interpreter, warm up per-process caches etc.

Parameters
  • fn (callable) – Function to call

  • *args – Arguments for fn

  • **kwargs – Keyword arguments for fn

Returns

Return values keyed by worker name (executor-specific)

Return type

dict

run_function(fn, *args, **kwargs)[source]

run a callable fn on any worker

run_tasks(tasks: Iterable[TaskProtocol], params_handle: Any, cancel_id: Any, task_comm_handler: TaskCommHandler)[source]

Run the tasks with the given parameters

Parameters
  • tasks – The tasks to be run

  • params_handle ([type]) – A handle for the task parameters, as returned from scatter()

  • cancel_id – An identifier which can be used for cancelling all tasks together. The same identifier should be passed to AsyncJobExecutor.cancel()

scatter(obj)[source]

Scatter obj throughout the cluster

Parameters

obj – Some kind of Python object or variable

Returns

Handle for the scattered obj

Return type

handle

class libertem.executor.dask.DaskWorkerContext(comms_topic: Optional[str])[source]
libertem.executor.dask.cluster_spec(cpus: Union[int, Iterable[int]], cudas: Union[int, Iterable[int]], has_cupy: bool, name: str = 'default', num_service: int = 1, options: Optional[dict] = None, preload: Optional[Tuple[str]] = None)[source]

Create a worker specification dictionary for a LiberTEM Dask cluster

The return from this function can be passed to DaskJobExecutor.make_local(spec=spec).

This creates a Dask cluster spec with special initializations and resource tags for CPU + GPU processing in LiberTEM. See Customize CPUs and CUDA devices for an example. See http://distributed.dask.org/en/stable/api.html#distributed.SpecCluster for more info on cluster specs.

Parameters
  • cpus (int | Iterable[int]) – IDs for CPU workers as an iterable, or an integer number of workers to create. Currently no pinning is used, i.e. this specifies the total number and identification of workers, not the CPU cores that are used.

  • cudas (int | Iterable[int]) – IDs for CUDA device workers as an iterable, or an integer number of GPU workers to create. LiberTEM will use the IDs specified or assign round-robin to the available devices. In the iterable case these have to match CUDA device IDs on the system. Specify the same ID multiple times to spawn multiple workers on the same CUDA device.

  • has_cupy (bool) – Specify if the cluster should signal that it supports GPU-based array programming using CuPy

  • name – Prefix for the worker names

  • num_service – Number of additional workers that are reserved for service tasks. Computation tasks will not be scheduled on these workers, which guarantees responsive behavior for file browsing etc.

  • options – Options to pass through to every worker. See Dask documentation for details

  • preload – Items to preload on workers in addition to LiberTEM-internal preloads. This can be used to load libraries, for example HDF5 filter plugins before h5py is used. See https://docs.dask.org/en/stable/how-to/customize-initialization.html#preload-scripts for more information.

libertem.executor.integration.get_dask_integration_executor()[source]

Query the current Dask scheduler and return a JobExecutor that is compatible with it. See https://docs.dask.org/en/stable/scheduling.html for the meaning of the different scheduler types.

This can be used to integrate LiberTEM in an existing Dask workflow. This may not achieve optimal LiberTEM performance and will usually not allow GPU processing with LiberTEM, but avoids potential compatibility issues from changing or duplicating the Dask scheduler in an existing workflow.

New in version 0.9.0.

If a dask.distributed.Client is set as the scheduler, return a DaskJobExecutor using this Client.

If the Dask scheduler is 'threads', return a ConcurrentJobExecutor backed by the same thread pool as used by Dask.

If the Dask scheduler is 'synchronous', return an InlineJobExecutor which mimics the single-process, single-thread behaviour of Dask.

Inline

class libertem.executor.inline.InlineJobExecutor(debug=False, inline_threads=None, *args, **kwargs)[source]

Naive JobExecutor that just iterates over partitions and processes them one after another

Parameters
  • debug (bool) – Set this to enable additional serializability checks

  • inline_threads (Optional[int]) – How many fine grained threads should be allowed? Leaving this None will allow one thread per CPU core

get_available_workers()[source]

Returns a WorkerSet that contains the available workers

Each worker should correspond to a “worker process”, so if the executor is using multiple processes or threads, each process/thread should be included in this list.

map(fn, iterable)[source]

Run a callable fn for each element in iterable, on arbitrary worker nodes.

Parameters
  • fn (callable) – Function to call. Should accept exactly one parameter.

  • iterable (Iterable) – Which elements to call the function on.

run_each_host(fn, *args, **kwargs)[source]

Run a callable fn once on each host, gathering all results into a dict host -> result

Parameters
  • fn (callable) – Function to call

  • *args – Arguments for fn

  • **kwargs – Keyword arguments for fn

run_each_worker(fn, *args, **kwargs)[source]

Run fn on each worker process, and pass *args, **kwargs to it.

Useful, for example, if you need to prepare the environment of each Python interpreter, warm up per-process caches etc.

Parameters
  • fn (callable) – Function to call

  • *args – Arguments for fn

  • **kwargs – Keyword arguments for fn

Returns

Return values keyed by worker name (executor-specific)

Return type

dict

run_function(fn, *args, **kwargs)[source]

run a callable fn on any worker

run_tasks(tasks: Iterable[TaskProtocol], params_handle: Any, cancel_id: Any, task_comm_handler: TaskCommHandler)[source]

Run the tasks with the given parameters

Parameters
  • tasks – The tasks to be run

  • params_handle ([type]) – A handle for the task parameters, as returned from scatter()

  • cancel_id – An identifier which can be used for cancelling all tasks together. The same identifier should be passed to AsyncJobExecutor.cancel()

scatter(obj)[source]

Scatter obj throughout the cluster

Parameters

obj – Some kind of Python object or variable

Returns

Handle for the scattered obj

Return type

handle

class libertem.executor.inline.InlineWorkerContext(queue: SimpleWorkerQueue, msg_queue: SimpleWorkerQueue)[source]

Concurrent

class libertem.executor.concurrent.AsyncConcurrentJobExecutor(wrapped=None, *args, **kwargs)[source]
class libertem.executor.concurrent.ConcurrentJobExecutor(client: Executor, is_local=False)[source]

JobExecutor that uses python.concurrent.futures.

New in version 0.9.0.

Parameters
close()[source]

cleanup resources used by this executor, if any

get_available_workers()[source]

Returns a WorkerSet that contains the available workers

Each worker should correspond to a “worker process”, so if the executor is using multiple processes or threads, each process/thread should be included in this list.

classmethod make_local()[source]

Create a local ConcurrentJobExecutor backed by a concurrent.futures.ThreadPoolExecutor

Returns

the connected JobExecutor

Return type

ConcurrentJobExecutor

map(fn, iterable)[source]

Run a callable fn for each element in iterable, on arbitrary worker nodes.

Parameters
  • fn (callable) – Function to call. Should accept exactly one parameter.

  • iterable (Iterable) – Which elements to call the function on.

run_each_host(fn, *args, **kwargs)[source]

Run a callable fn once on each host, gathering all results into a dict host -> result

run_each_worker(fn, *args, **kwargs)[source]

Run fn on each worker process, and pass *args, **kwargs to it.

Useful, for example, if you need to prepare the environment of each Python interpreter, warm up per-process caches etc.

Parameters
  • fn (callable) – Function to call

  • *args – Arguments for fn

  • **kwargs – Keyword arguments for fn

Returns

Return values keyed by worker name (executor-specific)

Return type

dict

run_function(fn, *args, **kwargs)[source]

run a callable fn on any worker

run_tasks(tasks: Iterable[TaskProtocol], params_handle: Any, cancel_id: Any, task_comm_handler: TaskCommHandler)[source]

Run the tasks with the given parameters

Parameters
  • tasks – The tasks to be run

  • params_handle ([type]) – A handle for the task parameters, as returned from scatter()

  • cancel_id – An identifier which can be used for cancelling all tasks together. The same identifier should be passed to AsyncJobExecutor.cancel()

scatter(obj)[source]

Scatter obj throughout the cluster

Parameters

obj – Some kind of Python object or variable

Returns

Handle for the scattered obj

Return type

handle

class libertem.executor.concurrent.ConcurrentWorkerContext(msg_queue: WorkerQueue)[source]

Dask.delayed

See Create Dask arrays with UDFs for more information about DelayedJobExecutor.

class libertem.executor.delayed.DelayedJobExecutor[source]

JobExecutor that uses dask.delayed to execute tasks.

New in version 0.9.0.

Highly experimental at this time!

get_available_workers()[source]

Returns a WorkerSet that contains the available workers

Each worker should correspond to a “worker process”, so if the executor is using multiple processes or threads, each process/thread should be included in this list.

static get_resources_from_udfs(udfs, user_backends=None)[source]

Returns the resources required by the udfs passed as argument, excluding those not in the tuple user_backends

map(fn, iterable)[source]

Run a callable fn for each element in iterable, on arbitrary worker nodes.

Parameters
  • fn (callable) – Function to call. Should accept exactly one parameter.

  • iterable (Iterable) – Which elements to call the function on.

run_each_host(fn, *args, **kwargs)[source]

Run a callable fn once on each host, gathering all results into a dict host -> result

Parameters
  • fn (callable) – Function to call

  • *args – Arguments for fn

  • **kwargs – Keyword arguments for fn

run_each_worker(fn, *args, **kwargs)[source]

Run fn on each worker process, and pass *args, **kwargs to it.

Useful, for example, if you need to prepare the environment of each Python interpreter, warm up per-process caches etc.

Parameters
  • fn (callable) – Function to call

  • *args – Arguments for fn

  • **kwargs – Keyword arguments for fn

Returns

Return values keyed by worker name (executor-specific)

Return type

dict

run_function(fn, *args, **kwargs)[source]

run a callable fn on any worker

run_tasks(tasks: Iterable[TaskProtocol], params_handle: Any, cancel_id: Any, task_comm_handler: TaskCommHandler)[source]

Wraps the call task() such that it returns a flat list of results, then unpacks the Delayed return value into the normal

tuple(udf.results for udf in self._udfs)

where the buffers inside udf.results are dask arrays instead of normal np.arrays

Needs a reference to the udfs on the master node so that the results structure can be inferred. This reference is found in self._udfs, which is set with the method:

executor.register_master_udfs(udfs)

called from DelayedUDFRunner.results_for_dataset_sync()

scatter(obj)[source]

Scatter obj throughout the cluster

Parameters

obj – Some kind of Python object or variable

Returns

Handle for the scattered obj

Return type

handle

Pipelined executor

class libertem.executor.pipelined.PipelinedExecutor(spec: Optional[List[WorkerSpec]] = None, pin_workers: bool = True, startup_timeout: float = 30.0, cleanup_timeout: float = 10.0, early_setup: Optional[Callable] = None)[source]

Multi-process pipelined executor. Useful for live processing using LiberTEM-live if your processing function is not able to keep up with the incoming data stream in a single process, but also works for offline processing.

Parameters
  • spec – Specification for the worker processes - can be generated by make_spec().

  • pin_workers – Pin each CPU worker to a specific CPU, as defined by os.sched_setaffinity(). Only works on OSes that implement os.sched_setaffinity().

  • startup_timeout – Startup of the executor is cancelled if it doesn’t finish within this limit (in detail: each worker’s startup time is limited by this timeout). In seconds.

  • cleanup_timeout – When cleaning up using close(), give up after this limit. In seconds.

  • early_setup – Callable that will be run as early as possible on each worker process. Useful for custom warmup code or testing.

Note

This executor is not thread-safe - concurrent calls into run_tasks() or run_function() are not supported.

close()[source]

cleanup resources used by this executor, if any

get_available_workers() WorkerSet[source]

Returns a WorkerSet that contains the available workers

Each worker should correspond to a “worker process”, so if the executor is using multiple processes or threads, each process/thread should be included in this list.

classmethod make_local(**kwargs)[source]

Create a PipelinedExecutor with the default spec.

map(fn, iterable)[source]

Run a callable fn for each element in iterable, on arbitrary worker nodes.

Parameters
  • fn (callable) – Function to call. Should accept exactly one parameter.

  • iterable (Iterable) – Which elements to call the function on.

run_each_host(fn, *args, **kwargs)[source]

Run a callable fn once on each host, gathering all results into a dict host -> result

Parameters
  • fn (callable) – Function to call

  • *args – Arguments for fn

  • **kwargs – Keyword arguments for fn

run_each_worker(fn, *args, **kwargs)[source]

Run fn on each worker process, and pass *args, **kwargs to it.

Useful, for example, if you need to prepare the environment of each Python interpreter, warm up per-process caches etc.

Parameters
  • fn (callable) – Function to call

  • *args – Arguments for fn

  • **kwargs – Keyword arguments for fn

Returns

Return values keyed by worker name (executor-specific)

Return type

dict

run_function(fn: Callable[[...], T], *args, **kwargs) T[source]

run a callable fn on any worker

run_tasks(tasks: Iterable[TaskProtocol], params_handle: Any, cancel_id: Any, task_comm_handler: TaskCommHandler) Generator[Tuple[Any, TaskProtocol], None, None][source]

Run the tasks with the given parameters

Parameters
  • tasks – The tasks to be run

  • params_handle ([type]) – A handle for the task parameters, as returned from scatter()

  • cancel_id – An identifier which can be used for cancelling all tasks together. The same identifier should be passed to AsyncJobExecutor.cancel()

scatter(obj)[source]

Scatter obj throughout the cluster

Parameters

obj – Some kind of Python object or variable

Returns

Handle for the scattered obj

Return type

handle

class libertem.executor.pipelined.PipelinedWorkerContext(queue: WorkerQueue, msg_queue: WorkerQueue)[source]

A context object that is made available to the Partition for custom communication to its matching DataSet class (currently uni-directional)

class libertem.executor.pipelined.PoolWorkerInfo(queues, process, spec)[source]
process: Process

Alias for field number 1

queues: WorkerQueues

Alias for field number 0

spec: WorkerSpec

Alias for field number 2

class libertem.executor.pipelined.WorkerPool(worker_fn: Callable, spec: List[WorkerSpec])[source]

Combination of worker processes and matching request queues, and a single response queue.

Processes are started using the spawn method, so they need to use primitives that are compatible with spawn.

Take care to properly close queues and join workers at shutdown.

Note

We are not using the vanilla multiprocesing.Pool here, because we need to coordinate the sending and receiving side for each task, and we need to keep state on the worker, pin processes to cores etc.

class libertem.executor.pipelined.WorkerQueues(request, response, message)[source]
message: WorkerQueue

Alias for field number 2

request: WorkerQueue

Alias for field number 0

response: WorkerQueue

Alias for field number 1

class libertem.executor.pipelined.WorkerSpec(_typename, _fields=None, /, **kwargs)[source]
libertem.executor.pipelined.pipelined_worker(queues: WorkerQueues, pin: bool, spec: WorkerSpec, span_context: SpanContext, early_setup: Optional[Callable] = None)[source]

Main pipelined worker function.

Parameters
  • queues – request and response queues

  • pin – Whether or not the CPU worker should be pinned to a specific CPU

  • spec – This worker’s spec, containing name, worker index, device kind etc.

  • span_context – The tracing span we should attach to for the setup code

  • early_setup – Function that will be called very early in the setup code, allowing to inject custom functionality or specific warmup code

libertem.executor.pipelined.set_thread_name(name: str)[source]

Set a thread name; mostly useful for using system tools for profiling

Parameters

name (str) – The thread name

libertem.executor.pipelined.worker_loop(queues: WorkerQueues, work_mem: Dict, worker_idx: int, env: Environment)[source]

The worker main loop, called when the worker setup is done. Waits for messages on the request queue until a SHUTDOWN message is received.

Parameters
  • work_mem – The worker’s working memory, for accessing scattered data

  • queues – request and response queues

  • worker_idx – This worker’s index

  • env – The Environment for preparing thread counts etc. for the UDF run

libertem.executor.pipelined.worker_run_function(header, queues, worker_idx)[source]

Called from the worker main loop when a RUN_FUNCTION message is received

Parameters
  • header – The header of the message that was received

  • queues – request and response queues

  • worker_idx – This worker’s index

libertem.executor.pipelined.worker_run_task(header: Dict, work_mem: Dict, queues: WorkerQueues, worker_idx: int, env: Environment)[source]

Called from the worker main loop when a RUN_TASK message is received

Parameters
  • header – The header of the message that was received

  • work_mem – The worker’s working memory, for accessing scattered data

  • queues – request and response queues

  • worker_idx – This worker’s index

  • env – The Environment for preparing thread counts etc. for the UDF run