Source code for eclypse.policies.replay.from_parquet
"""Replay policy builders from parquet files."""
from __future__ import annotations
from typing import TYPE_CHECKING
from eclypse.policies.replay.from_dataframe import from_dataframe
if TYPE_CHECKING:
from eclypse.policies._filters import (
EdgeFilter,
NodeFilter,
)
from eclypse.utils.types import (
MissingPolicyBehaviour,
ReplayTarget,
UpdatePolicy,
)
[docs]
def from_parquet(
path: str,
*,
target: ReplayTarget,
node_id_column: str = "node_id",
source_column: str = "source",
target_column: str = "target",
time_column: str = "time",
value_columns: list[str] | tuple[str, ...] | None = None,
node_ids: list[str] | None = None,
node_filter: NodeFilter | None = None,
edge_ids: list[tuple[str, str]] | None = None,
edge_filter: EdgeFilter | None = None,
missing: MissingPolicyBehaviour = "ignore",
start_step: int | None = None,
cyclic: bool = False,
) -> UpdatePolicy:
"""Build a replay policy from a parquet file using pandas when available.
Args:
path (str): Parquet file path.
target (ReplayTarget): Replay target, either ``"nodes"`` or ``"edges"``.
node_id_column (str): Column containing node identifiers.
source_column (str): Column containing edge source identifiers.
target_column (str): Column containing edge target identifiers.
time_column (str): Column containing replay steps.
value_columns (list[str] | tuple[str, ...] | None):
Optional explicit columns to copy from records.
node_ids (list[str] | None): Optional explicit node identifiers to mutate.
node_filter (NodeFilter | None): Optional predicate receiving ``(node_id, data)``.
edge_ids (list[tuple[str, str]] | None): Optional explicit edge identifiers to mutate.
edge_filter (EdgeFilter | None): Optional predicate receiving ``(source, target, data)``.
missing (MissingPolicyBehaviour): Behaviour when a replay record targets a missing item.
start_step (int | None): Optional starting replay step.
cyclic (bool): Whether to wrap past the final available replay step.
Returns:
Stateful replay policy.
"""
try:
import pandas as pd
except ImportError as exc: # pragma: no cover - optional dependency
raise ImportError(
"from_parquet requires pandas with parquet support installed."
) from exc
return from_dataframe(
pd.read_parquet(path),
target=target,
node_id_column=node_id_column,
source_column=source_column,
target_column=target_column,
time_column=time_column,
value_columns=value_columns,
node_ids=node_ids,
node_filter=node_filter,
edge_ids=edge_ids,
edge_filter=edge_filter,
missing=missing,
start_step=start_step,
cyclic=cyclic,
)