Source code for eclypse.policies.failure.latency_spike
"""Latency spike policy."""
from __future__ import annotations
from typing import TYPE_CHECKING
from eclypse.policies._filters import (
clamp,
coerce_numeric_like,
ensure_numeric_value,
iter_selected_edges,
)
from eclypse.policies._helpers import validate_probability
from eclypse.utils.constants import MIN_LATENCY
if TYPE_CHECKING:
from eclypse.graph.asset_graph import AssetGraph
from eclypse.policies._filters import EdgeFilter
from eclypse.utils.types import UpdatePolicy
[docs]
def latency_spike(
probability: float,
*,
min_increase: float = 1.0,
max_increase: float | None = None,
factor: float | None = None,
latency_key: str = "latency",
edge_ids: list[tuple[str, str]] | None = None,
edge_filter: EdgeFilter | None = None,
) -> UpdatePolicy:
"""Inject random latency spikes on selected edges.
Args:
probability (float): Probability of applying a spike to each selected edge.
min_increase (float): Minimum additive spike size when using additive mode.
max_increase (float | None): Maximum additive spike size when using additive
mode. Defaults to ``min_increase``.
factor (float | None): Optional multiplicative spike factor. When provided,
additive spike parameters are ignored.
latency_key (str): Edge asset storing latency.
edge_ids (list[tuple[str, str]] | None): Optional explicit list of target
edges.
edge_filter (EdgeFilter | None): Optional predicate to filter target edges.
Returns:
UpdatePolicy: A graph update policy implementing latency spikes.
"""
validate_probability("probability", probability)
if factor is not None and factor < 0:
raise ValueError("factor must be non-negative.")
if min_increase < 0:
raise ValueError("min_increase must be non-negative.")
spike_ceiling = min_increase if max_increase is None else max_increase
if spike_ceiling < min_increase:
raise ValueError("max_increase must be greater than or equal to min_increase.")
def policy(graph: AssetGraph):
for _, _, data in iter_selected_edges(
graph,
edge_ids=edge_ids,
edge_filter=edge_filter,
):
if graph.rnd.random() >= probability:
continue
current = ensure_numeric_value(latency_key, data[latency_key])
if factor is not None:
new_value = current * factor
else:
new_value = current + graph.rnd.uniform(min_increase, spike_ceiling)
data[latency_key] = coerce_numeric_like(
data[latency_key],
clamp(new_value, lower=MIN_LATENCY),
)
graph.logger.trace("Applied latency_spike policy.")
return policy