Source code for eclypse.workflow.event.event

# pylint: disable=protected-access
"""Module for the EclypseEvent class.

The EclypseEvent class is used to define the events that can be triggered in the
simulation. The events can be periodic or non-periodic (static), and can be triggered by
other events. They can also have a timeout, a maximum number of calls and can be used to
trigger events in the simulation.
"""

from __future__ import annotations

from collections import defaultdict
from itertools import product
from typing import (
    TYPE_CHECKING,
    Any,
)

from eclypse.remote import ray_backend
from eclypse.utils._logging import (
    format_log_kv,
    logger,
)
from eclypse.utils.constants import (
    DRIVING_EVENT,
    MAX_FLOAT,
)
from eclypse.workflow.trigger.bucket import TriggerBucket

from .role import EventRole

if TYPE_CHECKING:
    from collections.abc import (
        Callable,
        Generator,
    )

    from ray.actor import ActorHandle

    from eclypse.graph import Infrastructure
    from eclypse.placement import (
        Placement,
        PlacementView,
    )
    from eclypse.simulation._simulator.local import Simulator
    from eclypse.utils._logging import Logger
    from eclypse.utils.types import (
        EventType,
        TriggerCondition,
    )
    from eclypse.workflow.trigger import Trigger


class EclypseEvent:
    """An event in the simulation.

    An event wraps a piece of user logic (its ``__call__``) together with a
    :class:`TriggerBucket` deciding when it may fire, a workflow role, an
    optional event type selecting how the logic is fanned out over the
    simulation entities, and the payload produced by its last execution.
    """

    def __init__(
        self,
        name: str,
        event_type: EventType | None = None,
        triggers: list[Trigger] | None = None,
        trigger_condition: TriggerCondition = "any",
        max_triggers: int = int(MAX_FLOAT),
        role: EventRole = EventRole.EVENT,
        report: str | list[str] | None = None,
        remote: bool = False,
        verbose: bool = False,
    ):
        """Initialize the event.

        Args:
            name (str): The name of the event.
            event_type (EventType | None): The type of the event. Defaults to None.
            triggers (list[Trigger] | None): A list of triggers that can trigger
                the event. Defaults to None (no triggers).
            trigger_condition (TriggerCondition): The condition for the triggers
                to fire the event. If "any", the event fires if any trigger is
                active. If "all", the event fires only if all triggers are
                active. Defaults to "any".
            max_triggers (int): The maximum number of times the trigger can be
                called. Defaults to no limit (MAX_FLOAT).
            role (EventRole): The role of the event in the workflow.
                Defaults to EventRole.EVENT.
            report (str | list[str] | None): The report type(s) to generate for
                the event. A single string is wrapped in a list. Defaults to
                None, which leaves the event without report types.
            remote (bool): If True, the event will be executed remotely.
                Defaults to False.
            verbose (bool): If True, the event will log its firing.
                Defaults to False.

        Raises:
            ValueError: The event must have a name.
        """
        if not name:
            raise ValueError("The event must have a name.")

        self._name: str = name
        self.trigger_bucket = TriggerBucket(
            triggers=triggers if triggers is not None else [],
            condition=trigger_condition,
            max_triggers=max_triggers,
        )
        self.trigger_bucket.event = self

        self._role = role
        self.type = event_type
        self._remote = remote
        self._verbose = verbose
        self._simulator: Simulator | None = None
        self._data: Any = {}

        if report:
            # Copy list inputs so later mutations by the caller do not leak
            # into the event (mirrors what set_report_types does).
            self._report = [report] if isinstance(report, str) else list(report)
        else:
            self._report = []

    def __call__(self, *args, **kwargs) -> Any:
        """The event logic.

        Must be implemented by the user by either decorating a function or a
        class with a __call__ method, or by subclassing the EclypseEvent class
        and implementing the __call__ method.

        Raises:
            NotImplementedError: The event logic is not implemented.
        """
        # A single concatenated message: passing several comma-separated
        # strings would make the exception render as a tuple of fragments.
        raise NotImplementedError(
            "The event logic must be implemented in one of two ways: "
            "1. decorate a function or a class with a __call__ method; "
            "2. subclass the EclypseEvent class and implement the "
            "__call__ method."
        )

    def _call_by_type(self, trigger_event: EclypseEvent | None) -> Any:
        """Execute the event function according to the type of the event.

        Args:
            trigger_event (EclypseEvent | None): The event that triggered this
                event, if any.

        Returns:
            Any: The event payload, or None when the event type matches none of
                the handled types.

        Raises:
            ValueError: If the event is not associated with a simulator.
        """
        result_fn = None
        event, sim = trigger_event, self.simulator
        # Only dict payloads of the triggering event are forwarded as kwargs.
        event_data = event.data if event and isinstance(event.data, dict) else {}
        placements, infr, pv = (
            sim.placements,
            sim.infrastructure,
            sim.placement_view,
        )

        if self.type is None or self.type == "simulation":
            result_fn = self(event) if event else self()
        elif self.type == "application":
            result_fn = _application_fn(
                self.__call__,
                placements,
                infr,
                flatten=self.is_post_event,
                **event_data,
            )
        elif self.type == "service":
            if self._remote:
                # The simulator reference is dropped for the duration of the
                # remote call (presumably so it is not shipped along with the
                # bound callable - confirm). The finally clause guarantees it
                # is restored even if the remote execution raises.
                self.detach_simulator()
                try:
                    result_fn = _remote_service_fn(
                        self.__call__,
                        placements,
                        infr,
                        flatten=self.is_post_event,
                        **event_data,
                    )
                finally:
                    self.attach_simulator(sim)
            else:
                result_fn = _service_fn(
                    self.__call__,
                    placements,
                    infr,
                    flatten=self.is_post_event,
                    **event_data,
                )
        elif self.type == "interaction":
            result_fn = _interaction_fn(
                self.__call__,
                placements,
                infr,
                flatten=self.is_post_event,
                **event_data,
            )
        elif self.type == "infrastructure":
            result_fn = _infrastructure_fn(self.__call__, infr, pv, **event_data)
        elif self.type == "node":
            result_fn = _node_fn(
                self.__call__,
                placements,
                infr,
                pv,
                flatten=self.is_post_event,
                **event_data,
            )
        elif self.type == "link":
            result_fn = _link_fn(
                self.__call__,
                placements,
                infr,
                pv,
                flatten=self.is_post_event,
                **event_data,
            )

        return result_fn

    def _trigger(self, trigger_event: EclypseEvent | None = None) -> bool:
        """Trigger the event, if possible.

        Args:
            trigger_event (EclypseEvent | None): The event attempting to trigger
                this one. Defaults to None.

        Returns:
            bool: True if the event was triggered, False otherwise.
        """
        condition = self.trigger_bucket.trigger(trigger_event=trigger_event)
        if self._verbose and condition:
            self.simulator.logger.debug(
                "Event triggered | "
                + format_log_kv(
                    name=self.name,
                    type=self.type,
                    triggers=self.n_triggers,
                )
            )
        return condition

    def _fire(self, trigger_event: EclypseEvent | None = None) -> None:
        """Fire the event.

        Runs the event logic, stores its payload in ``data`` (an empty dict
        when the logic returns None) and resets the trigger bucket.

        Args:
            trigger_event (EclypseEvent | None): The event that triggered
                this event. Defaults to None.

        Raises:
            ValueError: The event must be associated to a simulator to be fired.
        """
        if self._simulator is None:
            raise ValueError("The event must be associated to a simulator to be fired.")

        if self._verbose:
            self.simulator.logger.debug(
                "Event fired | "
                + format_log_kv(
                    name=self.name,
                    type=self.type,
                    calls=self.n_calls,
                )
            )

        event_data = self._call_by_type(trigger_event)
        self._data = event_data if event_data is not None else {}
        self.trigger_bucket.reset()

        # The driving event additionally reports the step on the ECLYPSE level.
        if self.name == DRIVING_EVENT:
            self.simulator.logger.log(
                "ECLYPSE",
                "Simulation step | " + format_log_kv(step=self.n_calls),
            )

    @property
    def name(self) -> str:
        """The name of the event.

        Returns:
            str: The name of the event.
        """
        return self._name

    @property
    def n_calls(self) -> int:
        """Return the execution counter kept by the trigger bucket.

        Returns:
            int: The number of executions recorded by the trigger bucket.
        """
        return self.trigger_bucket._n_executions

    @property
    def n_triggers(self) -> int:
        """Return the number of times the event has been triggered.

        Returns:
            int: The number of times the event has been triggered.
        """
        return self.trigger_bucket._n_triggers

    @property
    def triggers(self) -> list[Trigger]:
        """The triggers associated with the event.

        Returns:
            list[Trigger]: The triggers associated with the event.
        """
        return self.trigger_bucket.triggers

    @property
    def simulator(self) -> Simulator:
        """The simulator associated with the event.

        Returns:
            Simulator: The simulator associated with the event.

        Raises:
            ValueError: If no simulator is attached to the event.
        """
        if self._simulator is None:
            raise ValueError("The event must be associated with a simulator.")
        return self._simulator

    def attach_simulator(self, simulator: Simulator):
        """Attach the event to a simulator runtime."""
        self._simulator = simulator

    def detach_simulator(self):
        """Detach the event from its simulator runtime."""
        self._simulator = None

    @property
    def data(self) -> Any:
        """The payload generated by the event."""
        return self._data

    @property
    def role(self) -> EventRole:
        """The workflow role of the event."""
        return self._role

    @property
    def is_metric(self) -> bool:
        """Whether the event is a metric."""
        return self._role is EventRole.METRIC

    @property
    def is_post_event(self) -> bool:
        """Whether the event is executed after another event fires."""
        return self._role is not EventRole.EVENT

    @property
    def remote(self) -> bool:
        """Whether the event must be executed by a remote service/node.

        Returns:
            bool: True if the event is remote, False otherwise.
        """
        return self._remote

    @property
    def logger(self) -> Logger:
        """Get a logger for the event, binding the event name as id in the logs.

        Returns:
            Logger: The logger for the event.
        """
        return logger.bind(id=self.name)

    @property
    def report_types(self) -> list[str]:
        """Get the report types for the event.

        Returns:
            list[str]: The report types for the event.
        """
        return self._report

    def set_report_types(self, report_types: list[str]):
        """Replace the report formats associated with the event."""
        self._report = list(report_types)
