LiberTEM executors
All access to data and processing is done by an executor that implements the JobExecutor interface to run functions and tasks. The executor specifies where and how processing is done, including running on a cluster or in a single thread, while being independent of other parts of LiberTEM. See Executors for an overview from a user's perspective.
New in version 0.9.0: The executor API is internal. Since choice and parameters of executors are important for integration with Dask and other frameworks, they are now documented. Only the names and creation methods for executors are reasonably stable. The rest of the API is subject to change without notice.
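Since all executors implement the same interface, the executor is chosen once when creating a Context and the analysis code stays unchanged. A minimal sketch using the single-threaded InlineJobExecutor documented below:

    import libertem.api as lt
    from libertem.executor.inline import InlineJobExecutor

    # any JobExecutor implementation can be passed here:
    ctx = lt.Context(executor=InlineJobExecutor())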
Base classes
- class libertem.common.executor.AsyncJobExecutor[source]
Async version of JobExecutor.
- async map(fn, iterable)[source]
Run a callable fn for each item in iterable, on arbitrary worker nodes.
- Parameters:
fn (callable) – Function to call. Should accept exactly one parameter.
iterable (Iterable) – Which elements to call the function on.
- async run_each_worker(fn, *args, **kwargs)[source]
Run fn on each worker process, and pass *args, **kwargs to it.
Useful, for example, if you need to prepare the environment of each Python interpreter, warm up per-process caches etc.
- Parameters:
fn (callable) – Function to call
*args – Arguments for fn
**kwargs – Keyword arguments for fn
- class libertem.common.executor.JobExecutor[source]
Interface to execute functions on workers.
- ensure_async(pool=None)[source]
Returns an asynchronous executor; by default, this just wraps self into an AsyncAdapter.
- ensure_sync()[source]
Returns a synchronous executor. In the case of a JobExecutor, we just return self; in the case of an AsyncJobExecutor, more work is needed.
- get_available_workers() WorkerSet [source]
Returns a WorkerSet that contains the available workers
Each worker should correspond to a “worker process”, so if the executor is using multiple processes or threads, each process/thread should be included in this list.
- get_resource_details() list[dict[str, Any]] [source]
Returns a list of dicts with cluster details
- Keys of the dictionary:
host: IP address or hostname where the worker is running
- map(fn: Callable[[V], T], iterable: Iterable[V]) Iterable[T] [source]
Run a callable fn for each element in iterable, on arbitrary worker nodes.
- Parameters:
fn (callable) – Function to call. Should accept exactly one parameter.
iterable (Iterable) – Which elements to call the function on.
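A minimal sketch of map(), using the single-threaded InlineJobExecutor so it runs without a cluster:

    from libertem.executor.inline import InlineJobExecutor

    executor = InlineJobExecutor()
    # apply the function to each element; for this executor,
    # results come back in input order
    squares = list(executor.map(lambda x: x * x, range(5)))
    assert squares == [0, 1, 4, 9, 16]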
- modify_buffer_type(buf)[source]
Allow the executor to modify result buffers if necessary
Currently only called for buffers on the main node
- run_each_host(fn, *args, **kwargs)[source]
Run a callable fn once on each host, gathering all results into a dict host -> result.
- Parameters:
fn (callable) – Function to call
*args – Arguments for fn
**kwargs – Keyword arguments for fn
- run_each_partition(partitions, fn, all_nodes=False)[source]
Run fn for all partitions. Yields results in order of completion.
- Parameters:
partitions (List[Partition]) – List of relevant partitions.
fn (callable) – Function to call, will get the partition as first and only argument.
all_nodes (bool) – If all_nodes is True, run the function on all nodes that have this partition, otherwise run on any node that has the partition. If a partition has no location, the function will not be run for that partition if all_nodes is True, otherwise it will be run on any node.
- run_each_worker(fn, *args, **kwargs)[source]
Run fn on each worker process, and pass *args, **kwargs to it.
Useful, for example, if you need to prepare the environment of each Python interpreter, warm up per-process caches etc.
- Parameters:
fn (callable) – Function to call
*args – Arguments for fn
**kwargs – Keyword arguments for fn
- Returns:
Return values keyed by worker name (executor-specific)
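A sketch of a typical use; executor stands for any JobExecutor instance, and the warmup function is hypothetical:

    def warm_up(module_name):
        # import heavy libraries once per worker process,
        # so later tasks don't pay the import cost
        __import__(module_name)

    results = executor.run_each_worker(warm_up, 'numpy')
    # results: return values keyed by worker name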
- run_tasks(tasks: Iterable[TaskProtocol], params_handle: Any, cancel_id: Any, task_comm_handler: TaskCommHandler)[source]
Run the tasks with the given parameters.
- Raises:
JobCancelledError – Either the job was cancelled using AsyncJobExecutor.cancel(), or the underlying data source was interrupted.
- Parameters:
tasks – The tasks to be run
params_handle – A handle for the task parameters, as returned from scatter()
cancel_id – An identifier which can be used for cancelling all tasks together. The same identifier should be passed to AsyncJobExecutor.cancel()
- scatter(obj)[source]
Scatter obj throughout the cluster.
- Parameters:
obj – Some kind of Python object. Must be serializable.
- Returns:
Handle for the scattered obj
- Return type:
handle
- scatter_update(handle, obj)[source]
Update handle to point to obj.
Must have been scattered before using scatter().
- Parameters:
handle – The handle, as returned from scatter().
obj – Some kind of Python object. Must be serializable.
- scatter_update_patch(handle, patch)[source]
Update handle by remotely calling obj.patch(patch) on the underlying object.
The referenced object must have a patch method, and must have been scattered before using scatter().
- Parameters:
handle – The handle, as returned from scatter().
patch – Some kind of Python object. Must be serializable and must match the obj.patch() method.
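A minimal sketch of the patch protocol from the side of the scattered object; the Params class and its fields are hypothetical:

    class Params:
        # hypothetical object usable with scatter() and scatter_update_patch()
        def __init__(self, values: dict):
            self.values = values

        def patch(self, patch: dict):
            # called remotely by scatter_update_patch(); merge the partial
            # update into the existing values
            self.values.update(patch)

Sending only the small patch object avoids re-serializing the full object on every update.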
Dask.Distributed
The DaskJobExecutor is the default executor when creating a Context with no parameters.
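In other words, the following two ways of creating a Context are roughly equivalent (sketch):

    import libertem.api as lt
    from libertem.executor.dask import DaskJobExecutor

    # default: spins up a local Dask cluster behind the scenes
    ctx = lt.Context()

    # spelled out explicitly:
    ctx = lt.Context(executor=DaskJobExecutor.make_local())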
- class libertem.executor.dask.DaskJobExecutor(client: Client, is_local: bool = False, lt_resources: bool = None)[source]
Default LiberTEM executor that uses Dask futures.
- classmethod connect(scheduler_uri, *args, client_kwargs: dict | None = None, **kwargs)[source]
Connect to a remote Dask scheduler.
- Parameters:
scheduler_uri (str) – Compatible with the address parameter of distributed.Client.
client_kwargs (dict or None) – Passed as kwargs to distributed.Client. client_kwargs['set_as_default'] is set to False unless specified otherwise, to avoid interference with Dask-based workflows. Pass client_kwargs={'set_as_default': True} to set the Client as the default Dask scheduler and keep it running when the Context closes.
*args – Passed to DaskJobExecutor.
**kwargs – Passed to DaskJobExecutor.
- Returns:
the connected JobExecutor
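A sketch of connecting to an already-running scheduler; the URI is a placeholder for your deployment:

    import libertem.api as lt
    from libertem.executor.dask import DaskJobExecutor

    executor = DaskJobExecutor.connect('tcp://localhost:8786')
    ctx = lt.Context(executor=executor)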
- classmethod make_local(spec: dict | None = None, cluster_kwargs: dict | None = None, client_kwargs: dict | None = None, preload: tuple[str] | None = None)[source]
Spin up a local Dask cluster.
- Parameters:
spec – Dask cluster spec, see https://distributed.dask.org/en/stable/api.html#distributed.SpecCluster for more info. libertem.utils.devices.detect() allows detecting devices that can be used with LiberTEM, and cluster_spec() can be used to create a spec with customized parameters.
cluster_kwargs – Passed to distributed.SpecCluster.
client_kwargs – Passed to distributed.Client. Pass client_kwargs={'set_as_default': True} to set the Client as the default Dask scheduler.
preload (Optional[Tuple[str]]) – Passed to cluster_spec() if spec is None.
- Returns:
the connected JobExecutor
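A sketch combining detect() and cluster_spec() to build a spec matching the local hardware:

    import libertem.api as lt
    from libertem.utils.devices import detect
    from libertem.executor.dask import DaskJobExecutor, cluster_spec

    # detect() returns a dict with keys matching cluster_spec() parameters,
    # i.e. the available CPUs, CUDA devices and CuPy support
    spec = cluster_spec(**detect())
    executor = DaskJobExecutor.make_local(spec=spec)
    ctx = lt.Context(executor=executor)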
- map(fn, iterable)[source]
Run a callable fn for each element in iterable, on arbitrary worker nodes.
- Parameters:
fn (callable) – Function to call. Should accept exactly one parameter.
iterable (Iterable) – Which elements to call the function on.
- run_each_host(fn, *args, **kwargs)[source]
Run a callable fn once on each host, gathering all results into a dict host -> result.
- run_each_partition(partitions, fn, all_nodes=False)[source]
Run fn for all partitions. Yields results in order of completion.
- Parameters:
partitions (List[Partition]) – List of relevant partitions.
fn (callable) – Function to call, will get the partition as first and only argument.
all_nodes (bool) – If all_nodes is True, run the function on all nodes that have this partition, otherwise run on any node that has the partition. If a partition has no location, the function will not be run for that partition if all_nodes is True, otherwise it will be run on any node.
- run_each_worker(fn, *args, **kwargs)[source]
Run fn on each worker process, and pass *args, **kwargs to it.
Useful, for example, if you need to prepare the environment of each Python interpreter, warm up per-process caches etc.
- Parameters:
fn (callable) – Function to call
*args – Arguments for fn
**kwargs – Keyword arguments for fn
- Returns:
Return values keyed by worker name (executor-specific)
- run_tasks(tasks: Iterable[TaskProtocol], params_handle: Any, cancel_id: Any, task_comm_handler: TaskCommHandler)[source]
Run the tasks with the given parameters.
- Raises:
JobCancelledError – Either the job was cancelled using AsyncJobExecutor.cancel(), or the underlying data source was interrupted.
- scatter(obj)[source]
Scatter obj throughout the cluster.
- Parameters:
obj – Some kind of Python object. Must be serializable.
- Returns:
Handle for the scattered obj
- Return type:
handle
- scatter_update(handle, obj)[source]
Update handle to point to obj.
Must have been scattered before using scatter().
- Parameters:
handle – The handle, as returned from scatter().
obj – Some kind of Python object. Must be serializable.
- scatter_update_patch(handle, patch)[source]
Update handle by remotely calling obj.patch(patch) on the underlying object.
The referenced object must have a patch method, and must have been scattered before using scatter().
- Parameters:
handle – The handle, as returned from scatter().
patch – Some kind of Python object. Must be serializable and must match the obj.patch() method.
- libertem.executor.dask.cluster_spec(cpus: int | Iterable[int], cudas: int | Iterable[int], has_cupy: bool, name: str = 'default', num_service: int = 1, options: dict | None = None, preload: tuple[str, ...] | None = None)[source]
Create a worker specification dictionary for a LiberTEM Dask cluster
The return from this function can be passed to DaskJobExecutor.make_local(spec=spec).
This creates a Dask cluster spec with special initializations and resource tags for CPU + GPU processing in LiberTEM. See Specifying executor type, CPU and GPU workers for an example. See https://distributed.dask.org/en/stable/api.html#distributed.SpecCluster for more info on cluster specs.
- Parameters:
cpus (int | Iterable[int]) – IDs for CPU workers as an iterable, or an integer number of workers to create. Currently no pinning is used, i.e. this specifies the total number and identification of workers, not the CPU cores that are used.
cudas (int | Iterable[int]) – IDs for CUDA device workers as an iterable, or an integer number of GPU workers to create. LiberTEM will use the IDs specified or assign round-robin to the available devices. In the iterable case these have to match CUDA device IDs on the system. Specify the same ID multiple times to spawn multiple workers on the same CUDA device.
has_cupy (bool) – Specify if the cluster should signal that it supports GPU-based array programming using CuPy
name – Prefix for the worker names
num_service – Number of additional workers that are reserved for service tasks. Computation tasks will not be scheduled on these workers, which guarantees responsive behavior for file browsing etc.
options – Options to pass through to every worker. See Dask documentation for details
preload – Items to preload on workers in addition to LiberTEM-internal preloads. This can be used to load libraries, for example HDF5 filter plugins before h5py is used. See https://docs.dask.org/en/stable/how-to/customize-initialization.html#preload-scripts for more information.
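A sketch with explicit worker IDs: four CPU workers plus two workers sharing CUDA device 0:

    from libertem.executor.dask import DaskJobExecutor, cluster_spec

    spec = cluster_spec(cpus=4, cudas=[0, 0], has_cupy=True)
    executor = DaskJobExecutor.make_local(spec=spec)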
- libertem.executor.dask.set_worker_log_level(level: str | int, force: bool = False)[source]
Set the dask.distributed log level for any processes spawned within the context manager. If force is False, don’t overwrite any existing environment variable.
- libertem.executor.integration.get_dask_integration_executor()[source]
Query the current Dask scheduler and return a JobExecutor that is compatible with it. See https://docs.dask.org/en/stable/scheduling.html for the meaning of the different scheduler types.
This can be used to integrate LiberTEM in an existing Dask workflow. This may not achieve optimal LiberTEM performance and will usually not allow GPU processing with LiberTEM, but avoids potential compatibility issues from changing or duplicating the Dask scheduler in an existing workflow.
New in version 0.9.0.
If a dask.distributed.Client is set as the scheduler, return a DaskJobExecutor using this Client.
If the Dask scheduler is 'threads', return a ConcurrentJobExecutor backed by the same thread pool as used by Dask.
If the Dask scheduler is 'synchronous', return an InlineJobExecutor which mimics the single-process, single-thread behaviour of Dask.
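A sketch of using this inside an existing Dask workflow; the 'threads' scheduler is chosen here for illustration:

    import dask
    import libertem.api as lt
    from libertem.executor.integration import get_dask_integration_executor

    dask.config.set(scheduler='threads')
    executor = get_dask_integration_executor()
    ctx = lt.Context(executor=executor)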
Inline
- class libertem.executor.inline.InlineJobExecutor(debug=False, inline_threads=None, *args, **kwargs)[source]
Naive JobExecutor that just iterates over partitions and processes them one after another
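This executor runs everything in the current process, so breakpoints and exceptions behave normally, which makes it the usual choice for debugging. A minimal sketch, assuming a LiberTEM version where Context.make_with() accepts the 'inline' spec:

    import libertem.api as lt

    ctx = lt.Context.make_with('inline')
    # equivalent to: lt.Context(executor=InlineJobExecutor())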
- get_available_workers()[source]
Returns a WorkerSet that contains the available workers
Each worker should correspond to a “worker process”, so if the executor is using multiple processes or threads, each process/thread should be included in this list.
- map(fn, iterable)[source]
Run a callable fn for each element in iterable, on arbitrary worker nodes.
- Parameters:
fn (callable) – Function to call. Should accept exactly one parameter.
iterable (Iterable) – Which elements to call the function on.
- run_each_host(fn, *args, **kwargs)[source]
Run a callable fn once on each host, gathering all results into a dict host -> result.
- Parameters:
fn (callable) – Function to call
*args – Arguments for fn
**kwargs – Keyword arguments for fn
- run_each_worker(fn, *args, **kwargs)[source]
Run fn on each worker process, and pass *args, **kwargs to it.
Useful, for example, if you need to prepare the environment of each Python interpreter, warm up per-process caches etc.
- Parameters:
fn (callable) – Function to call
*args – Arguments for fn
**kwargs – Keyword arguments for fn
- Returns:
Return values keyed by worker name (executor-specific)
- run_tasks(tasks: Iterable[TaskProtocol], params_handle: Any, cancel_id: Any, task_comm_handler: TaskCommHandler)[source]
Run the tasks with the given parameters.
- Raises:
JobCancelledError – Either the job was cancelled using AsyncJobExecutor.cancel(), or the underlying data source was interrupted.
- scatter(obj)[source]
Scatter obj throughout the cluster.
- Parameters:
obj – Some kind of Python object. Must be serializable.
- Returns:
Handle for the scattered obj
- Return type:
handle
- scatter_update(handle, obj)[source]
Update handle to point to obj.
Must have been scattered before using scatter().
- Parameters:
handle – The handle, as returned from scatter().
obj – Some kind of Python object. Must be serializable.
- scatter_update_patch(handle, patch)[source]
Update handle by remotely calling obj.patch(patch) on the underlying object.
The referenced object must have a patch method, and must have been scattered before using scatter().
- Parameters:
handle – The handle, as returned from scatter().
patch – Some kind of Python object. Must be serializable and must match the obj.patch() method.
Concurrent
- class libertem.executor.concurrent.AsyncConcurrentJobExecutor(wrapped=None, *args, **kwargs)[source]
- class libertem.executor.concurrent.ConcurrentJobExecutor(client: Executor, is_local=False)[source]
JobExecutor that uses Python's concurrent.futures.
New in version 0.9.0.
- Parameters:
client (concurrent.futures.Executor) –
is_local (bool) – Shut the client down when the executor closes.
- get_available_workers()[source]
Returns a WorkerSet that contains the available workers
Each worker should correspond to a “worker process”, so if the executor is using multiple processes or threads, each process/thread should be included in this list.
- classmethod make_local(n_threads: int | None = None)[source]
Create a local ConcurrentJobExecutor backed by a concurrent.futures.ThreadPoolExecutor.
- Parameters:
n_threads (Optional[int]) – The number of threads to spawn in the executor, by default None in which case as many threads as there are CPU cores will be spawned.
- Returns:
the connected JobExecutor
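A minimal sketch:

    import libertem.api as lt
    from libertem.executor.concurrent import ConcurrentJobExecutor

    executor = ConcurrentJobExecutor.make_local(n_threads=4)
    ctx = lt.Context(executor=executor)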
- map(fn, iterable)[source]
Run a callable fn for each element in iterable, on arbitrary worker nodes.
- Parameters:
fn (callable) – Function to call. Should accept exactly one parameter.
iterable (Iterable) – Which elements to call the function on.
- run_each_host(fn, *args, **kwargs)[source]
Run a callable fn once on each host, gathering all results into a dict host -> result.
- run_each_worker(fn, *args, **kwargs)[source]
Run fn on each worker process, and pass *args, **kwargs to it.
Useful, for example, if you need to prepare the environment of each Python interpreter, warm up per-process caches etc.
- Parameters:
fn (callable) – Function to call
*args – Arguments for fn
**kwargs – Keyword arguments for fn
- Returns:
Return values keyed by worker name (executor-specific)
- run_tasks(tasks: Iterable[TaskProtocol], params_handle: Any, cancel_id: Any, task_comm_handler: TaskCommHandler)[source]
Run the tasks with the given parameters.
- Raises:
JobCancelledError – Either the job was cancelled using AsyncJobExecutor.cancel(), or the underlying data source was interrupted.
- scatter(obj)[source]
Scatter obj throughout the cluster.
- Parameters:
obj – Some kind of Python object. Must be serializable.
- Returns:
Handle for the scattered obj
- Return type:
handle
- scatter_update(handle, obj)[source]
Update handle to point to obj.
Must have been scattered before using scatter().
- Parameters:
handle – The handle, as returned from scatter().
obj – Some kind of Python object. Must be serializable.
- scatter_update_patch(handle, patch)[source]
Update handle by remotely calling obj.patch(patch) on the underlying object.
The referenced object must have a patch method, and must have been scattered before using scatter().
- Parameters:
handle – The handle, as returned from scatter().
patch – Some kind of Python object. Must be serializable and must match the obj.patch() method.
Dask.delayed
See Create Dask arrays with UDFs for more information about DelayedJobExecutor.
- class libertem.executor.delayed.DelayedJobExecutor[source]
JobExecutor that uses dask.delayed to execute tasks.
New in version 0.9.0.
Highly experimental at this time!
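A sketch; dataset and udf stand in for your own objects, and the buffer name 'intensity' is hypothetical. Result buffers are backed by dask arrays, so computation only happens on an explicit compute():

    import libertem.api as lt
    from libertem.executor.delayed import DelayedJobExecutor

    ctx = lt.Context(executor=DelayedJobExecutor())
    res = ctx.run_udf(dataset=dataset, udf=udf)
    # res['intensity'].raw_data is a dask array; trigger computation:
    res['intensity'].raw_data.compute()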
- get_available_workers()[source]
Returns a WorkerSet that contains the available workers
Each worker should correspond to a “worker process”, so if the executor is using multiple processes or threads, each process/thread should be included in this list.
- static get_resources_from_udfs(udfs, user_backends=None)[source]
Returns the resources required by the udfs passed as argument, excluding those not in the tuple user_backends
- map(fn, iterable)[source]
Run a callable fn for each element in iterable, on arbitrary worker nodes.
- Parameters:
fn (callable) – Function to call. Should accept exactly one parameter.
iterable (Iterable) – Which elements to call the function on.
- run_each_host(fn, *args, **kwargs)[source]
Run a callable fn once on each host, gathering all results into a dict host -> result.
- Parameters:
fn (callable) – Function to call
*args – Arguments for fn
**kwargs – Keyword arguments for fn
- run_each_worker(fn, *args, **kwargs)[source]
Run fn on each worker process, and pass *args, **kwargs to it.
Useful, for example, if you need to prepare the environment of each Python interpreter, warm up per-process caches etc.
- Parameters:
fn (callable) – Function to call
*args – Arguments for fn
**kwargs – Keyword arguments for fn
- Returns:
Return values keyed by worker name (executor-specific)
- run_tasks(tasks: Iterable[TaskProtocol], params_handle: Any, cancel_id: Any, task_comm_handler: TaskCommHandler)[source]
Wraps each call to task() such that it returns a flat list of results, then unpacks the Delayed return value into the normal tuple(udf.results for udf in self._udfs), where the buffers inside udf.results are dask arrays instead of normal np.arrays.
Needs a reference to the UDFs on the master node so that the results structure can be inferred. This reference is found in self._udfs, which is set via executor.register_master_udfs(udfs), called from DelayedUDFRunner.results_for_dataset_sync().
Pipelined executor
- class libertem.executor.pipelined.PipelinedExecutor(spec: list[WorkerSpec] | None = None, pin_workers: bool = True, startup_timeout: float = 30.0, cleanup_timeout: float = 10.0, early_setup: Callable | None = None)[source]
Multi-process pipelined executor. Useful for live processing using LiberTEM-live if your processing function is not able to keep up with the incoming data stream in a single process, but also works for offline processing.
- Parameters:
spec – Specification for the worker processes; can be generated by make_spec().
pin_workers – Pin each CPU worker to a specific CPU, as defined by os.sched_setaffinity(). Only works on OSes that implement os.sched_setaffinity().
startup_timeout – Startup of the executor is cancelled if it doesn't finish within this limit (in detail: each worker's startup time is limited by this timeout). In seconds.
cleanup_timeout – When cleaning up using close(), give up after this limit. In seconds.
early_setup – Callable that will be run as early as possible on each worker process. Useful for custom warmup code or testing.
Note
This executor is not thread-safe: concurrent calls into run_tasks() or run_function() are not supported.
- get_available_workers() WorkerSet [source]
Returns a WorkerSet that contains the available workers
Each worker should correspond to a “worker process”, so if the executor is using multiple processes or threads, each process/thread should be included in this list.
- map(fn, iterable)[source]
Run a callable fn for each element in iterable, on arbitrary worker nodes.
- Parameters:
fn (callable) – Function to call. Should accept exactly one parameter.
iterable (Iterable) – Which elements to call the function on.
- run_each_host(fn, *args, **kwargs)[source]
Run a callable fn once on each host, gathering all results into a dict host -> result.
- Parameters:
fn (callable) – Function to call
*args – Arguments for fn
**kwargs – Keyword arguments for fn
- run_each_worker(fn, *args, **kwargs)[source]
Run fn on each worker process, and pass *args, **kwargs to it.
Useful, for example, if you need to prepare the environment of each Python interpreter, warm up per-process caches etc.
- Parameters:
fn (callable) – Function to call
*args – Arguments for fn
**kwargs – Keyword arguments for fn
- Returns:
Return values keyed by worker name (executor-specific)
- run_tasks(tasks: Iterable[TaskProtocol], params_handle: Any, cancel_id: Any, task_comm_handler: TaskCommHandler) Generator[tuple[Any, TaskProtocol], None, None] [source]
Run the tasks with the given parameters.
- Raises:
JobCancelledError – Either the job was cancelled using AsyncJobExecutor.cancel(), or the underlying data source was interrupted.
- scatter(obj)[source]
Scatter obj throughout the cluster.
- Parameters:
obj – Some kind of Python object. Must be serializable.
- Returns:
Handle for the scattered obj
- Return type:
handle
- scatter_update(handle: str, obj)[source]
Update handle to point to obj.
Must have been scattered before using scatter().
- Parameters:
handle – The handle, as returned from scatter().
obj – Some kind of Python object. Must be serializable.
- scatter_update_patch(handle: str, patch)[source]
Update handle by remotely calling obj.patch(patch) on the underlying object.
The referenced object must have a patch method, and must have been scattered before using scatter().
- Parameters:
handle – The handle, as returned from scatter().
patch – Some kind of Python object. Must be serializable and must match the obj.patch() method.
- class libertem.executor.pipelined.PipelinedWorkerContext(queue: WorkerQueue, msg_queue: WorkerQueue)[source]
A context object that is made available to the Partition for custom communication to its matching DataSet class (currently uni-directional)
- exception libertem.executor.pipelined.PoolStateError[source]
The worker pool is not in the expected state to perform the requested operation
- namedtuple libertem.executor.pipelined.PoolWorkerInfo(queues, process, spec)[source]
PoolWorkerInfo(queues, process, spec)
- Fields:
queues (WorkerQueues) – Alias for field number 0
process (Process) – Alias for field number 1
spec (WorkerSpec) – Alias for field number 2
- class libertem.executor.pipelined.WorkerPool(worker_fn: Callable, spec: list[WorkerSpec])[source]
Combination of worker processes and matching request queues, and a single response queue.
Processes are started using the spawn method, so they need to use primitives that are compatible with spawn.
Take care to properly close queues and join workers at shutdown.
Note
We are not using the vanilla multiprocessing.Pool here, because we need to coordinate the sending and receiving side for each task, and we need to keep state on the worker, pin processes to cores etc.
- namedtuple libertem.executor.pipelined.WorkerQueues(request, response, message)[source]
WorkerQueues(request, response, message)
- Fields:
request (WorkerQueue) – Alias for field number 0
response (WorkerQueue) – Alias for field number 1
message (WorkerQueue) – Alias for field number 2
- libertem.executor.pipelined.pipelined_worker(queues: WorkerQueues, pin: bool, spec: WorkerSpec, span_context: SpanContext, early_setup: Callable | None = None)[source]
Main pipelined worker function.
- Parameters:
queues – request and response queues
pin – Whether or not the CPU worker should be pinned to a specific CPU
spec – This worker’s spec, containing name, worker index, device kind etc.
span_context – The tracing span we should attach to for the setup code
early_setup – Function that will be called very early in the setup code, allowing injection of custom functionality or specific warmup code
- libertem.executor.pipelined.schedule_task(task_idx: int, task: TaskProtocol, pool: WorkerPool, scheduler: Scheduler, pool_info_for_worker: dict[Worker, PoolWorkerInfo], selector: Callable[[list[PoolWorkerInfo], WorkerPool, TaskProtocol, int], PoolWorkerInfo]) tuple[int, WorkerQueues] [source]
Returns the index and queues of the worker that the given task should be scheduled on.
Workers are first filtered based on requested resources, then the concrete worker is selected by the function passed as the selector parameter.
- libertem.executor.pipelined.set_thread_name(name: str)[source]
Set a thread name; mostly useful for using system tools for profiling
- Parameters:
name (str) – The thread name
- libertem.executor.pipelined.worker_loop(queues: WorkerQueues, work_mem: dict, worker_idx: int, env: Environment)[source]
The worker main loop, called when the worker setup is done. Waits for messages on the request queue until a SHUTDOWN message is received.
- Parameters:
work_mem – The worker’s working memory, for accessing scattered data
queues – request and response queues
worker_idx – This worker’s index
env – The Environment for preparing thread counts etc. for the UDF run
- libertem.executor.pipelined.worker_run_function(header, queues, worker_idx)[source]
Called from the worker main loop when a RUN_FUNCTION message is received
- Parameters:
header – The header of the message that was received
queues – request and response queues
worker_idx – This worker’s index
- libertem.executor.pipelined.worker_run_task(header: dict, work_mem: dict, queues: WorkerQueues, worker_idx: int, env: Environment)[source]
Called from the worker main loop when a RUN_TASK message is received
- Parameters:
header – The header of the message that was received
work_mem – The worker’s working memory, for accessing scattered data
queues – request and response queues
worker_idx – This worker’s index
env – The Environment for preparing thread counts etc. for the UDF run