# Source code for eclypse.report.backends.polars_lazy_backend
"""Polars lazy backend implementation.
This module provides a concrete FrameBackend implementation using polars LazyFrame.
It builds a lazy query plan and only executes when you call `.collect()`.
Polars is imported lazily so that it remains an optional dependency.
"""
from __future__ import annotations
from importlib import import_module
from typing import (
TYPE_CHECKING,
Any,
)
from eclypse.report.backend import (
FrameBackend,
list_parquet_parts,
load_jsonl_rows,
)
if TYPE_CHECKING:
from collections.abc import (
Iterable,
)
from polars import LazyFrame
[docs]
class PolarsLazyBackend(FrameBackend):
    """FrameBackend implementation backed by polars ``LazyFrame`` objects.

    Readers and filters only build up a lazy query plan; nothing is executed
    until a frame is collected.

    Note:
        Report methods that go through this backend hand back a LazyFrame.
        Call `.collect()` on the result to materialise a DataFrame.
    """

    def __init__(self):
        """Register the backend as ``"polars_lazy"`` and load polars.

        polars is imported here via ``import_module`` rather than at module
        import time, so it stays an optional dependency.
        """
        super().__init__(name="polars_lazy")
        self._pl = import_module("polars")

    def _read_csv(self, source) -> LazyFrame:
        """Lazily scan a CSV report and normalise its ``value`` column."""
        scanned = self._pl.scan_csv(source)
        return self._coerce_value_column(scanned)

    def _read_parquet(self, source) -> LazyFrame:
        """Lazily scan the parquet parts found by ``list_parquet_parts``.

        Every part returned for ``source`` is converted to ``str`` before it
        is handed to ``scan_parquet``; the ``value`` column is then normalised.
        """
        part_paths = [str(part) for part in list_parquet_parts(source)]
        scanned = self._pl.scan_parquet(part_paths)
        return self._coerce_value_column(scanned)

    def _read_json(self, source, report_type: str) -> LazyFrame:
        """Load JSONL rows and expose them as a LazyFrame.

        The rows produced by ``load_jsonl_rows`` are first placed in an eager
        polars DataFrame, which is then converted with ``.lazy()`` before the
        ``value`` column is normalised.
        """
        rows = load_jsonl_rows(source, report_type)
        eager = self._pl.DataFrame(rows)
        return self._coerce_value_column(eager.lazy())

    def _coerce_value_column(self, lf: LazyFrame) -> LazyFrame:
        """Cast the ``value`` column to Float64 if the frame has one.

        Args:
            lf: The LazyFrame to normalise.

        Returns:
            The same frame when it has no ``value`` column, otherwise a frame
            whose ``value`` column is cast to Float64 with ``strict=False``.
        """
        # Guard clause: frames without a ``value`` column pass through untouched.
        if "value" not in lf.collect_schema():
            return lf

        pl = self._pl
        as_float = pl.col("value").cast(pl.Float64, strict=False).alias("value")
        return lf.with_columns(as_float)

    def is_empty(self, df: LazyFrame) -> bool:
        """Tell whether the LazyFrame yields no rows.

        Only a plan limited to a single row is collected, so the check
        executes as little of the query as possible.

        Args:
            df: The LazyFrame to inspect.

        Returns:
            True if the frame has no rows, otherwise False.
        """
        first_row = df.limit(1).collect()
        return first_row.height == 0

    def columns(self, df: LazyFrame) -> set[str]:
        """Return the column names of the LazyFrame as a set.

        Args:
            df: The LazyFrame to inspect.

        Returns:
            A set holding every column name of the frame's schema.
        """
        schema = df.collect_schema()
        return set(schema.names())

    def max(self, df: LazyFrame, col: str) -> int:
        """Return the largest value of a column as a Python int.

        This collects a one-value aggregation of the frame.

        Args:
            df: The LazyFrame to inspect.
            col: The name of the column to aggregate.

        Returns:
            The column maximum converted with ``int()``.
        """
        aggregated = df.select(self._pl.col(col).max()).collect()
        # NOTE(review): presumably ``item()`` is None for an empty or all-null
        # column, which would make ``int()`` raise - confirm against callers.
        return int(aggregated.item())

    def filter_events(
        self, df: LazyFrame, col: str, events: Iterable[int]
    ) -> LazyFrame:
        """Keep only the rows whose `col` value is one of `events`.

        Args:
            df: The LazyFrame to filter.
            col: The column whose values are tested for membership.
            events: The values of `col` that are allowed through.

        Returns:
            A filtered LazyFrame.
        """
        column = self._pl.col(col)
        # ``events`` may be any iterable; it is materialised as a list for is_in.
        allowed = list(events)
        return df.filter(column.is_in(allowed))

    def filter_range_step(
        self, df: LazyFrame, col: str, start: int, stop: int, step: int
    ) -> LazyFrame:
        """Keep rows whose `col` lies in ``[start, stop]`` on the given step.

        Args:
            df: The LazyFrame to filter.
            col: The column to compare.
            start: Inclusive lower bound.
            stop: Inclusive upper bound.
            step: When greater than 1, only rows whose offset from `start` is
                a multiple of `step` are kept; otherwise no step constraint
                is applied.

        Returns:
            A filtered LazyFrame.
        """
        column = self._pl.col(col)
        predicate = (column >= start) & (column <= stop)
        if step > 1:
            on_step = ((column - start) % step) == 0
            predicate = predicate & on_step
        return df.filter(predicate)

    def filter_eq(self, df: LazyFrame, col: str, value: Any) -> LazyFrame:
        """Keep only the rows whose `col` value equals `value`.

        Args:
            df: The LazyFrame to filter.
            col: The column to compare.
            value: The value rows must match.

        Returns:
            A filtered LazyFrame.
        """
        matches = self._pl.col(col) == value
        return df.filter(matches)

    def filter_in(self, df: LazyFrame, col: str, values: Iterable[Any]) -> LazyFrame:
        """Keep only the rows whose `col` value is one of `values`.

        Args:
            df: The LazyFrame to filter.
            col: The column whose values are tested for membership.
            values: The values of `col` that are allowed through.

        Returns:
            A filtered LazyFrame.
        """
        column = self._pl.col(col)
        # ``values`` may be any iterable; it is materialised as a list for is_in.
        allowed = list(values)
        return df.filter(column.is_in(allowed))