# Source code for eclypse.report.backends.pandas_backend

"""Pandas backend implementation.

This module provides a concrete FrameBackend implementation using pandas.
Pandas is imported lazily so that it remains an optional dependency.
"""

from __future__ import annotations

from importlib import import_module
from typing import (
    TYPE_CHECKING,
    Any,
)

from eclypse.report.backend import (
    FrameBackend,
    list_parquet_parts,
    load_jsonl_rows,
)

if TYPE_CHECKING:
    from collections.abc import (
        Iterable,
    )

    from pandas import DataFrame  # type: ignore[import-untyped]


def _to_float(value: Any) -> Any:
    """Convert a value to float where possible (pandas CSV converter).

    Args:
        value: The value to convert.

    Returns:
        The float value if conversion succeeds; otherwise the original value.
    """
    try:
        return float(value)
    except (TypeError, ValueError):
        return value


class PandasBackend(FrameBackend):
    """Pandas implementation of the FrameBackend abstract base class."""

    def __init__(self) -> None:
        """Initialise the pandas backend.

        Imports pandas lazily to keep it as an optional dependency.
        """
        super().__init__(name="pandas")
        # Resolved at construction time so that importing this module does
        # not itself require pandas to be installed.
        self._pd = import_module("pandas")

    def _read_csv(self, source) -> DataFrame:
        """Read a CSV report into a pandas DataFrame.

        Args:
            source: The CSV source, forwarded unchanged to ``pandas.read_csv``.

        Returns:
            The loaded DataFrame, with the ``value`` column passed through
            ``_to_float`` (floats where possible, original value otherwise).
        """
        return self._pd.read_csv(source, converters={"value": _to_float})

    def _read_parquet(self, source) -> DataFrame:
        """Read partitioned parquet data into a pandas DataFrame.

        Args:
            source: The parquet source, forwarded to ``list_parquet_parts``.

        Returns:
            A single DataFrame concatenating every part, with a fresh index.
        """
        # NOTE(review): pandas.concat raises ValueError on an empty sequence,
        # so this presumably relies on list_parquet_parts yielding at least
        # one part -- confirm against that helper.
        return self._pd.concat(
            [self._pd.read_parquet(part) for part in list_parquet_parts(source)],
            ignore_index=True,
        )

    def _read_json(self, source, report_type: str) -> DataFrame:
        """Read JSONL report data into a pandas DataFrame.

        Args:
            source: The JSONL source, forwarded to ``load_jsonl_rows``.
            report_type: The report type, forwarded to ``load_jsonl_rows``.

        Returns:
            A DataFrame built from the rows returned by ``load_jsonl_rows``.
        """
        return self._pd.DataFrame(load_jsonl_rows(source, report_type))

    def is_empty(self, df: DataFrame) -> bool:
        """Return whether the DataFrame is empty.

        Args:
            df: The DataFrame to inspect.

        Returns:
            True if the DataFrame has no rows, otherwise False.
        """
        return df.empty

    def columns(self, df: DataFrame) -> set[str]:
        """Return the set of column names.

        Args:
            df: The DataFrame to inspect.

        Returns:
            A set containing the DataFrame column names.
        """
        return set(df.columns)

    def max(self, df: DataFrame, col: str) -> int:
        """Return the maximum value of a column as an int.

        Args:
            df: The DataFrame to inspect.
            col: The name of the column.

        Returns:
            The maximum value as a Python int.
        """
        # NOTE(review): for a column with no values pandas yields NaN here and
        # int() would raise -- presumably callers check is_empty() first;
        # confirm.
        return int(df[col].max())

    def filter_events(
        self, df: DataFrame, col: str, events: Iterable[int]
    ) -> DataFrame:
        """Filter rows where `col` is contained in `events`.

        Args:
            df: The DataFrame to filter.
            col: The column name to test membership against.
            events: The allowed values for `col`.

        Returns:
            A filtered DataFrame.
        """
        return df[df[col].isin(list(events))]

    def filter_range_step(
        self, df: DataFrame, col: str, start: int, stop: int, step: int
    ) -> DataFrame:
        """Filter rows where `col` is within a range and matches the given step.

        Args:
            df: The DataFrame to filter.
            col: The column name to compare.
            start: The inclusive lower bound for `col`.
            stop: The inclusive upper bound for `col`.
            step: The stride measured from `start`. Only applied when greater
                than 1; any other value keeps every row inside the range.

        Returns:
            A filtered DataFrame.
        """
        series = df[col]
        mask = (series >= start) & (series <= stop)
        if step > 1:
            # Keep only values lying a whole number of steps above `start`.
            mask &= ((series - start) % step) == 0
        return df[mask]

    def filter_eq(self, df: DataFrame, col: str, value: Any) -> DataFrame:
        """Filter rows where `col` equals `value`.

        Args:
            df: The DataFrame to filter.
            col: The column name to compare.
            value: The value to match.

        Returns:
            A filtered DataFrame.
        """
        return df[df[col] == value]

    def filter_in(self, df: DataFrame, col: str, values: Iterable[Any]) -> DataFrame:
        """Filter rows where `col` is contained in `values`.

        Args:
            df: The DataFrame to filter.
            col: The column name to test membership against.
            values: The allowed values for `col`.

        Returns:
            A filtered DataFrame.
        """
        return df[df[col].isin(list(values))]