Source code for eclypse.policies.schedule.after

"""Run a policy from a given step onward."""

from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from eclypse.graph.asset_graph import AssetGraph
    from eclypse.utils.types import UpdatePolicy


@dataclass(slots=True)
class AfterPolicy:
    """Run a policy from ``start`` onward."""

    start: int
    policy: UpdatePolicy
    step: int = 0

    def __post_init__(self):
        """Validate the schedule configuration."""
        if self.start < 0:
            raise ValueError("start must be non-negative.")

    def __call__(self, graph: AssetGraph):
        """Apply the wrapped policy from the configured step onward."""
        if self.step >= self.start:
            self.policy(graph)
            graph.logger.trace(f"Triggered after policy at step {self.step}.")
        self.step += 1


[docs] def after( start: int, policy: UpdatePolicy, ) -> UpdatePolicy: """Run a policy from ``start`` onward. Args: start (int): First step at which the policy should run. policy (UpdatePolicy): The wrapped policy. Returns: UpdatePolicy: A scheduled wrapper around ``policy``. """ return AfterPolicy(start=start, policy=policy)