# Source code for libertem.executor.base

import functools
import asyncio
from typing import Callable, TYPE_CHECKING, TypeVar, Optional

from contextlib import asynccontextmanager

from libertem.common.async_utils import (
    adjust_event_loop_policy, sync_to_async, async_generator_eager
)
from libertem.common.executor import (
    JobExecutor, AsyncJobExecutor,
    Environment, GenericTaskProtocol, GPUSpec,
)
from libertem.common.tracing import TracedThreadPoolExecutor
from libertem.common.exceptions import ExecutorSpecException


if TYPE_CHECKING:
    from libertem.udf.base import UDFRunner
    from libertem.common.snooze import SnoozeManager

T = TypeVar('T')


class ResourceError(RuntimeError):
    """
    Thrown when there is a resource mismatch, for example if the task requests
    resources that are not available in the worker pool.
    """
    pass


def make_canonical(main_gpu: GPUSpec) -> Optional[int]:
    '''
    Handle default cases when specifying a main GPU

    .. versionadded:: 0.16.0

    Parameters
    ----------

    main_gpu : int or bool, optional

        GPU spec to use for GPU processing on the main process.

        True:
            Use any available GPU, throw an error if none are available.
        int:
            Specify GPU ID to use, throw an error if it is not present.
        False:
            Don't use GPU on the main process.
        None:
            Default behavior. Currently activates GPU processing if a GPU is
            available to catch any potential issues with this feature.

    Returns
    -------

    int or None
        GPU ID or :code:`None` for no GPU processing

    Raises
    ------
    ExecutorSpecException
        If a GPU was requested (:code:`True` or an explicit ID) but no
        usable GPU/CuPy combination was detected.
    ValueError
        If :code:`main_gpu` has an unsupported type.
    '''
    if main_gpu is False:
        # Explicitly disabled: no GPU processing on the main process,
        # no need to run device detection at all.
        return None
    # The bool cases must be distinguished before the generic int case
    # because isinstance(<bool>, int) is True.
    if main_gpu is not True and main_gpu is not None \
            and not isinstance(main_gpu, int):
        raise ValueError(f'Invalid type for main_process_gpu: {type(main_gpu)}')
    # Deferred import: device detection is comparatively expensive and is
    # only needed when GPU processing might actually be activated. Run it
    # once here instead of separately in each branch.
    from libertem.utils.devices import detect
    detect_out = detect()
    cudas = detect_out['cudas']
    usable = bool(cudas) and detect_out['has_cupy']
    if main_gpu is True:
        # "Any GPU" was requested explicitly: error out if none is usable
        if not usable:
            raise ExecutorSpecException(
                'Cannot specify main GPU as no GPUs detected or no CuPy found.'
            )
        return cudas[0]
    if main_gpu is None:
        # Default behavior: activate if available, otherwise run without GPU
        return cudas[0] if usable else None
    # Explicit GPU ID: verify that it is a valid, usable GPU
    if main_gpu not in cudas or not detect_out['has_cupy']:
        raise ExecutorSpecException(
            f"Main GPU {main_gpu} not detected, "
            f"only found {cudas}."
        )
    return main_gpu


class BaseJobExecutor(JobExecutor):
    '''
    Base class for LiberTEM executors

    Contains a generic implementation for
    :meth:`libertem.common.executor.JobExecutor.run_process_local` for re-use
    in executors that don't implement a specialized version.

    Parameters
    ----------
    main_process_gpu : int or None, optional
        GPU to set in the :class:`~libertem.common.executor.Environment`
        supplied to the task in :meth:`run_process_local`.
    '''
    def __init__(self, main_process_gpu: Optional[int] = None):
        self._main_process_gpu = main_process_gpu

    def get_udf_runner(self) -> type['UDFRunner']:
        # Deferred import to avoid a circular dependency with udf.base
        from libertem.udf.base import UDFRunner
        return UDFRunner

    def ensure_async(self, pool=None):
        """
        Returns an asynchronous executor; by default just wrap into
        `AsyncAdapter`.
        """
        return AsyncAdapter(wrapped=self, pool=pool)

    def run_process_local(self, task: GenericTaskProtocol, args=(),
                          kwargs: Optional[dict] = None):
        """
        run a callable :code:`fn` in the context of the current process.
        """
        kwargs = {} if kwargs is None else kwargs
        return task(args, kwargs, environment=self._get_local_env())

    def _get_local_env(self):
        # Local single-process environment; GPU usage is controlled by the
        # main_process_gpu value passed at construction time.
        return Environment(
            threads_per_worker=None,
            threaded_executor=True,
            gpu_id=self._main_process_gpu,
        )
