Source code for eclypse.policies.failure.network_partition
"""Network partition policy."""
from __future__ import annotations
from typing import TYPE_CHECKING
from eclypse.utils.constants import MIN_AVAILABILITY
_MIN_PARTITIONS = 2
if TYPE_CHECKING:
from eclypse.graph.asset_graph import AssetGraph
from eclypse.utils.types import UpdatePolicy
[docs]
def network_partition(
groups: list[list[str]],
*,
availability_key: str = "availability",
unavailable_value: float = MIN_AVAILABILITY,
remove_edges: bool = False,
) -> UpdatePolicy:
"""Partition node groups by disabling or removing cross-group edges.
Args:
groups (list[list[str]]): Node identifiers grouped by partition.
availability_key (str): Edge asset used when cross-group edges are disabled.
unavailable_value (float): Value written to disabled cross-group edges.
remove_edges (bool): Whether to remove cross-group edges instead of mutating them.
Returns:
Policy that isolates the configured partitions.
"""
if len(groups) < _MIN_PARTITIONS:
raise ValueError("groups must contain at least two partitions.")
node_to_group = {
node_id: group_idx
for group_idx, group in enumerate(groups)
for node_id in group
}
def policy(graph: AssetGraph):
for source, target, data in list(graph.edges.data()):
source_group = node_to_group.get(source)
target_group = node_to_group.get(target)
if (
source_group is None
or target_group is None
or source_group == target_group
):
continue
if remove_edges:
graph.remove_edge(source, target)
else:
data[availability_key] = unavailable_value
graph.logger.trace("Applied network_partition policy.")
return policy