Source code for eclypse.policies.schedule.cooldown
"""Run a policy with a minimum gap between applications."""
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from eclypse.graph.asset_graph import AssetGraph
from eclypse.utils.types import UpdatePolicy
@dataclass(slots=True)
class CooldownPolicy:
"""Run a policy at most once every ``steps`` calls."""
steps: int
policy: UpdatePolicy
step: int = 0
next_allowed_step: int = 0
def __post_init__(self):
"""Validate the schedule configuration."""
if self.steps < 0:
raise ValueError("steps must be non-negative.")
def __call__(self, graph: AssetGraph):
"""Apply the wrapped policy when the cooldown has elapsed."""
if self.step >= self.next_allowed_step:
self.policy(graph)
self.next_allowed_step = self.step + self.steps + 1
graph.logger.trace(f"Triggered cooldown policy at step {self.step}.")
self.step += 1
[docs]
def cooldown(steps: int, policy: UpdatePolicy) -> UpdatePolicy:
    """Wrap ``policy`` so that calls are skipped after each application.

    The returned object applies ``policy`` on its first call, then ignores
    the next ``steps`` calls before applying it again, i.e. the policy runs
    at most once every ``steps + 1`` calls.

    Args:
        steps (int): Number of calls skipped after each application of
            ``policy``; ``0`` applies it on every call.
        policy (UpdatePolicy): Wrapped policy to throttle.

    Returns:
        Stateful schedule policy (a ``CooldownPolicy`` instance).

    Raises:
        ValueError: If ``steps`` is negative.
    """
    throttled = CooldownPolicy(policy=policy, steps=steps)
    return throttled