# Source code for eclypse.report.reporters.parquet

"""Module for the Parquet reporter, used to report simulation metrics in Parquet."""

from __future__ import annotations

import asyncio
from datetime import datetime as dt
from importlib import import_module
from pathlib import Path
from typing import (
    TYPE_CHECKING,
    Any,
)

from eclypse.report.reporter import Reporter
from eclypse.report.schema import DEFAULT_REPORT_HEADERS
from eclypse.utils.defaults import PARQUET_REPORT_DIR

if TYPE_CHECKING:
    from collections.abc import (
        Generator,
    )

    from eclypse.workflow.event import EclypseEvent


class ParquetReporter(Reporter):
    """Class to report simulation metrics in partitioned Parquet files.

    Each call to :meth:`write` produces one new part file at
    ``<report_path>/<PARQUET_REPORT_DIR>/<callback_type>/part-NNNNNN.parquet``,
    with ``NNNNNN`` counted separately per callback type.
    """

    def __init__(self, report_path: str | Path):
        """Initialize the Parquet reporter.

        Args:
            report_path: Root report directory; Parquet output goes into its
                ``PARQUET_REPORT_DIR`` subdirectory.
        """
        super().__init__(report_path)
        # NOTE(review): relies on the base Reporter exposing ``report_path`` as a
        # path-like object that supports ``/`` — confirm in Reporter.__init__.
        self.report_path = self.report_path / PARQUET_REPORT_DIR
        # Next part-file index to use, keyed by callback type.
        self._partitions: dict[str, int] = {}
        # The polars module; set by ``init`` (kept None until then).
        self._pl = None

    async def init(self):
        """Initialize the reporter and import polars lazily."""
        await super().init()
        # Imported here rather than at module top, so importing this module
        # does not itself require polars.
        self._pl = import_module("polars")

    def report(
        self,
        event_name: str,
        event_idx: int,
        callback: EclypseEvent,
    ) -> Generator[dict[str, Any], None, None]:
        """Report callback values as row dictionaries suitable for Parquet.

        Args:
            event_name: Event name, written into every row.
            event_idx: Event index, written into every row.
            callback: Callback whose rows (from ``callback_rows``) are reported.

        Yields:
            One dict per row, keyed by ``DEFAULT_REPORT_HEADERS[callback.type]``.
            Nothing is yielded when ``callback.type`` is None, and rows whose
            last element is None are skipped.
        """
        callback_type = callback.type
        if callback_type is None:
            return
        columns = DEFAULT_REPORT_HEADERS[callback_type]
        for line in self.callback_rows(callback):
            if line[-1] is None:
                continue
            # Each row is stamped with the time at which it is produced.
            values = [
                dt.now().isoformat(),
                event_name,
                event_idx,
                callback.name,
                *line,
            ]
            # strict=True: a row whose width differs from the header raises.
            yield dict(zip(columns, values, strict=True))

    async def write(self, callback_type: str, data: list[dict[str, Any]]):
        """Write a batch of callback rows to a new parquet part file.

        Args:
            callback_type: Selects both the output subdirectory and the column
                set from ``DEFAULT_REPORT_HEADERS``.
            data: Row dictionaries to write; an empty batch is a no-op.

        Raises:
            RuntimeError: If :meth:`init` has not been awaited first.
        """
        if not data:
            return
        pl = self._pl
        if pl is None:
            raise RuntimeError("Parquet reporter is not initialised.")
        # Resolve the column set before touching the filesystem, so an unknown
        # callback type fails without leaving an empty directory behind.
        columns = DEFAULT_REPORT_HEADERS[callback_type]
        # The index is read and bumped with no await in between, so concurrent
        # writes on the event loop never reuse the same part file name.
        part_idx = self._partitions.get(callback_type, 0)
        self._partitions[callback_type] = part_idx + 1
        output_dir = Path(self.report_path / callback_type)
        output_path = output_dir / f"part-{part_idx:06d}.parquet"

        def _flush() -> None:
            # Directory creation is blocking filesystem I/O as well; run it on
            # the worker thread with the write instead of on the event loop.
            output_dir.mkdir(parents=True, exist_ok=True)
            _write_parquet(pl, data, columns, output_path)

        await asyncio.to_thread(_flush)
def _write_parquet(pl: Any, data: list[dict[str, Any]], columns: list[str], path: Path): """Write a parquet batch synchronously on a worker thread.""" frame = pl.DataFrame(data).select(columns) frame.write_parquet(path, compression="zstd")