Source code for eclypse.workflow.trigger.trigger

"""Module for Trigger class.

A trigger abstracts the logic for when an EclypseEvent should be fired
in the simulation workflow.

Available triggers include:
- Trigger: Base class for all triggers (can be subclassed).
- PeriodicTrigger: Fires at regular intervals.
- ScheduledTrigger: Fires at specific times.
- RandomTrigger: Fires based on a probability.
"""

from __future__ import annotations

import os
import random
from abc import (
    ABC,
    abstractmethod,
)
from datetime import (
    datetime,
    timedelta,
)
from typing import (
    TYPE_CHECKING,
)

from eclypse.utils.constants import RND_SEED

if TYPE_CHECKING:
    from eclypse.workflow.event.event import EclypseEvent


[docs] class Trigger(ABC): """Base class for triggers."""
[docs] @abstractmethod def trigger(self, _: EclypseEvent | None = None) -> bool: """Check if the trigger should fire. Args: trigger_event (EclypseEvent | None): The event that triggered the check. Returns: bool: True if the trigger should fire, False otherwise. """
[docs] def prepare(self): """Prepare the trigger for use. This method can be overridden in subclasses to perform any necessary initialization before the trigger is used. """ return None
[docs] def reset(self): """Reset the trigger state. This method can be overridden in subclasses to reset any internal state of the trigger. """ return None
def __repr__(self) -> str: """Return a string representation of the trigger.""" return f"{self.__class__.__name__}"
[docs] class PeriodicTrigger(Trigger): """A trigger that fires periodically."""
[docs] def __init__(self, trigger_every_ms: float = 0.0): """Initialize the periodic trigger. Args: trigger_every_ms (float): The interval in milliseconds at which the trigger should fire. Defaults to 0.0, which means it will not trigger. """ super().__init__() self.trigger_every_ms = timedelta(milliseconds=trigger_every_ms) self.last_exec_time: datetime | None = None self.first_trigger: bool = False
[docs] def trigger(self, _: EclypseEvent | None = None) -> bool: """Check if the trigger should fire based on its interval.""" # Implement logic to check the time since the last trigger if self.last_exec_time is None: if not self.first_trigger: self.first_trigger = True return True return False elapsed_time = datetime.now() - self.last_exec_time return elapsed_time >= self.trigger_every_ms
[docs] def reset(self): """Reset the trigger state.""" self.last_exec_time = datetime.now()
def __repr__(self) -> str: """Return a string representation of the periodic trigger.""" ms = self.trigger_every_ms.microseconds // 1000 return f"PeriodicTrigger(trigger_every_ms={ms})"
[docs] class ScheduledTrigger(Trigger): """A trigger that fires at scheduled times."""
[docs] def __init__( self, scheduled_timedelta: timedelta | list[timedelta] | None = None, ): """Initialize the scheduled trigger. Args: scheduled_timedelta (timedelta | list[timedelta] | None): Time(s) when the trigger should fire. Defaults to None, which means no scheduled times. """ if scheduled_timedelta: self._scheduled_timedelta = ( scheduled_timedelta if isinstance(scheduled_timedelta, list) else [scheduled_timedelta] ) else: self._scheduled_timedelta = [] self._init_time: datetime | None = None self._scheduled_times: list[datetime] = []
[docs] def prepare(self): """Prepare the trigger by setting the initial time.""" self._init_time = datetime.now() self._scheduled_timedelta = sorted(self._scheduled_timedelta) self._scheduled_times = [ self._init_time + delta for delta in self._scheduled_timedelta ]
[docs] def trigger(self, _: EclypseEvent | None = None) -> bool: """Return True if the current call count matches a scheduled time.""" if self._init_time is None: raise RuntimeError( "Trigger not initialised. Call prepare() before trigger()." ) if not self._scheduled_times: return False current_time = datetime.now() if current_time >= self._scheduled_times[0]: # Remove the first scheduled time if it has passed self._scheduled_times.pop(0) return True return False
def __repr__(self) -> str: """Return a string representation of the scheduled trigger.""" return f"ScheduledTrigger(scheduled_times={self._scheduled_timedelta})"
[docs] class RandomTrigger(Trigger): """A trigger that fires randomly."""
[docs] def __init__(self, probability: float = 0.5, seed: int | None = None): """Initialize the random trigger. Args: probability (float): The probability of the trigger firing. Defaults to 0.5. seed (int | None): An optional seed for the random number generator. Defaults to None, which means that the random number generator gets the RND_SEED from the simulator. """ self.probability = probability self.seed = seed self.rnd = None
[docs] def prepare(self): """Initialize the random number generator.""" self.seed = int(os.getenv(RND_SEED)) if self.seed is None else self.seed self.rnd = random.Random(self.seed)
[docs] def trigger(self, _: EclypseEvent | None = None) -> bool: """Check if the trigger should fire based on its probability.""" if self.rnd is None: raise RuntimeError( "Trigger not initialised. Call prepare() before trigger()." ) return self.rnd.random() < self.probability
def __repr__(self) -> str: """Return a string representation of the random trigger.""" return f"RandomTrigger(probability={self.probability})"