Source code for eclypse.policies.schedule.at
"""Run a policy at explicit steps."""
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from eclypse.graph.asset_graph import AssetGraph
from eclypse.utils.types import UpdatePolicy
@dataclass(slots=True)
class AtPolicy:
"""Run a policy at selected steps."""
steps: set[int]
policy: UpdatePolicy
step: int = 0
def __post_init__(self):
"""Validate the schedule configuration."""
if not self.steps:
raise ValueError("steps must not be empty.")
if any(step < 0 for step in self.steps):
raise ValueError("steps must be non-negative.")
def __call__(self, graph: AssetGraph):
"""Apply the wrapped policy when the current step is selected."""
if self.step in self.steps:
self.policy(graph)
graph.logger.trace(f"Triggered at policy at step {self.step}.")
self.step += 1
[docs]
def at(
    steps: int | list[int] | tuple[int, ...],
    policy: UpdatePolicy,
) -> UpdatePolicy:
    """Schedule a policy to fire only on the given step number(s).

    Args:
        steps (int | list[int] | tuple[int, ...]):
            A single step number, or a collection of step numbers, on
            which the policy should be triggered.
        policy (UpdatePolicy): Policy to invoke when the step matches.

    Returns:
        An ``AtPolicy`` instance wrapping ``policy``.
    """
    # A lone integer becomes a one-element set; anything else is treated
    # as an iterable of steps and de-duplicated.
    if isinstance(steps, int):
        step_set = {steps}
    else:
        step_set = set(steps)
    return AtPolicy(steps=step_set, policy=policy)