Source code for eclypse.report.reporters.json

"""Module for the JSON reporter, used to report simulation metrics in JSON format."""

from __future__ import annotations

import json
from datetime import datetime as dt
from pathlib import Path
from typing import (
    TYPE_CHECKING,
    Any,
)

import aiofiles  # type: ignore[import-untyped]

from eclypse.report.reporter import Reporter
from eclypse.utils.defaults import JSON_REPORT_DIR

if TYPE_CHECKING:
    from collections.abc import (
        Generator,
    )

    from eclypse.workflow.event import EclypseEvent


class JSONReporter(Reporter):
    """Class to report the simulation metrics in JSON lines format.

    One ``<callback_type>.jsonl`` file is used per callback type. File handles
    are opened lazily in append mode on the first write and are kept open until
    :meth:`close` is awaited.
    """

    def __init__(self, report_path: str | Path):
        """Initialize the JSON reporter.

        Args:
            report_path (str | Path): The base report path; the JSON lines files
                are placed in its ``JSON_REPORT_DIR`` subdirectory.
        """
        super().__init__(report_path)
        self.report_path = self.report_path / JSON_REPORT_DIR
        # Cache of open append-only file handles, keyed by callback type.
        self._files: dict[str, Any] = {}

    def report(
        self,
        event_name: str,
        event_idx: int,
        callback: EclypseEvent,
    ) -> Generator[dict[str, Any], None, None]:
        """Reports the callback values in JSON lines format.

        Nothing is yielded when the callback carries no (truthy) data.

        Args:
            event_name (str): The name of the event.
            event_idx (int): The index of the event trigger (step).
            callback (EclypseEvent): The executed callback containing the data to report.

        Returns:
            Generator[dict[str, Any], None, None]: JSON lines entries to report lazily.
        """
        if callback.data:
            yield {
                "timestamp": dt.now().isoformat(),
                "event_name": event_name,
                "event_idx": event_idx,
                "callback_name": callback.name,
                "data": _normalize_for_json(callback.data),
            }

    async def _get_file(self, callback_type: str):
        """Get or create the append-only file handle for a callback type.

        Args:
            callback_type (str): The type of the callback (used for file naming).

        Returns:
            The cached handle for ``<callback_type>.jsonl``, opened on first use.
        """
        if callback_type in self._files:
            return self._files[callback_type]

        path = Path(self.report_path / f"{callback_type}.jsonl")
        handle = await aiofiles.open(path, "a", encoding="utf-8")
        self._files[callback_type] = handle
        return handle

    async def write(self, callback_type: str, data: list[dict[str, Any]]):
        """Write the JSON lines report to a file.

        Args:
            callback_type (str): The type of the callback (used for file naming).
            data (list[dict]): The list of dictionaries to write as JSON lines.
        """
        if not data:
            return

        handle = await self._get_file(callback_type)
        # Serialize the whole batch first so it goes out in a single write call.
        await handle.write(
            "".join(
                f"{json.dumps(item, ensure_ascii=False, cls=_SafeJSONEncoder)}\n"
                for item in data
            )
        )

    async def close(self):
        """Close all open JSONL file handles.

        Every handle is closed even if closing one of them fails, and the
        handle cache is always emptied so no stale (possibly closed) handle is
        reused by a later write.

        Raises:
            Exception: The first error raised while closing a handle, re-raised
                after all remaining handles have been closed.
        """
        handles = list(self._files.values())
        # Forget the handles up front: whatever happens below, none of them
        # must be handed out again by ``_get_file``.
        self._files.clear()

        first_error: Exception | None = None
        for handle in handles:
            try:
                await handle.close()
            except Exception as exc:  # keep closing the rest, re-raise below
                if first_error is None:
                    first_error = exc

        if first_error is not None:
            raise first_error
class _SafeJSONEncoder(json.JSONEncoder):
    """JSON encoder that also handles date-like objects and set-like containers."""

    def default(self, o: Any) -> Any:
        """Convert objects the standard encoder cannot serialize.

        Args:
            o (Any): The object the base encoder could not serialize.

        Returns:
            Any: ``o.isoformat()`` for objects exposing ``isoformat``, or a list
            for sets, frozensets and tuples.

        Raises:
            TypeError: Raised by the base encoder for any other unsupported type.
        """
        if hasattr(o, "isoformat"):
            return o.isoformat()
        if isinstance(o, (set, frozenset, tuple)):
            return list(o)
        return super().default(o)


def _normalize_for_json(value: Any) -> Any:
    """Recursively normalize Python values to a JSON-serializable structure.

    Objects exposing ``isoformat`` become their ISO string, dictionaries get
    their keys and values normalized, and lists, tuples, sets and frozensets
    become lists of normalized items. Any other value is returned unchanged.

    Args:
        value (Any): The value to normalize.

    Returns:
        Any: The normalized value.
    """
    if hasattr(value, "isoformat"):
        return value.isoformat()
    if isinstance(value, dict):
        return {
            _normalize_key_for_json(key): _normalize_for_json(item)
            for key, item in value.items()
        }
    # frozenset is included alongside set so it no longer reaches json.dumps
    # as an unsupported type.
    if isinstance(value, (list, tuple, set, frozenset)):
        return [_normalize_for_json(item) for item in value]
    return value


def _normalize_key_for_json(key: Any) -> str | int | float | bool | None:
    """Normalize dictionary keys to JSON-compatible scalars.

    Args:
        key (Any): The dictionary key to normalize.

    Returns:
        str | int | float | bool | None: The key itself when it is already a
        JSON-compatible scalar, otherwise the compact JSON encoding of the
        normalized key.
    """
    if isinstance(key, (str, int, float, bool)) or key is None:
        return key

    normalized_key = _normalize_for_json(key)
    # Compact separators keep composite keys (e.g. tuples) short: "[1,2]".
    return json.dumps(
        normalized_key,
        ensure_ascii=False,
        separators=(",", ":"),
        cls=_SafeJSONEncoder,
    )