Source code for eclypse.policies.replay.replay_events
"""Replay arbitrary update policies from time-indexed event records."""
from __future__ import annotations
from dataclasses import dataclass
from typing import (
TYPE_CHECKING,
Any,
)
from eclypse.policies.replay._helpers import (
group_records_by_step,
initial_step,
normalise_records,
resolve_replay_step,
)
if TYPE_CHECKING:
from eclypse.graph.asset_graph import AssetGraph
from eclypse.utils.types import UpdatePolicy
@dataclass(slots=True)
class ReplayEventsPolicy:
"""Replay update callables stored in records."""
records_by_step: dict[int, list[dict[str, Any]]]
policy_column: str = "policy"
current_step: int = 0
cyclic: bool = False
def __call__(self, graph: AssetGraph):
"""Apply all event policies for the current step."""
replay_step = resolve_replay_step(
self.records_by_step,
self.current_step,
cyclic=self.cyclic,
)
for record in self.records_by_step.get(replay_step, []):
record[self.policy_column](graph)
self.current_step += 1
graph.logger.trace(f"Applied replay_events policy for step {replay_step}.")
[docs]
def replay_events(
    record_source,
    *,
    time_column: str = "time",
    policy_column: str = "policy",
    start_step: int | None = None,
    cyclic: bool = False,
) -> UpdatePolicy:
    """Build a replay policy from time-indexed records of update callables.

    The records are normalised, grouped by the value in ``time_column``, and
    wrapped in a ``ReplayEventsPolicy`` whose starting step is chosen by
    ``initial_step``.

    Args:
        record_source (Any): Iterable of records containing update policies.
        time_column (str): Column containing replay steps.
        policy_column (str): Column containing policy callables.
        start_step (int | None): Optional starting replay step.
        cyclic (bool): Whether to wrap past the final available replay step.

    Returns:
        Stateful event replay policy.
    """
    # Normalise first, then bucket the records per step.
    grouped = group_records_by_step(
        normalise_records(record_source),
        time_column=time_column,
    )
    # The helper decides the first step from the grouped records and the
    # optional ``start_step`` override.
    first_step = initial_step(grouped, start_step)

    policy = ReplayEventsPolicy(
        records_by_step=grouped,
        policy_column=policy_column,
        current_step=first_step,
        cyclic=cyclic,
    )
    return policy