User distribution

The user distribution example customises a generated hierarchical infrastructure with a custom `user_count` node asset, and uses update policies to change latency and placement conditions as the user distribution evolves.

Use it when you want to understand:

  • how to add a custom infrastructure asset,

  • how to write metrics around domain-specific infrastructure state,

  • how update policies can drive a longer-running simulation.

The full code lives in the examples/user_distribution directory.

Run it from the repository root with:

uv run user-distribution

Infrastructure

Infrastructure code
import networkx as nx

from .metric import user_count_asset
from .update_policy import (
    LatencyUpdatePolicy,
    UserDistributionPolicy,
    kill_policy,
)

from eclypse.builders.infrastructure import get_hierarchical


def get_infrastructure(seed: int):
    """Build the hierarchical infrastructure used by the example.

    The infrastructure is generated with ``get_hierarchical`` using 187 as
    ``n``, a custom ``user_count`` node asset (in addition to the builder's
    default assets, since ``include_default_assets=True``) and three update
    policies. Afterwards the nodes are relabelled in place with consecutive
    integers starting at 0, following the graph's node iteration order.

    Args:
        seed: Seed forwarded to the infrastructure builder.

    Returns:
        The result of the in-place ``nx.relabel_nodes`` call on the generated
        infrastructure.
    """
    # The same probability is handed to both the kill and the latency policy.
    failure_rate = 0.1

    custom_assets = {"user_count": user_count_asset()}
    policies = [
        kill_policy(kill_probability=failure_rate),
        LatencyUpdatePolicy(kill_probability=failure_rate),
        UserDistributionPolicy(),
    ]

    infra = get_hierarchical(
        node_assets=custom_assets,
        infrastructure_id="get_hierarchical",
        n=187,
        update_policies=policies,
        include_default_assets=True,
        symmetric=True,
        seed=seed,
    )

    # Map every original node name to its position in iteration order.
    node_names = list(infra.nodes())
    index_of = dict(zip(node_names, range(len(node_names))))

    # copy=False relabels the existing graph instead of building a new one.
    return nx.relabel_nodes(infra, index_of, copy=False)

Metrics

Metrics code
from __future__ import annotations

import math
import os
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
)

import psutil

from eclypse.graph.assets import Additive
from eclypse.report.metrics import metric
from eclypse.report.metrics.defaults import (
    SimulationTime,
    response_time,
)

if TYPE_CHECKING:
    from eclypse.graph import (
        Application,
        Infrastructure,
    )
    from eclypse.placement import Placement

# Node asset for the infrastructure


def user_count_asset(
    lower_bound: float = 0.0,
    upper_bound: float = float("inf"),
    init_value: int = 0,
) -> Additive:
    """Create the ``Additive`` asset used to track users on a node.

    Args:
        lower_bound: First positional argument given to ``Additive``.
        upper_bound: Second positional argument given to ``Additive``.
        init_value: Third positional argument given to ``Additive``.

    Returns:
        A new ``Additive`` asset created with ``functional=False``.
    """
    # NOTE(review): functional=False presumably marks the asset as
    # informational rather than a placement requirement — confirm against
    # the Additive documentation.
    asset = Additive(lower_bound, upper_bound, init_value, functional=False)
    return asset


# Metrics for the simulation


@metric.node(name="user_count")
def user_count_metric(_: str, resources: Dict[str, Any], __, ___, ____) -> float:
    """Report a node's ``user_count`` resource, or 0 when the key is absent.

    Only ``resources`` is read; the remaining positional arguments are ignored.
    """
    users = resources.get("user_count", 0)
    return users


@metric.node(name="user_delay")
def user_delay(
    node: str,
    resources: Dict[str, Any],
    placements: Dict[str, Placement],
    infr: Infrastructure,
    __,
) -> float:
    """Estimate the delay seen by users attached to ``node``.

    The delay is the path latency between ``node`` and the node hosting the
    ``FrontendService`` of the ``SockShop`` application, plus a load term
    ``user_count * log(1 + user_count)`` computed from the node's resources.

    Args:
        node: Node the metric is evaluated for.
        resources: Resources of ``node``; ``user_count`` defaults to 0.
        placements: Placements keyed by application identifier.
        infr: Infrastructure queried for the path resources.
        __: Unused.

    Returns:
        The estimated delay, or infinity when ``SockShop`` has no placement,
        its placement is partial, or the path latency is ``None``.
    """
    sock_shop = placements.get("SockShop")

    # Without a complete placement there is no frontend to reach.
    unplaced = sock_shop is None or sock_shop.is_partial
    if unplaced:
        return float("inf")

    frontend_host = sock_shop.service_placement("FrontendService")
    path_latency = infr.path_resources(node, frontend_host)["latency"]
    users = resources.get("user_count", 0)

    if path_latency is None:
        return float("inf")
    return path_latency + (users * math.log(1 + users))


