Source code for eclypse.policies.failure.kill_nodes

"""Random node failure policy."""

from __future__ import annotations

from typing import TYPE_CHECKING

from eclypse.policies._filters import (
    ensure_numeric_value,
    iter_selected_nodes,
)
from eclypse.policies._helpers import validate_probability
from eclypse.policies.failure._helpers import set_availability_with_probability
from eclypse.utils.constants import MIN_AVAILABILITY

if TYPE_CHECKING:
    from eclypse.graph.asset_graph import AssetGraph
    from eclypse.policies._filters import NodeFilter
    from eclypse.utils.types import UpdatePolicy


def kill_nodes(
    probability: float,
    *,
    revive_probability: float | None = None,
    down_availability: float = MIN_AVAILABILITY,
    revived_availability: float = 0.99,
    availability_key: str = "availability",
    node_ids: list[str] | None = None,
    node_filter: NodeFilter | None = None,
) -> UpdatePolicy:
    """Build a policy that randomly takes selected nodes down and may revive them.

    On every application the returned policy walks the selected nodes. For each
    one it draws a random number from ``graph.rnd``; if the draw is below
    ``probability`` the node's availability asset is overwritten with
    ``down_availability``. Otherwise, when ``revive_probability`` is given and
    the node's current availability is at or below ``down_availability``, the
    revival attempt is delegated to ``set_availability_with_probability``.

    Args:
        probability (float): Chance, per selected node and per application, of
            setting the node's availability to ``down_availability``.
        revive_probability (float | None): Probability forwarded to the revival
            helper for nodes that are currently down. ``None`` turns revival off.
        down_availability (float): Value written to a node that fails; also the
            threshold at or below which a node is treated as down.
        revived_availability (float): Target availability handed to the revival
            helper.
        availability_key (str): Name of the node asset holding the availability.
        node_ids (list[str] | None): Optional explicit list of node ids to target.
        node_filter (NodeFilter | None): Optional predicate restricting the
            targeted nodes.

    Returns:
        UpdatePolicy: Callable that takes the graph and mutates the selected
        nodes' data in place.

    Note:
        Both probabilities are passed through ``validate_probability`` when the
        policy is built; presumably it rejects values outside [0, 1] -- confirm
        against that helper.
    """
    validate_probability("probability", probability)
    validate_probability("revive_probability", revive_probability)

    def policy(graph: AssetGraph):
        """Apply one round of random failures/revivals to the selected nodes."""
        selected = iter_selected_nodes(
            graph,
            node_ids=node_ids,
            node_filter=node_filter,
        )
        for _node_id, attrs in selected:
            # Check the stored value before any random draw, so a malformed
            # asset is reported regardless of the outcome of the draw.
            current = ensure_numeric_value(availability_key, attrs[availability_key])

            # Failure takes precedence over revival.
            if graph.rnd.random() < probability:
                attrs[availability_key] = down_availability
                continue

            # Revival is opt-in.
            if revive_probability is None:
                continue

            # Only nodes that are currently down are candidates for revival.
            if current <= down_availability:
                set_availability_with_probability(
                    attrs,
                    probability=revive_probability,
                    availability_key=availability_key,
                    target_availability=revived_availability,
                    random=graph.rnd,
                )

        graph.logger.trace("Applied kill_nodes policy.")

    return policy