Source code for eclypse.policies.topology.add_edge
"""Edge insertion topology policy."""
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from eclypse.graph.asset_graph import AssetGraph
from eclypse.utils.types import UpdatePolicy
[docs]
def add_edge(
source: str,
target: str,
*,
symmetric: bool = False,
strict: bool = False,
**assets,
) -> UpdatePolicy:
"""Add an edge if both endpoints exist.
Args:
source (str): Source node identifier.
target (str): Target node identifier.
symmetric (bool): Whether to add the symmetric edge too.
strict (bool): Whether graph insertion should use strict duplicate checks.
assets (Any): Edge assets passed to the graph.
Returns:
Policy that adds the edge when endpoints exist.
"""
def policy(graph: AssetGraph):
if graph.has_node(source) and graph.has_node(target):
graph.add_edge(source, target, symmetric=symmetric, strict=strict, **assets)
return policy