# Source code for libertem.common.snooze

import time
import weakref
from enum import Enum
import functools
import threading
import contextlib
from typing import TYPE_CHECKING, Optional
from collections.abc import Callable

if TYPE_CHECKING:
    from libertem.common.executor import JobExecutor
    from libertem.common.subscriptions import SubscriptionManager


class SnoozeMessage(Enum):
    """
    Topics sent through the subscription manager by
    :class:`SnoozeManager` to report snooze-related events.
    """
    # Sent by SnoozeManager.snooze() just before the scale-down callback runs
    SNOOZE = "snooze"
    # Sent by SnoozeManager.unsnooze() just before the scale-up callback runs
    UNSNOOZE_START = "unsnooze_start"
    # Sent by SnoozeManager.unsnooze() once the scale-up callback has returned
    UNSNOOZE_DONE = "unsnooze_done"
    # Sent every time the manager refreshes its last-activity timestamp
    UPDATE_ACTIVITY = "update_activity"


class SnoozeManager:
    '''
    Monitor an executor for activity, and call its scale down
    method to free resources if inactive for longer than a
    specified timeout

    Executors can decorate methods with the :func:`~libertem.common.snooze.keep_alive`
    decorator in order to prevent snoozing during execution,
    and to trigger automatic unsnoozing when these methods are called.

    A daemon thread is started on construction which periodically checks
    the time elapsed since the last recorded activity. Call :meth:`close`
    to stop this thread.

    Parameters
    ----------

    up : Callable[[], None]
        Method to call to scale up the executor. Must be a bound method,
        as it is held through a :class:`weakref.WeakMethod` so that the
        manager does not keep the executor alive.
    down : Callable[[], None]
        Method to call to scale down the executor. Must be a bound method,
        for the same reason as :code:`up`.
    timeout : float
        The inactivity period before triggering snoozing (seconds)
    subscriptions : SubscriptionManager | None
        An instance of SubscriptionManager used to notify
        when snooze / unsnooze events are happening.
        Only a weak reference to it is kept.

    Raises
    ------

    ValueError
        If :code:`timeout` is zero or negative.
    '''
    def __init__(
        self,
        *,
        up: Callable[[], None],
        down: Callable[[], None],
        timeout: float,  # seconds
        subscriptions: Optional['SubscriptionManager'] = None,
    ):
        if timeout <= 0:
            raise ValueError("Must supply a positive snooze timeout")
        self.scale_up = weakref.WeakMethod(up)
        self.scale_down = weakref.WeakMethod(down)
        if subscriptions is not None:
            self.subscriptions = weakref.ref(subscriptions)
        else:
            # Same calling convention as a dead weakref
            self.subscriptions = lambda: None
        # Number of currently active in_use() contexts
        self.keep_alive = 0
        self.last_activity: float
        self._update_last_activity()
        self.is_snoozing = False
        self._snooze_lock = threading.Lock()
        self._snooze_timeout = timeout
        # Check ten times per timeout period, but at least every 30 s.
        # Fall back to 30 s if the product is not a positive number
        # (e.g. it underflowed to zero) to avoid a busy loop.
        check_interval = self._snooze_timeout * 0.1
        self._snooze_check_interval = (
            min(30.0, check_interval) if check_interval > 0 else 30.0
        )
        # sentinel value, used during shutdown to stop the snooze task
        self._snooze_task_continue = threading.Event()
        self._snooze_task_continue.set()
        # Set by close(); the snooze task waits on this event instead of
        # sleeping so that it wakes up and exits as soon as we are closed
        self._snooze_task_stop = threading.Event()
        self._snooze_task = threading.Thread(
            target=self._snooze_check_task,
            daemon=True,
        )
        self._snooze_task.start()

    def _update_last_activity(self):
        """
        Record the current time as the moment of last activity and
        notify subscribers, if any, with an UPDATE_ACTIVITY message
        """
        self.last_activity = time.monotonic()
        subs = self.subscriptions()
        if subs is not None:
            subs.send(SnoozeMessage.UPDATE_ACTIVITY, {})

    @contextlib.contextmanager
    def in_use(self):
        """
        Context manager marking the executor as being in use

        While at least one such context is active, :meth:`snooze` is a
        no-op. The last-activity timestamp is refreshed both on entry
        and on exit.
        """
        self._update_last_activity()
        self.keep_alive += 1
        try:
            yield
        finally:
            # Never let the counter go negative
            self.keep_alive = max(0, self.keep_alive - 1)
            self._update_last_activity()

    def snooze(self):
        """
        Scale down the executor, unless it is in use, already
        snoozing, or has been garbage collected
        """
        if self.keep_alive > 0:
            # always no-op if something is using the executor
            return
        with self._snooze_lock:
            # Check both conditions only once we hold the lock:
            # - there could be a slow unsnooze action in progress which
            #   would leave things in a bad state if we no-op too early
            # - a user may have entered in_use() while we were waiting
            if self.is_snoozing or self.keep_alive > 0:
                return
            scale_down = self.scale_down()
            if scale_down is None:
                # must be destroying the executor, do nothing
                return
            subs = self.subscriptions()
            if subs is not None:
                subs.send(SnoozeMessage.SNOOZE, {})
            # Set the is_snoozing flag early, in case of a long call to scale_down()
            self.is_snoozing = True
            scale_down()

    def unsnooze(self):
        """
        Scale the executor back up if it is currently snoozing

        The executor is marked as in use for the duration of the call.
        Subscribers, if any, receive UNSNOOZE_START before and
        UNSNOOZE_DONE after the scale-up callback runs.
        """
        with self.in_use():
            with self._snooze_lock:
                if not self.is_snoozing:
                    # Wait for lock to check this as there could be a
                    # slow snooze action in progress which would leave things
                    # in a bad state if we no-op too early
                    return
                scale_up = self.scale_up()
                if scale_up is None:
                    # must be destroying the executor, do nothing
                    return
                subs = self.subscriptions()
                if subs is not None:
                    subs.send(SnoozeMessage.UNSNOOZE_START, {})
                scale_up()
                # Unset the is_snoozing flag only *after* we complete scale_up(),
                # if scale_up() raises the flag stays set
                self.is_snoozing = False
                if subs is not None:
                    subs.send(SnoozeMessage.UNSNOOZE_DONE, {})

    def close(self):
        """
        Ask the background snooze task to stop

        Returns immediately without joining the thread; the task is
        woken up from its wait and exits without performing another check.
        """
        if self._snooze_task is not None:
            self._snooze_task_continue.clear()
            self._snooze_task_stop.set()

    def _snooze_check_task(self):
        """
        Periodically check if we need to snooze the executor
        """
        while self._snooze_task_continue.is_set():
            # wait() returns True as soon as close() sets the stop event,
            # and False if the full interval elapsed without a stop request
            if self._snooze_task_stop.wait(self._snooze_check_interval):
                break
            if self.scale_down() is None:
                # Executor is likely being torn down
                break
            if self.is_snoozing or self.keep_alive > 0:
                continue
            since_last_activity = time.monotonic() - self.last_activity
            if since_last_activity > self._snooze_timeout:
                self.snooze()


