Source code for eclypse.policies.workload.arrival_process

"""Poisson arrival workload policy."""

from __future__ import annotations

from typing import TYPE_CHECKING

from eclypse.policies._filters import (
    iter_selected_edges,
    iter_selected_nodes,
)
from eclypse.policies.distribution.poisson import _sample_poisson
from eclypse.policies.workload._helpers import apply_selected_asset_transform

if TYPE_CHECKING:
    from eclypse.graph.asset_graph import AssetGraph
    from eclypse.policies._filters import (
        EdgeFilter,
        NodeFilter,
    )
    from eclypse.utils.types import UpdatePolicy


[docs] def arrival_process( rate: float, *, node_assets: str | list[str] | None = None, edge_assets: str | list[str] | None = None, node_ids: list[str] | None = None, node_filter: NodeFilter | None = None, edge_ids: list[tuple[str, str]] | None = None, edge_filter: EdgeFilter | None = None, ) -> UpdatePolicy: """Add Poisson arrivals to selected workload assets. Args: rate (float): Poisson arrival rate. node_assets (str | list[str] | None): Optional node asset key selector. edge_assets (str | list[str] | None): Optional edge asset key selector. node_ids (list[str] | None): Optional explicit node identifiers to mutate. node_filter (NodeFilter | None): Optional predicate receiving ``(node_id, data)``. edge_ids (list[tuple[str, str]] | None): Optional explicit edge identifiers to mutate. edge_filter (EdgeFilter | None): Optional predicate receiving ``(source, target, data)``. Returns: Policy that increments selected workload assets. """ if rate < 0: raise ValueError("rate must be non-negative.") if node_assets is None and edge_assets is None: raise ValueError("At least one of node_assets or edge_assets must be provided.") def policy(graph: AssetGraph): for _, data in iter_selected_nodes( graph, node_ids=node_ids, node_filter=node_filter, ): _add_arrivals(data, node_assets, rate, graph) for _, _, data in iter_selected_edges( graph, edge_ids=edge_ids, edge_filter=edge_filter, ): _add_arrivals(data, edge_assets, rate, graph) return policy
def _add_arrivals(data, assets, rate, graph): """Apply sampled arrivals to one asset mapping. Args: data (dict[str, object]): Asset mapping to mutate. assets (str | list[str] | None): Optional asset selector. rate (float): Poisson arrival rate. graph (AssetGraph): Graph providing the random generator. Returns: None. """ apply_selected_asset_transform( data, assets, transform=lambda _key, current: current + _sample_poisson(graph.rnd, rate), )