def _application_fn(
    fn: Callable,
    placements: dict[str, Placement],
    infr: Infrastructure,
    flatten: bool = False,
    **event_data,
) -> Any:
    """Call ``fn`` once per placed application.

    Args:
        fn (Callable): Called as ``fn(application, placement, infr, **event_data)``.
        placements (dict[str, Placement]): The placements to iterate over.
        infr (Infrastructure): The infrastructure forwarded to ``fn``.
        flatten (bool): If True, return flat tuples instead of a dict.
            Defaults to False.
        **event_data: Extra keyword arguments forwarded to ``fn``.

    Returns:
        Any: ``{application_id: value}`` or, when flattening, a tuple of
            ``(application_id, *flat_value)`` rows.
    """
    # Decide on the output shape first so fn runs exactly once per application.
    if flatten:
        return tuple(
            (pl.application.id, *flat_value)
            for pl in placements.values()
            for flat_value in _flatten_value(
                fn(pl.application, pl, infr, **event_data)
            )
        )
    return {
        pl.application.id: fn(pl.application, pl, infr, **event_data)
        for pl in placements.values()
    }


def _service_fn(
    fn: Callable,
    placements: dict[str, Placement],
    infr: Infrastructure,
    flatten: bool = False,
    **event_data,
) -> Any:
    """Call ``fn`` once per service of every placed application.

    Args:
        fn (Callable): Called as
            ``fn(service_id, requirements, placement, infr, **event_data)``.
        placements (dict[str, Placement]): The placements to iterate over.
        infr (Infrastructure): The infrastructure forwarded to ``fn``.
        flatten (bool): If True, return flat tuples instead of nested dicts.
            Defaults to False.
        **event_data: Extra keyword arguments forwarded to ``fn``.

    Returns:
        Any: ``{application_id: {service_id: value}}`` or, when flattening, a
            tuple of ``(application_id, service_id, *flat_value)`` rows.
    """
    # Decide on the output shape first so fn runs exactly once per service.
    if flatten:
        return tuple(
            (pl.application.id, service_id, *flat_value)
            for pl in placements.values()
            for service_id, req in pl.application.nodes(data=True)
            for flat_value in _flatten_value(
                fn(service_id, req, pl, infr, **event_data)
            )
        )
    return {
        pl.application.id: {
            s: fn(s, req, pl, infr, **event_data)
            for s, req in pl.application.nodes(data=True)
        }
        for pl in placements.values()
    }