def keep_alive(fn):
    """
    Decorator for executor methods which should wake up the executor
    and prevent snoozing while they run

    If the executor's :code:`snooze_manager` attribute is :code:`None`
    the method is called directly. Otherwise the manager is asked to
    unsnooze first and the method then runs inside the manager's
    :code:`in_use()` context.
    """

    @functools.wraps(fn)
    def wrapped(self: 'JobExecutor', *args, **kwargs):
        snoozer = self.snooze_manager
        if snoozer is None:
            # Snoozing is not enabled for this executor
            return fn(self, *args, **kwargs)
        snoozer.unsnooze()
        with snoozer.in_use():
            result = fn(self, *args, **kwargs)
        return result

    return wrapped


def keep_alive_context(fn):
    """
    Variant of :func:`keep_alive` for executor methods that return
    a context manager

    The returned method is itself a context manager. If the executor's
    :code:`snooze_manager` attribute is not :code:`None`, the manager is
    asked to unsnooze first and stays marked as in use for as long as the
    wrapped context is open, not just while the method call itself runs.
    """

    @contextlib.contextmanager
    # Preserve the name and docstring of the decorated method,
    # consistent with keep_alive()
    @functools.wraps(fn)
    def wrapped(self: 'JobExecutor', *args, **kwargs):
        manager = self.snooze_manager
        if manager is None:
            # Snoozing is not enabled for this executor
            with fn(self, *args, **kwargs) as mgr:
                yield mgr
            return
        manager.unsnooze()
        with manager.in_use(), fn(self, *args, **kwargs) as mgr:
            yield mgr

    return wrapped