Source code for eclypse.report.reporter

"""Module for the Reporter abstract class.

It defines the basic structure of a reporter, which is used to generate reports during
the simulation.
"""

from __future__ import annotations

from abc import (
    ABC,
    abstractmethod,
)
from itertools import product
from pathlib import Path
from typing import (
    TYPE_CHECKING,
    Any,
)

if TYPE_CHECKING:
    from collections.abc import (
        Generator,
    )

    from eclypse.workflow.event.event import EclypseEvent


[docs] class Reporter(ABC): """Abstract class to report the simulation metrics. It provides the interface for the simulation reporters. """
[docs] def __init__( self, report_path: str | Path, ): """Create a new Reporter. Args: report_path (str | Path): The path to save the reports. """ self.report_path = Path(report_path)
[docs] async def init(self): """Perform any preparation logic (file creation, folder setup, headers, etc).""" self.report_path.mkdir(parents=True, exist_ok=True)
[docs] async def close(self): """Perform any shutdown logic (closing file handles, flushing state, etc).""" return None
[docs] @abstractmethod async def write(self, callback_type: str, data: Any): """Write a batch of buffered data to the destination (file, db, etc)."""
[docs] @abstractmethod def report( self, event_name: str, event_idx: int, callback: EclypseEvent, ) -> Generator[Any, None, None]: """Report the simulation reportable callbacks. Args: event_name (str): The name of the event. event_idx (int): The index of the event trigger (step). callback (EclypseEvent): The executed event. Returns: Generator[Any, None, None]: The entries to be written lazily. """
[docs] def dfs_data(self, data: Any) -> Generator[list[Any], None, None]: """Perform DFS on nested dictionaries and build concatenated key paths. Args: data (Any): The data to traverse. Returns: list: The list of paths. """ def dfs(d): if isinstance(d, dict): for key, value in d.items(): for key_path, value_path in product(dfs(key), dfs(value)): yield key_path + value_path elif isinstance(d, tuple): for path in product(*map(dfs, d)): yield [item for subpath in path for item in subpath] else: yield [d] yield from dfs(data)
[docs] def callback_rows(self, callback: EclypseEvent) -> Generator[list[Any], None, None]: """Return callback tuple rows directly, otherwise DFS-flatten the payload.""" if callback.is_metric and isinstance(callback.data, tuple): for row in callback.data: yield list(row) return yield from self.dfs_data(callback.data)