Source code for eclypse.report.reporters.csv

# pylint: disable=unused-argument
"""Module for the CSVReporter class.

It is used to report the simulation metrics in a CSV format.
"""

from __future__ import annotations

from datetime import datetime as dt
from pathlib import Path
from typing import (
    TYPE_CHECKING,
    Any,
)

import aiofiles  # type: ignore[import-untyped]

from eclypse.report.reporter import Reporter
from eclypse.report.schema import DEFAULT_REPORT_HEADERS
from eclypse.utils.defaults import CSV_REPORT_DIR

if TYPE_CHECKING:
    from collections.abc import (
        Generator,
    )

    from eclypse.workflow.event import EclypseEvent

CSV_DELIMITER = ","


[docs] class CSVReporter(Reporter): """Class to report the simulation metrics in CSV format. It prints an header with the format of the rows and then the values of the reportable. """
[docs] def __init__(self, report_path: str | Path): """Initialize the CSV reporter.""" super().__init__(report_path) self.report_path = self.report_path / CSV_REPORT_DIR self._files: dict[str, Any] = {}
[docs] def report( self, event_name: str, event_idx: int, callback: EclypseEvent, ) -> Generator[str, None, None]: """Reports the callback values in a CSV file, one per line. Args: event_name (str): The name of the event. event_idx (int): The index of the event trigger (step). callback (EclypseEvent): The executed callback containing the data to report. """ for line in self.callback_rows(callback): if line[-1] is None: continue fields = [dt.now().isoformat(), event_name, event_idx, callback.name, *line] yield CSV_DELIMITER.join(str(field) for field in fields)
async def _get_file(self, callback_type: str): """Get or create the append-only file handle for a callback type.""" if callback_type in self._files: return self._files[callback_type] path = Path(self.report_path / f"{callback_type}.csv") exists = path.exists() handle = await aiofiles.open(path, "a", encoding="utf-8") if not exists: await handle.write( f"{CSV_DELIMITER.join(DEFAULT_REPORT_HEADERS[callback_type])}\n" ) self._files[callback_type] = handle return handle
[docs] async def write(self, callback_type: str, data: Any): """Writes the data to a CSV file based on the callback type. Args: callback_type (str): The type of the callback. data (Any): The data to write to the CSV file. """ if not data: return handle = await self._get_file(callback_type) await handle.write("".join(f"{line}\n" for line in data))
[docs] async def close(self): """Close all open CSV file handles.""" for handle in self._files.values(): await handle.close() self._files.clear()