Source code for eclypse.report.backends.polars_backend

"""Polars eager backend implementation.

This module provides a concrete FrameBackend implementation using polars eager
DataFrames. Polars is imported lazily so that it remains an optional dependency.
"""

from __future__ import annotations

from importlib import import_module
from typing import (
    TYPE_CHECKING,
    Any,
)

from eclypse.report.backend import (
    FrameBackend,
    list_parquet_parts,
    load_jsonl_rows,
)

if TYPE_CHECKING:
    from collections.abc import (
        Iterable,
    )

    from polars import DataFrame


[docs] class PolarsBackend(FrameBackend): """Polars eager implementation of the FrameBackend abstract base class."""
[docs] def __init__(self): """Initialise the polars backend. Imports polars lazily to keep it as an optional dependency. """ super().__init__(name="polars") self._pl = import_module("polars")
def _read_csv(self, source) -> DataFrame: """Read a CSV report into a polars DataFrame.""" pl = self._pl return self._coerce_value_column(pl.read_csv(source)) def _read_parquet(self, source) -> DataFrame: """Read partitioned parquet data into a polars DataFrame.""" pl = self._pl return self._coerce_value_column( pl.read_parquet([str(part) for part in list_parquet_parts(source)]) ) def _read_json(self, source, report_type: str) -> DataFrame: """Read JSONL report data into a polars DataFrame.""" pl = self._pl return self._coerce_value_column( pl.DataFrame(load_jsonl_rows(source, report_type)) ) def _coerce_value_column(self, df: DataFrame) -> DataFrame: """Cast the common ``value`` column when present.""" pl = self._pl if "value" in df.columns: return df.with_columns( pl.col("value").cast(pl.Float64, strict=False).alias("value") ) return df
[docs] def is_empty(self, df: DataFrame) -> bool: """Return whether the DataFrame is empty. Args: df: The DataFrame to inspect. Returns: True if the DataFrame has no rows, otherwise False. """ return df.height == 0
[docs] def columns(self, df: DataFrame) -> set[str]: """Return the set of column names. Args: df: The DataFrame to inspect. Returns: A set containing the DataFrame column names. """ return set(df.columns)
[docs] def max(self, df: DataFrame, col: str) -> int: """Return the maximum value of a column as an int. Args: df: The DataFrame to inspect. col: The name of the column. Returns: The maximum value as a Python int. """ pl = self._pl return int(df.select(pl.col(col).max()).item())
[docs] def filter_events( self, df: DataFrame, col: str, events: Iterable[int] ) -> DataFrame: """Filter rows where `col` is contained in `events`. Args: df: The DataFrame to filter. col: The column name to test membership against. events: The allowed values for `col`. Returns: A filtered DataFrame. """ pl = self._pl return df.filter(pl.col(col).is_in(list(events)))
[docs] def filter_range_step( self, df: DataFrame, col: str, start: int, stop: int, step: int ) -> DataFrame: """Filter rows where `col` is within a range and matches the given step.""" pl = self._pl expr = (pl.col(col) >= start) & (pl.col(col) <= stop) if step > 1: expr = expr & (((pl.col(col) - start) % step) == 0) return df.filter(expr)
[docs] def filter_eq(self, df: DataFrame, col: str, value: Any) -> DataFrame: """Filter rows where `col` equals `value`. Args: df: The DataFrame to filter. col: The column name to compare. value: The value to match. Returns: A filtered DataFrame. """ pl = self._pl return df.filter(pl.col(col) == value)
[docs] def filter_in(self, df: DataFrame, col: str, values: Iterable[Any]) -> DataFrame: """Filter rows where `col` is contained in `values`. Args: df: The DataFrame to filter. col: The column name to test membership against. values: The allowed values for `col`. Returns: A filtered DataFrame. """ pl = self._pl return df.filter(pl.col(col).is_in(list(values)))