Source code for eclypse.policies.schedule.jittered_every

"""Run a periodic policy with bounded jitter."""

from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from eclypse.graph.asset_graph import AssetGraph
    from eclypse.utils.types import UpdatePolicy


@dataclass(slots=True)
class JitteredEveryPolicy:
    """Run a policy periodically with integer step jitter."""

    interval: int
    policy: UpdatePolicy
    jitter: int = 0
    start: int = 0
    step: int = 0
    next_step: int | None = None

    def __post_init__(self):
        """Validate the schedule configuration."""
        if self.interval <= 0:
            raise ValueError("interval must be strictly positive.")
        if self.jitter < 0:
            raise ValueError("jitter must be non-negative.")
        if self.start < 0:
            raise ValueError("start must be non-negative.")

    def __call__(self, graph: AssetGraph):
        """Apply the wrapped policy when the jittered step is reached."""
        if self.next_step is None:
            self.next_step = self.start
        if self.step >= self.next_step:
            self.policy(graph)
            delta = self.interval
            if self.jitter:
                delta += graph.rnd.randint(-self.jitter, self.jitter)
            self.next_step = self.step + max(1, delta)
            graph.logger.trace(f"Triggered jittered_every policy at step {self.step}.")
        self.step += 1


[docs] def jittered_every( interval: int, policy: UpdatePolicy, *, jitter: int = 0, start: int = 0, ) -> UpdatePolicy: """Run a policy every ``interval`` steps with optional integer jitter. Args: interval (int): Base interval between applications. policy (UpdatePolicy): Wrapped policy to call. jitter (int): Maximum integer offset added to each next interval. start (int): First eligible step. Returns: Stateful schedule policy. """ return JitteredEveryPolicy( interval=interval, policy=policy, jitter=jitter, start=start, )