"""Report class backed by a pluggable DataFrame backend.
The Report reads the report files produced by a simulation, in the storage
format selected through ``report_format``, and provides convenient accessors
(application, service, etc.) returning a filtered DataFrame.
The backend is selectable (pandas, polars eager, polars lazy) and can be
extended by providing custom FrameBackend subclasses.
"""
from __future__ import annotations
import json
from collections import defaultdict
from pathlib import Path
from typing import (
TYPE_CHECKING,
Any,
cast,
)
from eclypse.report.backends import get_backend
from eclypse.report.query import ReportQuery
from eclypse.report.schema import DEFAULT_REPORT_HEADERS
from eclypse.utils.defaults import (
DEFAULT_REPORT_BACKEND,
DEFAULT_REPORT_RANGE,
DEFAULT_REPORT_STEP,
DEFAULT_REPORT_TYPE,
SIMULATION_CONFIG_FILENAME,
)
if TYPE_CHECKING:
from eclypse.report.backend import FrameBackend
from eclypse.utils.types import (
EventType,
ReportFormat,
)
REPORT_TYPES: list[EventType] = cast("list[EventType]", list(DEFAULT_REPORT_HEADERS))
[docs]
class Report:
    """Report class backed by a pluggable DataFrame backend.

    The report is built from the files a simulation writes into its report
    format directory (``<simulation_path>/<report_format>``). It provides
    methods to access report-specific DataFrames and filter them by event
    range, step, and optional column filters.

    Frames are read on first access, one report type at a time, and cached in
    ``stats`` after the first successful read.

    Note:
        When using the polars lazy backend, DataFrame-returning methods will
        return a LazyFrame. Call `.collect()` to materialise a DataFrame.
    """

    def __init__(
        self,
        simulation_path: str | Path,
        backend: str | FrameBackend = DEFAULT_REPORT_BACKEND,
        report_format: ReportFormat | None = None,
    ):
        """Initialise the Report.

        Args:
            simulation_path: Path to the simulation directory containing report outputs.
            backend: Backend name or a FrameBackend instance.
            report_format: Storage format to read from. If omitted, uses the value
                stored in ``config.json`` when available, otherwise
                ``DEFAULT_REPORT_TYPE``.

        Raises:
            FileNotFoundError: If the selected report format directory does not exist.
            json.JSONDecodeError: If ``report_format`` is omitted and the
                configuration file exists but is not valid JSON.
            ValueError: If a backend name is unknown.
            TypeError: If a backend object is not a FrameBackend.
        """
        self._sim_path = Path(simulation_path)
        # Must be set before resolving the format: the resolver stores the
        # parsed configuration here when it has to read the config file.
        self._config: dict[str, Any] | None = None
        self._report_format: ReportFormat = self._resolve_report_format(report_format)
        self._stats_path = self._sim_path / self._report_format
        if not self._stats_path.exists():
            raise FileNotFoundError(
                f'No {self._report_format} report files found at "{self._stats_path}".'
            )
        self._backend = get_backend(backend)
        # Cache of loaded frames keyed by report type. No default factory is
        # given, so this behaves like a plain dict (missing keys raise KeyError).
        self.stats: dict[EventType, Any | None] = defaultdict()

    @property
    def backend_name(self) -> str:
        """Return the name of the currently selected backend.

        Returns:
            The backend name.
        """
        return self._backend.name

    def application(
        self,
        report_range: tuple[int, int] = DEFAULT_REPORT_RANGE,
        report_step: int = DEFAULT_REPORT_STEP,
        event_ids: str | list[str] | None = None,
        application_ids: str | list[str] | None = None,
    ) -> Any:
        """Return a filtered DataFrame containing application metrics.

        Args:
            report_range: The inclusive range (start, end) of n_event values to include.
            report_step: Step used when sampling n_event values.
            event_ids: Event IDs to filter by.
            application_ids: Application IDs to filter by.

        Returns:
            A filtered DataFrame for application metrics.
        """
        return self.frame(
            "application",
            report_range=report_range,
            report_step=report_step,
            application_id=application_ids,
            event_id=event_ids,
        )

    def service(
        self,
        report_range: tuple[int, int] = DEFAULT_REPORT_RANGE,
        report_step: int = DEFAULT_REPORT_STEP,
        event_ids: str | list[str] | None = None,
        application_ids: str | list[str] | None = None,
        service_ids: str | list[str] | None = None,
    ) -> Any:
        """Return a filtered DataFrame containing service metrics.

        Args:
            report_range: The inclusive range (start, end) of n_event values to include.
            report_step: Step used when sampling n_event values.
            event_ids: Event IDs to filter by.
            application_ids: Application IDs to filter by.
            service_ids: Service IDs to filter by.

        Returns:
            A filtered DataFrame for service metrics.
        """
        return self.frame(
            "service",
            report_range=report_range,
            report_step=report_step,
            application_id=application_ids,
            event_id=event_ids,
            service_id=service_ids,
        )

    def interaction(
        self,
        report_range: tuple[int, int] = DEFAULT_REPORT_RANGE,
        report_step: int = DEFAULT_REPORT_STEP,
        event_ids: str | list[str] | None = None,
        sources: str | list[str] | None = None,
        targets: str | list[str] | None = None,
        application_ids: str | list[str] | None = None,
    ) -> Any:
        """Return a filtered DataFrame containing interaction metrics.

        Args:
            report_range: The inclusive range (start, end) of n_event values to include.
            report_step: Step used when sampling n_event values.
            event_ids: Event IDs to filter by.
            sources: Source IDs to filter by.
            targets: Target IDs to filter by.
            application_ids: Application IDs to filter by.

        Returns:
            A filtered DataFrame for interaction metrics.
        """
        return self.frame(
            "interaction",
            report_range=report_range,
            report_step=report_step,
            application_id=application_ids,
            event_id=event_ids,
            source=sources,
            target=targets,
        )

    def infrastructure(
        self,
        report_range: tuple[int, int] = DEFAULT_REPORT_RANGE,
        report_step: int = DEFAULT_REPORT_STEP,
        event_ids: str | list[str] | None = None,
    ) -> Any:
        """Return a filtered DataFrame containing infrastructure metrics.

        Args:
            report_range: The inclusive range (start, end) of n_event values to include.
            report_step: Step used when sampling n_event values.
            event_ids: Event IDs to filter by.

        Returns:
            A filtered DataFrame for infrastructure metrics.
        """
        return self.frame(
            "infrastructure",
            report_range=report_range,
            report_step=report_step,
            event_id=event_ids,
        )

    def node(
        self,
        report_range: tuple[int, int] = DEFAULT_REPORT_RANGE,
        report_step: int = DEFAULT_REPORT_STEP,
        event_ids: str | list[str] | None = None,
        node_ids: str | list[str] | None = None,
    ) -> Any:
        """Return a filtered DataFrame containing node metrics.

        Args:
            report_range: The inclusive range (start, end) of n_event values to include.
            report_step: Step used when sampling n_event values.
            event_ids: Event IDs to filter by.
            node_ids: Node IDs to filter by.

        Returns:
            A filtered DataFrame for node metrics.
        """
        return self.frame(
            "node",
            report_range=report_range,
            report_step=report_step,
            event_id=event_ids,
            node_id=node_ids,
        )

    def link(
        self,
        report_range: tuple[int, int] = DEFAULT_REPORT_RANGE,
        report_step: int = DEFAULT_REPORT_STEP,
        event_ids: str | list[str] | None = None,
        sources: str | list[str] | None = None,
        targets: str | list[str] | None = None,
    ) -> Any:
        """Return a filtered DataFrame containing link metrics.

        Args:
            report_range: The inclusive range (start, end) of n_event values to include.
            report_step: Step used when sampling n_event values.
            event_ids: Event IDs to filter by.
            sources: Source IDs to filter by.
            targets: Target IDs to filter by.

        Returns:
            A filtered DataFrame for link metrics.
        """
        return self.frame(
            "link",
            report_range=report_range,
            report_step=report_step,
            event_id=event_ids,
            source=sources,
            target=targets,
        )

    def simulation(
        self,
        report_range: tuple[int, int] = DEFAULT_REPORT_RANGE,
        report_step: int = DEFAULT_REPORT_STEP,
        event_ids: str | list[str] | None = None,
    ) -> Any:
        """Return a filtered DataFrame containing simulation metrics.

        Args:
            report_range: The inclusive range (start, end) of n_event values to include.
            report_step: Step used when sampling n_event values.
            event_ids: Event IDs to filter by.

        Returns:
            A filtered DataFrame for simulation metrics.
        """
        return self.frame(
            "simulation",
            report_range=report_range,
            report_step=report_step,
            event_id=event_ids,
        )

    def query(self, report_type: EventType) -> ReportQuery:
        """Create a composable query for the given report type.

        Args:
            report_type: The report type the query operates on.

        Returns:
            A ReportQuery bound to this report and ``report_type``.
        """
        return ReportQuery(self, report_type)

    def describe(self) -> str:
        """Return a compact human-readable summary of available reports.

        The summary includes total rows, unique simulation steps (``n_event``),
        unique metric callback IDs (``callback_id``), unique application IDs,
        and a per-report breakdown. Report types whose file is missing
        (``FileNotFoundError``) or whose cached frame is ``None`` are skipped.
        Lazy frames are collected in order to be inspected.

        Returns:
            A summary string such as
            ``"12 rows x 3 steps x 5 metrics | 2 applications | application:
            12 rows, 5 metrics"``. The per-report part is omitted when no
            report could be loaded.
        """
        total_rows = 0
        steps: set[Any] = set()
        metrics: set[Any] = set()
        applications: set[Any] = set()
        breakdown: list[str] = []

        for report_type in REPORT_TYPES:
            try:
                self._read_frame(report_type)
            except FileNotFoundError:
                # This report type was not produced by the simulation.
                continue
            frame = self.stats[report_type]
            if frame is None:
                continue

            materialized = _materialize_frame(frame)
            row_count = _frame_row_count(materialized)
            report_metrics = set(_column_values(materialized, "callback_id"))

            total_rows += row_count
            steps.update(_column_values(materialized, "n_event"))
            metrics.update(report_metrics)
            applications.update(_column_values(materialized, "application_id"))
            breakdown.append(
                f"{report_type}: {row_count} rows, {len(report_metrics)} metrics"
            )

        summary = (
            f"{total_rows} rows x {len(steps)} steps x {len(metrics)} metrics"
            f" | {len(applications)} applications"
        )
        if breakdown:
            return f"{summary} | " + "; ".join(breakdown)
        return summary

    def get_dataframes(
        self,
        report_types: list[EventType] | None = None,
        report_range: tuple[int, int] = DEFAULT_REPORT_RANGE,
        report_step: int = DEFAULT_REPORT_STEP,
        event_ids: str | list[str] | None = None,
    ) -> dict[str, Any]:
        """Return multiple report DataFrames for the specified report types.

        Args:
            report_types: Report types to fetch. If None, all report types are returned.
            report_range: The inclusive range (start, end) of n_event values to include.
            report_step: Step used when sampling n_event values.
            event_ids: Event IDs to filter by.

        Returns:
            A mapping from report type to filtered DataFrame.

        Raises:
            ValueError: If an invalid report type is provided.
        """
        if report_types is None:
            report_types = REPORT_TYPES
        else:
            # Validate everything up front so no frame is read for a bad request.
            for rt in report_types:
                if rt not in REPORT_TYPES:
                    raise ValueError(f"Invalid report type: {rt}")

        return {
            report_type: self.frame(
                report_type,
                report_range=report_range,
                report_step=report_step,
                event_id=event_ids,
            )
            for report_type in report_types
        }

    def frame(
        self,
        report_type: EventType,
        report_range: tuple[int, int] = DEFAULT_REPORT_RANGE,
        report_step: int = DEFAULT_REPORT_STEP,
        **kwargs: Any,
    ) -> Any:
        """Return a frame for the given report type with range and extra filters.

        Args:
            report_type: The report type (e.g. "application", "service", etc.).
            report_range: The inclusive range (start, end) of n_event values to include.
            report_step: Step used when sampling n_event values.
            **kwargs: Additional filters to apply. Keys must be column names.

        Returns:
            A filtered frame.

        Raises:
            RuntimeError: If the backend produced no frame for ``report_type``.
        """
        self._read_frame(report_type)
        df = self.stats[report_type]
        if df is None:
            raise RuntimeError(f"Report data for {report_type!r} could not be loaded.")
        return self.filter(
            df, report_range=report_range, report_step=report_step, **kwargs
        )

    def _read_frame(self, report_type: EventType) -> None:
        """Read a report file into a DataFrame and cache it.

        The backend is only asked to read the file when ``report_type`` is not
        cached in ``stats`` yet.

        Args:
            report_type: The report type to read (e.g. "application", "service", etc.).
        """
        if report_type not in self.stats:
            self.stats[report_type] = self._backend.read_frame(
                self._stats_path,
                report_type,
                self._report_format,
            )

    def filter(
        self,
        df: Any,
        report_range: tuple[int, int] = DEFAULT_REPORT_RANGE,
        report_step: int = DEFAULT_REPORT_STEP,
        **kwargs: Any,
    ) -> Any:
        """Filter a DataFrame by n_event range or step and optional equality filters.

        An empty frame is returned unchanged. The upper bound of the range is
        clamped to the largest ``n_event`` present in the frame.

        Args:
            df: The DataFrame to filter.
            report_range: The inclusive range (start, end) of n_event values to include.
            report_step: Step used when sampling n_event values.
            **kwargs: Additional filters to apply, keyed by column name. Values
                may be scalars (equality) or a list, tuple, set or frozenset
                (membership). Filters whose value is None, or whose key is not
                a column of the frame, are ignored.

        Returns:
            A filtered DataFrame.
        """
        b = self._backend
        if b.is_empty(df):
            return df

        max_event = min(b.max(df, "n_event"), report_range[1])
        filtered = b.filter_range_step(
            df,
            "n_event",
            report_range[0],
            max_event,
            report_step,
        )

        cols = b.columns(filtered)
        for key, value in kwargs.items():
            if value is None or key not in cols:
                continue
            if isinstance(value, (list, tuple, set, frozenset)):
                # Any collection of candidates is a membership filter; it is
                # handed to the backend as a list, the form it already accepts.
                filtered = b.filter_in(filtered, key, list(value))
            else:
                filtered = b.filter_eq(filtered, key, value)
        return filtered

    @property
    def config(self) -> dict[str, Any]:
        """Return the simulation configuration loaded from config.json.

        The file is parsed on first access and the result is reused afterwards.

        Returns:
            The configuration mapping.

        Raises:
            FileNotFoundError: If config.json is missing.
            json.JSONDecodeError: If the JSON file is invalid.
        """
        if self._config is None:
            self._config = self._load_config()
        return self._config

    @property
    def report_format(self) -> ReportFormat:
        """Return the on-disk report format used for loading."""
        return self._report_format

    def _load_config(self) -> dict[str, Any]:
        """Read and parse the simulation configuration file.

        Returns:
            The parsed JSON content of the configuration file.

        Raises:
            FileNotFoundError: If the configuration file is missing.
            json.JSONDecodeError: If the JSON file is invalid.
        """
        config_path = self._sim_path / SIMULATION_CONFIG_FILENAME
        with open(config_path, encoding="utf-8") as config_file:
            return json.load(config_file)

    def _resolve_report_format(
        self, report_format: ReportFormat | None
    ) -> ReportFormat:
        """Resolve report format from argument, config file, or default.

        An explicit ``report_format`` wins. Otherwise the ``"report_format"``
        entry of the configuration file is used when the file exists and the
        entry is not None; the parsed configuration is kept for later use by
        ``config``. ``DEFAULT_REPORT_TYPE`` is the final fallback.

        Args:
            report_format: The format requested by the caller, if any.

        Returns:
            The report format to load from.
        """
        if report_format is not None:
            return report_format

        if (self._sim_path / SIMULATION_CONFIG_FILENAME).exists():
            self._config = self._load_config()
            config_format = self._config.get("report_format")
            if config_format is not None:
                return cast("ReportFormat", config_format)

        return cast("ReportFormat", DEFAULT_REPORT_TYPE)
def _materialize_frame(frame: Any) -> Any:
"""Materialise lazy frames for summary inspection."""
collect = getattr(frame, "collect", None)
if callable(collect):
return collect()
return frame
def _frame_row_count(frame: Any) -> int:
"""Return a frame's row count across supported backends."""
if hasattr(frame, "height"):
return int(frame.height)
try:
return len(frame)
except TypeError:
return 0
def _column_values(frame: Any, column: str) -> list[Any]:
"""Return non-null values for a column across supported backends."""
if isinstance(frame, list):
return [
row[column] for row in frame if column in row and row[column] is not None
]
columns = getattr(frame, "columns", None)
if columns is None or column not in columns:
return []
get_column = getattr(frame, "get_column", None)
if callable(get_column):
series = get_column(column)
drop_nulls = getattr(series, "drop_nulls", None)
if callable(drop_nulls):
series = drop_nulls()
return list(series.to_list())
series = frame[column]
dropna = getattr(series, "dropna", None)
if callable(dropna):
series = dropna()
to_list = getattr(series, "to_list", None)
if callable(to_list):
return list(to_list())
tolist = getattr(series, "tolist", None)
if callable(tolist):
return list(tolist())
return list(series)