class AsyncAdapter(AsyncJobExecutor):
    """
    Wrap a synchronous JobExecutor and allow to use it as AsyncJobExecutor.
    All methods are converted to async and executed in a separate thread.
    """
    def __init__(self, wrapped: JobExecutor, pool=None):
        self._wrapped = wrapped
        if pool is None:
            pool = AsyncAdapter.make_pool()
        self._pool = pool

    @classmethod
    def make_pool(cls):
        # Single-thread pool: all wrapped calls are serialized onto one
        # worker thread which owns its own event loop.
        pool = TracedThreadPoolExecutor(1)
        pool.submit(adjust_event_loop_policy).result()
        pool.submit(lambda: asyncio.set_event_loop(asyncio.new_event_loop())).result()
        return pool

    def ensure_sync(self):
        """
        Return the wrapped synchronous executor.
        """
        return self._wrapped

    @asynccontextmanager
    async def scatter(self, obj):
        """
        Scatter :code:`obj` via the wrapped executor, yielding the handle.

        FIX: previously :code:`obj` was ignored and
        :code:`__enter__`/:code:`__exit__` were looked up on the bound
        method :code:`self._wrapped.scatter` itself instead of on a single
        context manager instance created from :code:`obj`.
        """
        cm = self._wrapped.scatter(obj)
        try:
            res = await sync_to_async(cm.__enter__, self._pool)
            yield res
        finally:
            exit_fn = functools.partial(
                cm.__exit__,
                None, None, None,  # FIXME: exc_type, exc_value, traceback?
            )
            await sync_to_async(exit_fn, self._pool)

    async def run_tasks(self, tasks, params_handle, cancel_id):
        """
        run a number of Tasks
        """
        gen = self._wrapped.run_tasks(tasks, params_handle, cancel_id)
        agen = async_generator_eager(gen, self._pool)
        async for i in agen:
            yield i

    async def run_function(self, fn: Callable[..., T], *args, **kwargs) -> T:
        """
        run a callable :code:`fn` on an arbitrary worker node
        """
        fn_with_args = functools.partial(self._wrapped.run_function, fn, *args, **kwargs)
        return await sync_to_async(fn_with_args, self._pool)

    async def run_each_partition(self, partitions, fn, all_nodes=False):
        fn_with_args = functools.partial(
            self._wrapped.run_each_partition, partitions, fn, all_nodes
        )
        return await sync_to_async(fn_with_args, self._pool)

    async def map(self, fn, iterable):
        """
        Run a callable :code:`fn` for each item in iterable, on arbitrary worker nodes

        Parameters
        ----------

        fn : callable
            Function to call. Should accept exactly one parameter.

        iterable : Iterable
            Which elements to call the function on.
        """
        fn_with_args = functools.partial(
            self._wrapped.map, fn, iterable,
        )
        return await sync_to_async(fn_with_args, self._pool)

    async def run_each_host(self, fn, *args, **kwargs):
        fn_with_args = functools.partial(self._wrapped.run_each_host, fn, *args, **kwargs)
        return await sync_to_async(fn_with_args, self._pool)

    async def run_each_worker(self, fn, *args, **kwargs):
        fn_with_args = functools.partial(self._wrapped.run_each_worker, fn, *args, **kwargs)
        return await sync_to_async(fn_with_args, self._pool)

    async def close(self):
        """
        Cleanup resources used by this executor, if any, including the
        wrapped executor.
        """
        res = await sync_to_async(self._wrapped.close, self._pool)
        if self._pool:
            self._pool.shutdown()
        return res

    async def cancel(self, cancel_id):
        """
        cancel execution identified by cancel_id
        """
        return await sync_to_async(
            functools.partial(self._wrapped.cancel, cancel_id=cancel_id),
            self._pool
        )

    async def get_available_workers(self):
        # Pass self._pool for consistency with the other wrapped calls,
        # so the call runs on the adapter's dedicated worker thread.
        return await sync_to_async(self._wrapped.get_available_workers, self._pool)

    async def get_resource_details(self):
        # Pass self._pool for consistency with the other wrapped calls.
        return await sync_to_async(self._wrapped.get_resource_details, self._pool)

    def get_udf_runner(self) -> type['UDFRunner']:
        # Deferred import to avoid a circular dependency with udf.base
        from libertem.udf.base import UDFRunner
        return UDFRunner

    @property
    def snooze_manager(self) -> Optional['SnoozeManager']:
        return self._wrapped.snooze_manager