Source code for eclypse.workflow.trigger.trigger
"""Module for Trigger class.
A trigger abstracts the logic for when an EclypseEvent should be fired
in the simulation workflow.
Available triggers include:
- Trigger: Base class for all triggers (can be subclassed).
- PeriodicTrigger: Fires at regular intervals.
- ScheduledTrigger: Fires at specific times.
- RandomTrigger: Fires based on a probability.
"""
from __future__ import annotations
import os
import random
from abc import (
ABC,
abstractmethod,
)
from datetime import (
datetime,
timedelta,
)
from typing import (
TYPE_CHECKING,
)
from eclypse.utils.constants import RND_SEED
if TYPE_CHECKING:
from eclypse.workflow.event.event import EclypseEvent
[docs]
class Trigger(ABC):
    """Abstract base for every trigger.

    Subclasses must implement ``trigger``; ``prepare`` and ``reset`` are
    optional hooks that do nothing unless overridden.
    """

    @abstractmethod
    def trigger(self, _: EclypseEvent | None = None) -> bool:
        """Decide whether the trigger fires for this check.

        Args:
            _ (EclypseEvent | None): The event that triggered the check.
                Defaults to None.

        Returns:
            bool: True if the trigger should fire, False otherwise.
        """

    def prepare(self):
        """Hook for one-off initialisation before the trigger is used.

        The base implementation does nothing and returns None; subclasses may
        override it.
        """

    def reset(self):
        """Hook for clearing any internal state of the trigger.

        The base implementation does nothing and returns None; subclasses may
        override it.
        """

    def __repr__(self) -> str:
        """Return the name of the concrete trigger class."""
        return type(self).__name__
[docs]
class PeriodicTrigger(Trigger):
    """A trigger that fires periodically.

    The first check fires once unconditionally. After that the trigger stays
    silent until ``reset()`` records a reference time; from then on it fires
    whenever at least ``trigger_every_ms`` has elapsed since that reference.
    """

    def __init__(self, trigger_every_ms: float = 0.0):
        """Initialize the periodic trigger.

        Args:
            trigger_every_ms (float): The interval in milliseconds at which the
                trigger should fire. Defaults to 0.0; with a zero interval the
                elapsed-time comparison in ``trigger()`` is satisfied on every
                check made after ``reset()`` has been called.
                NOTE(review): earlier documentation stated that 0.0 disables
                the trigger, which the comparison does not implement - confirm
                the intended behaviour.
        """
        super().__init__()
        # Kept as a timedelta so it can be compared directly with the
        # difference of two datetimes in trigger().
        self.trigger_every_ms = timedelta(milliseconds=trigger_every_ms)
        # Reference time for the interval check; None until reset() is called.
        self.last_exec_time: datetime | None = None
        # Becomes True once the unconditional first firing has been consumed.
        self.first_trigger: bool = False

    def trigger(self, _: EclypseEvent | None = None) -> bool:
        """Check if the trigger should fire based on its interval.

        Args:
            _ (EclypseEvent | None): Ignored by this trigger.

        Returns:
            bool: True on the very first check, or when the configured interval
            has elapsed since the time recorded by ``reset()``; False otherwise.
        """
        if self.last_exec_time is None:
            # No reference time yet: fire exactly once, then wait for reset().
            if not self.first_trigger:
                self.first_trigger = True
                return True
            return False
        elapsed_time = datetime.now() - self.last_exec_time
        return elapsed_time >= self.trigger_every_ms

    def reset(self):
        """Record the current time as the reference for the next interval."""
        self.last_exec_time = datetime.now()

    def __repr__(self) -> str:
        """Return a string representation of the periodic trigger."""
        # Floor-divide the whole duration by one millisecond. The
        # ``microseconds`` attribute only holds the sub-second component, so
        # it would misreport any interval of one second or longer.
        ms = self.trigger_every_ms // timedelta(milliseconds=1)
        return f"PeriodicTrigger(trigger_every_ms={ms})"
