"""Module for the Infrastructure class.
It represents a network, with nodes representing devices and
edges representing links between them.
The infrastructure also stores:
- A set of path assets aggregators, one per edge asset.
- A path algorithm to compute the paths between nodes.
- A view of the available nodes and edges.
- A cache of the computed paths and their costs.
"""
from __future__ import annotations
from typing import (
TYPE_CHECKING,
Any,
)
import networkx as nx
from networkx.classes.filters import no_filter
from eclypse.graph import AssetGraph
from eclypse.utils._logging import (
format_log_kv,
log_placement_violations,
)
from eclypse.utils.constants import (
COST_RECOMPUTATION_THRESHOLD,
MIN_FLOAT,
)
from eclypse.utils.defaults import (
DEFAULT_COST_ATTR,
DEFAULT_EDGE_LATENCY,
)
from .assets.defaults import (
get_default_edge_assets,
get_default_node_assets,
get_default_path_aggregators,
)
if TYPE_CHECKING:
from collections.abc import Callable
from eclypse.graph.assets.asset import Asset
from eclypse.utils.types import (
InitPolicy,
UpdatePolicies,
)
[docs]
class Infrastructure(AssetGraph): # pylint: disable=too-few-public-methods
"""Class to represent a Cloud-Edge infrastructure."""
[docs]
def __init__(
self,
infrastructure_id: str = "Infrastructure",
update_policies: UpdatePolicies = None,
node_assets: dict[str, Asset] | None = None,
edge_assets: dict[str, Asset] | None = None,
include_default_assets: bool = True,
path_assets_aggregators: dict[str, Callable[[list[Any]], Any]] | None = None,
path_algorithm: Callable[[nx.Graph, str, str], list[str]] | None = None,
resource_init: InitPolicy = "min",
seed: int | None = None,
):
"""Create a new Infrastructure.
Args:
infrastructure_id (str): The ID of the infrastructure.
update_policies (Callable | list[Callable] | None):\
Graph update policies executed during ``evolve()``.
node_assets (dict[str, Asset] | None): The assets of the nodes.
edge_assets (dict[str, Asset] | None): The assets of the edges.
include_default_assets (bool): Whether to include the default assets. \
Defaults to True.
path_assets_aggregators (dict[str, Callable[[list[Any]], Any]] | None): \
The aggregators to use for the path assets.
path_algorithm (Callable[[nx.Graph, str, str], list[str]] | None): \
The algorithm to use to compute the paths.
resource_init (InitPolicy):
The initialization method for the resources.
seed (int | None): The seed for the random number generator.
"""
_node_assets = get_default_node_assets() if include_default_assets else {}
_edge_assets = get_default_edge_assets() if include_default_assets else {}
_node_assets.update(node_assets or {})
_edge_assets.update(edge_assets or {})
super().__init__(
graph_id=infrastructure_id,
update_policies=update_policies,
node_assets=_node_assets,
edge_assets=_edge_assets,
attr_init=resource_init,
seed=seed,
)
default_path_aggregator = (
get_default_path_aggregators() if include_default_assets else {}
)
_path_assets_aggregators = (
path_assets_aggregators if path_assets_aggregators is not None else {}
)
for k in _edge_assets:
if k not in _path_assets_aggregators:
if k not in default_path_aggregator:
raise ValueError(
f'The path asset aggregator for "{k}" is not defined.'
)
_path_assets_aggregators[k] = default_path_aggregator[k]
missing = _edge_assets.keys() - _path_assets_aggregators.keys()
if missing:
raise ValueError(
"Every edge asset must have a corresponding path aggregator. "
f"Missing aggregators for: {missing}"
)
self.path_assets_aggregators = _path_assets_aggregators
self._path_algorithm: Callable[[nx.Graph, str, str], list[str]] = (
path_algorithm
if path_algorithm is not None
else _get_default_path_algorithm
)
self._available: nx.DiGraph | None = None
self._paths: dict[str, dict[str, list[str]]] = {}
self._costs: dict[str, dict[str, list[tuple[str, str, Any]]]] = {}
self._path_resources: dict[str, dict[str, dict[str, Any]]] = {}
self._processing_times: dict[str, dict[str, float]] = {}
[docs]
def evolve(self):
"""Update the infrastructure and invalidate derived path caches."""
super().evolve()
self._invalidate_cache()
[docs]
def add_node(self, node_for_adding: str, strict: bool = False, **assets: Any):
"""Add a node and invalidate the path cache.
Args:
node_for_adding (str): The node to add.
strict (bool): If True, raise an error if the node already exists.
**assets: Additional node assets.
"""
super().add_node(node_for_adding, strict=strict, **assets)
self._invalidate_cache()
[docs]
def add_edge(
self,
u_of_edge: str,
v_of_edge: str,
symmetric: bool = False,
strict: bool = False,
**assets: Any,
):
"""Add an edge and invalidate the path cache.
Args:
u_of_edge (str): The source node of the edge.
v_of_edge (str): The target node of the edge.
symmetric (bool): If True, add the edge in both directions.
strict (bool): If True, raise an error if the edge already exists.
**assets: Additional edge assets.
"""
super().add_edge(
u_of_edge, v_of_edge, symmetric=symmetric, strict=strict, **assets
)
self._invalidate_cache()
[docs]
def remove_node(self, n: str):
"""Remove a node and invalidate the path cache.
Args:
n (str): The node to remove.
"""
super().remove_node(n)
self._invalidate_cache()
[docs]
def remove_edge(self, u: str, v: str):
"""Remove an edge and invalidate the path cache.
Args:
u (str): The source node of the edge.
v (str): The target node of the edge.
"""
super().remove_edge(u, v)
self._invalidate_cache()
[docs]
def validate(self, other: nx.DiGraph) -> list[str]:
"""Validate the infrastructure against a set of requirements.
Compares the requirements of the nodes and edges in the PlacementView with
the resources of the nodes and edges in the Infrastructure.
Args:
other (Infrastructure): The Infrastructure to compare with.
Returns:
list[str]: A list of nodes whose requirements are not respected or
whose connected links are not respected.
"""
not_respected = set()
for n, req in other.nodes(data=True):
res = self.nodes[n]
node_violations = self.node_assets.satisfies(res, req, violations=True)
if isinstance(node_violations, dict) and node_violations:
self.logger.warning(
f'Node "{n}" violated | '
+ format_log_kv(assets=",".join(sorted(node_violations)))
)
log_placement_violations(self.logger, node_violations)
not_respected.add(n)
for u, v, req in other.edges(data=True):
res = self.path_resources(u, v)
edge_violations = self.edge_assets.satisfies(res, req, violations=True)
if isinstance(edge_violations, dict) and edge_violations:
self.logger.warning(
f'Link "{u} -> {v}" violated | '
+ format_log_kv(assets=",".join(sorted(edge_violations)))
)
log_placement_violations(self.logger, edge_violations)
not_respected.add(u)
not_respected.add(v)
return list(not_respected)
[docs]
def path(
self,
source: str,
target: str,
cost_attr: str = DEFAULT_COST_ATTR,
) -> list[tuple[str, str, dict[str, Any]]] | None:
"""Retrieve the hop-level path between two nodes, if it exists.
If the path has not been computed yet, or if any hop cost has changed
by more than the configured threshold, the path is recomputed and cached.
Args:
source (str): The name of the source node.
target (str): The name of the target node.
cost_attr (str): The edge attribute to consider as the cost for \
determining whether to recompute the path. Defaults to "latency".
Returns:
list[tuple[str, str, dict[str, Any]]] | None: The per-hop costs as \
(source, target, edge_attributes), or None if no path exists.
"""
try:
if source not in self._paths or target not in self._paths[source]:
self._compute_path(source, target)
if not all(n in self.available for n in self._paths[source][target]):
self._compute_path(source, target)
else:
costs = [
c.get(cost_attr, DEFAULT_EDGE_LATENCY)
for _, _, c in self._path_costs(self._paths[source][target])
]
cached_costs = [
cc.get(cost_attr, DEFAULT_EDGE_LATENCY)
for _, _, cc in self._costs[source][target]
]
if len(costs) != len(cached_costs) or any(
_cost_changed(c, cc)
for c, cc in zip(costs, cached_costs, strict=False)
):
self._compute_path(source, target)
return self._costs[source][target]
except (nx.NetworkXNoPath, nx.NodeNotFound):
return None
[docs]
def processing_time(self, source: str, target: str) -> float:
"""Compute the total processing time of all nodes along the path.
Calls :py:meth:`path` first to ensure the node list is cached, then
sums the ``processing_time`` attribute of every node on that path.
Returns 0.0 when source and target are the same node or when no path
exists.
Args:
source (str): The name of the source node.
target (str): The name of the target node.
Returns:
float: The total processing time along the path, in the same unit \
as the individual node ``processing_time`` attributes.
"""
if source == target or self.path(source, target) is None:
return 0.0
return self._processing_times[source][target]
[docs]
def path_resources(self, source: str, target: str) -> dict[str, Any]:
"""Retrieve the resources of the path between two nodes, if it exists.
If the path does not exist, it is computed and cached.
Args:
source (str): The name of the source node.
target (str): The name of the target node.
Returns:
PathResources: The resources of the path between the two nodes, or None if \
the path does not exist.
"""
if source == target:
return self.edge_assets.upper_bound
path = self.path(source, target)
if path is None:
return self.edge_assets.lower_bound
return self._path_resources[source][target]
def _invalidate_cache(self):
"""Invalidate the path and cost caches, and reset the available view.
Must be called whenever the graph topology changes (node or edge
addition/removal), so that stale paths are not returned.
"""
self._paths.clear()
self._costs.clear()
self._path_resources.clear()
self._processing_times.clear()
self._available = None
def _compute_path(self, source: str, target: str):
"""Compute the path between two nodes using the given algorithm, and cache it.
Args:
source (str): The name of the source node.
target (str): The name of the target node.
"""
path_nodes = self._path_algorithm(self.available, source, target)
path_costs = self._path_costs(path_nodes)
self._paths.setdefault(source, {})[target] = path_nodes
self._costs.setdefault(source, {})[target] = path_costs
self._path_resources.setdefault(source, {})[target] = {
k: aggr([c[k] for _, _, c in path_costs])
for k, aggr in self.path_assets_aggregators.items()
}
self._processing_times.setdefault(source, {})[target] = sum(
self.nodes[n].get("processing_time", MIN_FLOAT) for n in path_nodes
)
def _path_costs(self, path: list[str]) -> list[tuple[str, str, dict[str, Any]]]:
"""Compute the per-hop costs of a path.
Args:
path (list[str]): The path as a list of node IDs.
Returns:
list[tuple[str, str, dict[str, Any]]]: The per-hop costs as \
(source, target, edge_attributes) for each consecutive pair.
"""
return [(s, t, self.edges[s, t]) for s, t in nx.utils.pairwise(path)]
@property
def available(self) -> nx.DiGraph:
"""Return a filtered view containing only the available nodes.
Uses nx.subgraph_view to avoid creating a full Infrastructure instance.
The view is dynamic: it reflects the current state of the graph at all
times, filtering out nodes where availability <= 0.
Returns:
nx.DiGraph: A subgraph view with only the available nodes.
"""
if self._available is None:
self._available = nx.subgraph_view(
self,
filter_node=self.is_available,
filter_edge=no_filter,
)
return self._available
[docs]
def is_available(self, n: str):
"""Check if the node is available.
Args:
n (str): The node to check.
Returns:
bool: True if the node is available, False otherwise.
"""
return self.nodes[n].get("availability", 1) > 0
def _default_weight_function(_: str, __: str, eattr: dict[str, Any]) -> float:
"""Function to compute the weight of an edge in the shortest path algorithm.
The weight is given by the 'latency' attribute if it exists, 1 otherwise (i.e., it
counts as an hop).
Args:
u (str): The name of the source node.
v (str): The name of the target node.
eattr (dict[str, Any]): The attributes of the edge.
Returns:
float: The weight of the edge.
"""
return eattr.get("latency", DEFAULT_EDGE_LATENCY)
def _get_default_path_algorithm(g: nx.Graph, source: str, target: str) -> list[str]:
"""Compute the path between two nodes using Dijkstra's algorithm.
It tries to use the 'latency' attribute of the edges as the weight,
or the number of hops if it does not exist.
Args:
g (nx.Graph): The graph to compute the path on.
source (str): The name of the source node.
target (str): The name of the target node.
Returns:
list[str]: The list of node IDs in the shortest path.
"""
return nx.dijkstra_path(g, source, target, weight=_default_weight_function)
def _cost_changed(current: float, cached: float) -> bool:
"""Check whether a hop cost has changed beyond the recomputation threshold.
If the cached cost is zero and the current cost is not, the change
is considered significant by definition (avoids division by zero).
Args:
current (float): The current cost of the hop.
cached (float): The previously cached cost of the hop.
Returns:
bool: True if the cost changed beyond the threshold.
"""
return (
current != 0
if cached == 0
else abs(current - cached) / cached >= COST_RECOMPUTATION_THRESHOLD
)