def _remote_service_fn(
    fn: Callable,
    placements: dict[str, Placement],
    infr: Infrastructure,
    flatten: bool = False,
    **event_data,
) -> Any:
    """Run ``fn`` for every mapped service through the actor of its node.

    Placements without a mapping are skipped. For each service, the actor
    named ``"<infr.id>/<node>"`` is looked up via ``ray_backend`` and its
    ``entrypoint`` is invoked remotely with ``(service_id, fn, **event_data)``.
    Services whose actor lookup yields None are left out of the result.

    Args:
        fn (Callable): The callable handed to the remote entrypoint.
        placements (dict[str, Placement]): The placements to iterate over.
        infr (Infrastructure): The infrastructure whose id prefixes actor names.
        flatten (bool): If True, return flat tuples instead of nested dicts.
            Defaults to False.
        **event_data: Extra keyword arguments forwarded to the entrypoint.

    Returns:
        Any: ``{application_id: {service_id: value}}`` or, when flattening, a
            tuple of ``(application_id, service_id, *flat_value)`` rows.
    """
    engines: dict[str, ActorHandle | None] = {}
    result: dict[Any, dict[Any, Any]] = {}
    # One entry per submitted remote call, in submission order, so that every
    # result is stored under the service that actually produced it.
    pending: list[tuple[dict[Any, Any], Any, Any]] = []
    remotes = []

    for pl in placements.values():
        if not pl.mapping:
            continue
        app_id = pl.application.id
        bucket = result[app_id] = {}
        for s in pl.application.nodes:
            node = pl.mapping[s]
            actor = engines.get(node)
            if actor is None:
                # Unknown (or previously unresolved) node: (re)try the lookup.
                actor = engines[node] = ray_backend.get_actor(f"{infr.id}/{node}")
            if actor is None:
                continue
            pending.append((bucket, app_id, s))
            remotes.append(
                actor.entrypoint.remote(s, fn, **event_data)  # type: ignore[attr-defined]
            )

    results = ray_backend.get(remotes)  # type: ignore[arg-type]

    for (bucket, _, s), value in zip(pending, results):
        bucket[s] = value

    if not flatten:
        return result

    return tuple(
        (app_id, service_id, *flat_value)
        for (_, app_id, service_id), value in zip(pending, results)
        for flat_value in _flatten_value(value)
    )


