Source code for eclypse.report.backends.pandas_backend
"""Pandas backend implementation.
This module provides a concrete FrameBackend implementation using pandas.
Pandas is imported lazily so that it remains an optional dependency.
"""
from __future__ import annotations
from importlib import import_module
from typing import (
TYPE_CHECKING,
Any,
)
from eclypse.report.backend import (
FrameBackend,
list_parquet_parts,
load_jsonl_rows,
)
if TYPE_CHECKING:
from collections.abc import (
Iterable,
)
from pandas import DataFrame # type: ignore[import-untyped]
def _to_float(value: Any) -> Any:
    """Coerce a value to ``float``, falling back to the value itself.

    Intended for use as a pandas CSV converter, where cells that are not
    numeric must survive unchanged instead of aborting the read.

    Args:
        value: The raw value to coerce.

    Returns:
        ``float(value)`` when the conversion succeeds; otherwise ``value``
        exactly as it was passed in.
    """
    try:
        converted = float(value)
    except (TypeError, ValueError):
        # Not numeric (or not convertible at all): hand the input back as-is.
        return value
    return converted
[docs]
class PandasBackend(FrameBackend):
    """Pandas implementation of the FrameBackend abstract base class.

    The pandas module is resolved when an instance is constructed and kept
    on ``self._pd``; the reader methods go through that handle, while the
    inspection and filtering methods operate directly on the DataFrame
    they are given.
    """
[docs]
def __init__(self):
    """Set up the backend and bind the pandas module to the instance.

    pandas is resolved through ``import_module`` at construction time
    rather than at module import time, which keeps it an optional
    dependency.
    """
    super().__init__(name="pandas")
    # Resolve pandas only now and keep the module handle on the instance.
    pandas_module = import_module("pandas")
    self._pd = pandas_module
def _read_csv(self, source) -> DataFrame:
    """Load a CSV report from ``source`` as a pandas DataFrame.

    The ``value`` column is routed through ``_to_float``, so numeric cells
    become floats while cells that cannot be converted are kept as read.
    """
    value_converters = {"value": _to_float}
    return self._pd.read_csv(source, converters=value_converters)
def _read_parquet(self, source) -> DataFrame:
    """Read partitioned parquet data into a single pandas DataFrame.

    Every part reported by ``list_parquet_parts(source)`` is read and the
    resulting frames are concatenated under a fresh, contiguous index.

    Args:
        source: Location handed to ``list_parquet_parts`` to discover the
            parquet parts.

    Returns:
        The concatenation of all parts, or an empty DataFrame when no
        parts are found.
    """
    frames = [self._pd.read_parquet(part) for part in list_parquet_parts(source)]
    if not frames:
        # ``pandas.concat`` raises ValueError("No objects to concatenate")
        # on an empty list; return an empty frame instead so callers can
        # detect the situation with ``is_empty``.
        return self._pd.DataFrame()
    return self._pd.concat(frames, ignore_index=True)
def _read_json(self, source, report_type: str) -> DataFrame:
    """Build a pandas DataFrame from the JSONL rows of a report.

    The rows are obtained from ``load_jsonl_rows(source, report_type)`` and
    passed to the DataFrame constructor unchanged.
    """
    rows = load_jsonl_rows(source, report_type)
    return self._pd.DataFrame(rows)
[docs]
def is_empty(self, df: DataFrame) -> bool:
    """Report whether the DataFrame holds no data.

    Args:
        df: The DataFrame to inspect.

    Returns:
        The value of ``df.empty``: True when the DataFrame contains no
        items, otherwise False.
    """
    has_no_data: bool = df.empty
    return has_no_data
[docs]
def columns(self, df: DataFrame) -> set[str]:
    """Collect the DataFrame's column names into a set.

    Args:
        df: The DataFrame to inspect.

    Returns:
        A new set holding every column name of ``df``.
    """
    names: set[str] = set()
    names.update(df.columns)
    return names
[docs]
def max(self, df: DataFrame, col: str) -> int:
    """Find the largest value in a column and return it as an int.

    Args:
        df: The DataFrame to inspect.
        col: The name of the column.

    Returns:
        The column maximum converted with ``int()``.
    """
    largest = df[col].max()
    return int(largest)
[docs]
def filter_events(
    self, df: DataFrame, col: str, events: Iterable[int]
) -> DataFrame:
    """Keep only the rows whose ``col`` value appears in ``events``.

    Args:
        df: The DataFrame to filter.
        col: The column whose values are tested for membership.
        events: The values of ``col`` to keep.

    Returns:
        A DataFrame containing the matching rows.
    """
    # Materialise first so one-shot iterables (e.g. generators) are accepted.
    allowed = list(events)
    keep = df[col].isin(allowed)
    return df[keep]
[docs]
def filter_range_step(
    self, df: DataFrame, col: str, start: int, stop: int, step: int
) -> DataFrame:
    """Keep rows whose ``col`` lies in ``[start, stop]`` and on the step grid.

    Args:
        df: The DataFrame to filter.
        col: The column whose values are tested.
        start: Inclusive lower bound, and the origin of the step grid.
        stop: Inclusive upper bound.
        step: Stride measured from ``start``. A step of 1 or less applies
            no stride filtering, so every in-range row is kept.

    Returns:
        A DataFrame containing the matching rows.
    """
    values = df[col]
    not_below_start = values >= start
    not_above_stop = values <= stop
    in_range = not_below_start & not_above_stop
    if step <= 1:
        return df[in_range]
    # Only values sitting a whole number of steps away from ``start`` qualify.
    on_step = ((values - start) % step) == 0
    return df[in_range & on_step]
[docs]
def filter_eq(self, df: DataFrame, col: str, value: Any) -> DataFrame:
    """Keep only the rows whose ``col`` value equals ``value``.

    Args:
        df: The DataFrame to filter.
        col: The column to compare.
        value: The value a row must hold in ``col`` to be kept.

    Returns:
        A DataFrame containing the matching rows.
    """
    matches = df[col] == value
    return df[matches]
[docs]
def filter_in(self, df: DataFrame, col: str, values: Iterable[Any]) -> DataFrame:
    """Keep only the rows whose ``col`` value appears in ``values``.

    Args:
        df: The DataFrame to filter.
        col: The column whose values are tested for membership.
        values: The values of ``col`` to keep.

    Returns:
        A DataFrame containing the matching rows.
    """
    # Materialise first so one-shot iterables (e.g. generators) are accepted.
    allowed = list(values)
    keep = df[col].isin(allowed)
    return df[keep]