@metric.application(name="used_nodes")
def used_nodes(_: Application, placement: Placement, __: Infrastructure) -> int:
    """Count the distinct values in the application's placement mapping.

    Args:
        _: Unused application argument.
        placement: Placement whose ``mapping`` values are counted.
            Presumably these are the nodes hosting the application's
            services — confirm against the ``Placement`` API.
        __: Unused infrastructure argument.

    Returns:
        The number of distinct values in ``placement.mapping``.
    """
    # A set collapses values that appear more than once in the mapping.
    return len(set(placement.mapping.values()))


@metric.simulation(name="cpu_usage", activates_on=["enact", "stop"])
class CPUMonitor:
    """Simulation metric reporting the CPU usage of the current process."""

    def __init__(self):
        """Bind a ``psutil.Process`` handle to the current process id."""
        current_pid = os.getpid()
        self.process = psutil.Process(current_pid)

    def __call__(self, event):
        """Return the process CPU percentage measured with ``interval=0.1``.

        Args:
            event: Unused.
        """
        sample = self.process.cpu_percent(interval=0.1)
        return sample


@metric.simulation(name="memory_usage", activates_on=["enact", "stop"])
class MemoryMonitor:
    """Simulation metric reporting the resident memory of the current process."""

    def __init__(self):
        """Bind a ``psutil.Process`` handle to the current process id."""
        current_pid = os.getpid()
        self.process = psutil.Process(current_pid)

    def __call__(self, event):
        """Return the resident set size of the process in MB.

        Args:
            event: Unused.
        """
        bytes_per_mb = 1024 * 1024
        resident_bytes = self.process.memory_info().rss
        return resident_bytes / bytes_per_mb


def get_metrics():
    """Return the metrics reported by the example simulation.

    The list holds the ``user_count`` and ``user_delay`` metrics, the default
    ``response_time`` metric, and fresh ``CPUMonitor``, ``MemoryMonitor`` and
    ``SimulationTime`` instances, in that order.
    """
    reported = [user_count_metric, user_delay, response_time]
    # The monitors are instantiated on every call, so each returned list
    # carries its own instances.
    reported.append(CPUMonitor())
    reported.append(MemoryMonitor())
    reported.append(SimulationTime())
    return reported

Simulation

Simulation code
from time import time

from .infrastructure import get_infrastructure
from .metric import get_metrics

from eclypse.builders.application import get_sock_shop
from eclypse.placement.strategies import BestFitStrategy
from eclypse.simulation import (
    Simulation,
    SimulationConfig,
)
from eclypse.utils.defaults import get_default_sim_path

# Seed shared by the application builder, the simulation config and the
# infrastructure builder.
SEED = 42
# Passed to the simulation config as ``max_steps``.
STEPS = 4167


def main() -> None:
    """Run the user distribution example.

    Builds the Sock Shop application and the example infrastructure,
    registers the application with a best-fit placement strategy, runs the
    simulation and prints the elapsed wall-clock time.
    """
    sock_shop = get_sock_shop(seed=SEED)
    placement_strategy = BestFitStrategy()

    # Results are written under the default simulation path.
    output_dir = get_default_sim_path() / "user-distribution"
    config = SimulationConfig(
        step_every_ms="auto",
        seed=SEED,
        max_steps=STEPS,
        path=output_dir,
        events=get_metrics(),
        log_to_file=True,
    )

    simulation = Simulation(get_infrastructure(SEED), simulation_config=config)
    simulation.register(sock_shop, placement_strategy)

    started_at = time()
    simulation.run()
    elapsed = time() - started_at
    print("Elapsed time: ", elapsed)


# Run the example when this module is executed as a script.
if __name__ == "__main__":
    main()