[docs]
class ScheduledTrigger(Trigger):
    """A trigger that fires at scheduled times.

    The schedule is given as offsets; ``prepare()`` turns them into absolute
    times relative to the moment it is called.
    """

    def __init__(
        self,
        scheduled_timedelta: timedelta | list[timedelta] | None = None,
    ):
        """Initialize the scheduled trigger.

        Args:
            scheduled_timedelta (timedelta | list[timedelta] | None):
                Offset(s), relative to the time ``prepare()`` is called, at
                which the trigger should fire.
                Defaults to None, which means no scheduled times.
        """
        super().__init__()
        # Test against None explicitly: a zero offset (``timedelta(0)``) is
        # falsy, so a truthiness check would silently discard it.
        if scheduled_timedelta is None:
            self._scheduled_timedelta = []
        elif isinstance(scheduled_timedelta, list):
            self._scheduled_timedelta = scheduled_timedelta
        else:
            self._scheduled_timedelta = [scheduled_timedelta]
        # Set by prepare(); None marks the trigger as not yet initialised.
        self._init_time: datetime | None = None
        # Absolute firing times still pending, earliest first after prepare().
        self._scheduled_times: list[datetime] = []

    def prepare(self):
        """Prepare the trigger by setting the initial time.

        Records the current time, sorts the configured offsets and converts
        them into absolute firing times.
        """
        self._init_time = datetime.now()
        self._scheduled_timedelta = sorted(self._scheduled_timedelta)
        self._scheduled_times = [
            self._init_time + delta for delta in self._scheduled_timedelta
        ]

    def trigger(self, _: EclypseEvent | None = None) -> bool:
        """Return True if the earliest pending scheduled time has been reached.

        Each True result consumes exactly one scheduled time, so when several
        scheduled times have already passed, successive calls each return True.

        Args:
            _ (EclypseEvent | None): Ignored by this trigger.

        Returns:
            bool: True if the current time is at or past the earliest pending
            scheduled time, False otherwise (including when none are left).

        Raises:
            RuntimeError: If ``prepare()`` has not been called.
        """
        if self._init_time is None:
            raise RuntimeError(
                "Trigger not initialised. Call prepare() before trigger()."
            )
        if not self._scheduled_times:
            return False
        current_time = datetime.now()
        if current_time >= self._scheduled_times[0]:
            # Remove the first scheduled time if it has passed
            self._scheduled_times.pop(0)
            return True
        return False

    def __repr__(self) -> str:
        """Return a string representation of the scheduled trigger."""
        return f"ScheduledTrigger(scheduled_times={self._scheduled_timedelta})"
[docs]
class RandomTrigger(Trigger):
    """A trigger that fires randomly."""

    def __init__(self, probability: float = 0.5, seed: int | None = None):
        """Initialize the random trigger.

        Args:
            probability (float): The probability of the trigger firing.
                Defaults to 0.5.
            seed (int | None): An optional seed for the random number generator.
                Defaults to None, in which case ``prepare()`` reads the seed
                from the environment variable named by ``RND_SEED``.
        """
        super().__init__()
        self.probability = probability
        self.seed = seed
        # Created by prepare(); None marks the trigger as not yet initialised.
        self.rnd: random.Random | None = None

    def prepare(self):
        """Initialize the random number generator.

        An explicitly given seed is used as is. Otherwise the seed is read from
        the environment variable named by ``RND_SEED`` and stored on the
        instance. If that variable is not set, the generator is created without
        a seed, so the draws are not reproducible.

        Raises:
            ValueError: If the environment variable is set but is not an
                integer literal.
        """
        if self.seed is None:
            env_seed = os.getenv(RND_SEED)
            # os.getenv returns None for a missing variable; converting that
            # with int() would raise an opaque TypeError.
            if env_seed is not None:
                self.seed = int(env_seed)
        self.rnd = random.Random(self.seed)

    def trigger(self, _: EclypseEvent | None = None) -> bool:
        """Check if the trigger should fire based on its probability.

        Args:
            _ (EclypseEvent | None): Ignored by this trigger.

        Returns:
            bool: True if a fresh draw from the generator is strictly below
            ``probability``, False otherwise.

        Raises:
            RuntimeError: If ``prepare()`` has not been called.
        """
        if self.rnd is None:
            raise RuntimeError(
                "Trigger not initialised. Call prepare() before trigger()."
            )
        return self.rnd.random() < self.probability

    def __repr__(self) -> str:
        """Return a string representation of the random trigger."""
        return f"RandomTrigger(probability={self.probability})"