def _interaction_fn(
    fn: Callable,
    placements: dict[str, Placement],
    infr: Infrastructure,
    flatten: bool = False,
    **event_data,
) -> Any:
    """Call ``fn`` once per interaction (edge) of every placed application.

    Args:
        fn (Callable): Called as
            ``fn(source, target, requirements, placement, infr, **event_data)``.
        placements (dict[str, Placement]): The placements to iterate over.
        infr (Infrastructure): The infrastructure forwarded to ``fn``.
        flatten (bool): If True, return flat tuples instead of nested dicts.
            Defaults to False.
        **event_data: Extra keyword arguments forwarded to ``fn``.

    Returns:
        Any: ``{application_id: {(source, target): value}}`` or, when
            flattening, a tuple of
            ``(application_id, source, target, *flat_value)`` rows.
    """
    # Decide on the output shape first so fn runs exactly once per interaction.
    if flatten:
        return tuple(
            (pl.application.id, source, target, *flat_value)
            for pl in placements.values()
            for source, target, req in pl.application.edges(data=True)
            for flat_value in _flatten_value(
                fn(source, target, req, pl, infr, **event_data)
            )
        )
    return {
        pl.application.id: {
            (source, target): fn(source, target, req, pl, infr, **event_data)
            for source, target, req in pl.application.edges(data=True)
        }
        for pl in placements.values()
    }


def _infrastructure_fn(
    fn: Callable, infr: Infrastructure, placement_view: PlacementView, **event_data
) -> dict[str, Any]:
    """Call ``fn`` once with the infrastructure and the placement view.

    Args:
        fn (Callable): Called as ``fn(infr, placement_view, **event_data)``.
        infr (Infrastructure): The infrastructure forwarded to ``fn``.
        placement_view (PlacementView): The placement view forwarded to ``fn``.
        **event_data: Extra keyword arguments forwarded to ``fn``.

    Returns:
        dict[str, Any]: Whatever ``fn`` returns, unchanged.
    """
    return fn(infr, placement_view, **event_data)


def _node_fn(
    fn: Callable,
    placements: dict[str, Placement],
    infr: Infrastructure,
    placement_view: PlacementView,
    flatten: bool = False,
    **event_data,
) -> Any:
    """Call ``fn`` once per infrastructure node.

    Args:
        fn (Callable): Called as
            ``fn(node, resources, placements, infr, placement_view, **event_data)``.
        placements (dict[str, Placement]): The placements forwarded to ``fn``.
        infr (Infrastructure): The infrastructure whose nodes are visited.
        placement_view (PlacementView): The placement view forwarded to ``fn``.
        flatten (bool): If True, return flat tuples instead of a dict.
            Defaults to False.
        **event_data: Extra keyword arguments forwarded to ``fn``.

    Returns:
        Any: ``{node: value}`` or, when flattening, a tuple of
            ``(node, *flat_value)`` rows.
    """
    # Decide on the output shape first so fn runs exactly once per node.
    if flatten:
        return tuple(
            (node, *flat_value)
            for node, res in infr.nodes(data=True)
            for flat_value in _flatten_value(
                fn(node, res, placements, infr, placement_view, **event_data)
            )
        )
    return {
        node: fn(node, res, placements, infr, placement_view, **event_data)
        for node, res in infr.nodes(data=True)
    }


def _link_fn(
    fn: Callable,
    placements: dict[str, Placement],
    infr: Infrastructure,
    placement_view: PlacementView,
    flatten: bool = False,
    **event_data,
) -> Any:
    """Call ``fn`` once per infrastructure link (edge).

    Args:
        fn (Callable): Called as ``fn(source, target, resources, placements,
            infr, placement_view, **event_data)``.
        placements (dict[str, Placement]): The placements forwarded to ``fn``.
        infr (Infrastructure): The infrastructure whose edges are visited.
        placement_view (PlacementView): The placement view forwarded to ``fn``.
        flatten (bool): If True, return flat tuples instead of a dict.
            Defaults to False.
        **event_data: Extra keyword arguments forwarded to ``fn``.

    Returns:
        Any: ``{(source, target): value}`` or, when flattening, a tuple of
            ``(source, target, *flat_value)`` rows.
    """
    # Decide on the output shape first so fn runs exactly once per link.
    if flatten:
        return tuple(
            (source, target, *flat_value)
            for source, target, res in infr.edges(data=True)
            for flat_value in _flatten_value(
                fn(source, target, res, placements, infr, placement_view, **event_data)
            )
        )
    return {
        (source, target): fn(
            source, target, res, placements, infr, placement_view, **event_data
        )
        for source, target, res in infr.edges(data=True)
    }


def _flatten_value(value: Any) -> Generator[tuple[Any, ...], None, None]:
    """Flatten a value into tuple parts.

    Dicts yield one row per (key, flattened item) combination, with tuple keys
    spread into the row. Tuples yield the cartesian product of their flattened
    members. Any other value yields a single one-element row.

    Args:
        value (Any): The value to flatten.

    Yields:
        tuple[Any, ...]: The flattened rows. An empty dict yields nothing.
    """
    if isinstance(value, dict):
        for key, item in value.items():
            for key_parts, value_parts in _flatten_pair(key, item):
                yield (*key_parts, *value_parts)
        return

    if isinstance(value, tuple):
        for parts in product(*(_flatten_value(item) for item in value)):
            yield tuple(part for group in parts for part in group)
        return

    yield (value,)


def _flatten_pair(
    key: Any, value: Any
) -> Generator[tuple[tuple[Any, ...], tuple[Any, ...]], None, None]:
    """Flatten a key/value pair into tuple parts.

    Args:
        key (Any): The key; a non-tuple key is wrapped in a one-element tuple.
        value (Any): The value, flattened through ``_flatten_value``.

    Yields:
        tuple[tuple[Any, ...], tuple[Any, ...]]: The key parts paired with each
            flattened row of the value.
    """
    key_parts = key if isinstance(key, tuple) else (key,)
    for value_parts in _flatten_value(value):
        yield key_parts, value_parts