User distribution#
The user distribution example customises a generated infrastructure with a
user_count asset and updates latency and placement conditions as the user
distribution evolves.
Use it when you want to understand:
how to add a custom infrastructure asset,
how to write metrics around domain-specific infrastructure state,
how update policies can drive a longer-running simulation.
The full code lives in the examples/user_distribution directory.
Run it from the repository root with:
uv run user-distribution
Infrastructure#
Infrastructure code
import networkx as nx
from .metric import user_count_asset
from .update_policy import (
LatencyUpdatePolicy,
UserDistributionPolicy,
kill_policy,
)
from eclypse.builders.infrastructure import get_hierarchical
def get_infrastructure(seed: int):
kill_probability = 0.1
i = get_hierarchical(
node_assets={"user_count": user_count_asset()},
infrastructure_id="get_hierarchical",
n=187,
update_policies=[
kill_policy(kill_probability=kill_probability),
LatencyUpdatePolicy(kill_probability=kill_probability),
UserDistributionPolicy(),
],
include_default_assets=True,
symmetric=True,
seed=seed,
)
mapping = {old_name: new_name for new_name, old_name in enumerate(i.nodes())}
i = nx.relabel_nodes(i, mapping, copy=False)
return i
Metrics#
Metrics code
from __future__ import annotations
import math
import os
from typing import (
TYPE_CHECKING,
Any,
Dict,
)
import psutil
from eclypse.graph.assets import Additive
from eclypse.report.metrics import metric
from eclypse.report.metrics.defaults import (
SimulationTime,
response_time,
)
if TYPE_CHECKING:
from eclypse.graph import (
Application,
Infrastructure,
)
from eclypse.placement import Placement
# Node asset for the infrastructure
def user_count_asset(
lower_bound: float = 0.0,
upper_bound: float = float("inf"),
init_value: int = 0,
) -> Additive:
return Additive(lower_bound, upper_bound, init_value, functional=False)
# Metrics for the simulation
@metric.node(name="user_count")
def user_count_metric(_: str, resources: Dict[str, Any], __, ___, ____) -> float:
return resources.get("user_count", 0)
@metric.node(name="user_delay")
def user_delay(
node: str,
resources: Dict[str, Any],
placements: Dict[str, Placement],
infr: Infrastructure,
__,
) -> float:
placement = placements.get("SockShop")
# If the application is not placed, return infinity
if placement is None or placement.is_partial:
return float("inf")
frontend_node = placement.service_placement("FrontendService")
latency = infr.path_resources(node, frontend_node)["latency"]
user_count = resources.get("user_count", 0)
return (
latency + (user_count * math.log(1 + user_count))
if latency is not None
else float("inf")
)
@metric.application(name="used_nodes")
def used_nodes(_: Application, placement: Placement, __: Infrastructure) -> Application:
return len(set(placement.mapping.values()))
@metric.simulation(name="cpu_usage", activates_on=["enact", "stop"])
class CPUMonitor:
def __init__(self):
self.process = psutil.Process(os.getpid())
def __call__(self, event):
return self.process.cpu_percent(interval=0.1)
@metric.simulation(name="memory_usage", activates_on=["enact", "stop"])
class MemoryMonitor:
def __init__(self):
self.process = psutil.Process(os.getpid())
def __call__(self, event):
memory_usage = self.process.memory_info().rss
return memory_usage / (1024 * 1024) # Convert to MB
def get_metrics():
return [
user_count_metric,
user_delay,
response_time,
CPUMonitor(),
MemoryMonitor(),
SimulationTime(),
]
Simulation#
Simulation code
from time import time
from .infrastructure import get_infrastructure
from .metric import get_metrics
from eclypse.builders.application import get_sock_shop
from eclypse.placement.strategies import BestFitStrategy
from eclypse.simulation import (
Simulation,
SimulationConfig,
)
from eclypse.utils.defaults import get_default_sim_path
SEED = 42
STEPS = 4167
def main() -> None:
"""Run the user distribution example."""
app = get_sock_shop(seed=SEED)
strategy = BestFitStrategy()
sim_config = SimulationConfig(
step_every_ms="auto",
seed=SEED,
max_steps=STEPS,
path=get_default_sim_path() / "user-distribution",
events=get_metrics(),
log_to_file=True,
)
infrastructure = get_infrastructure(SEED)
sim = Simulation(infrastructure, simulation_config=sim_config)
sim.register(app, strategy)
start_time = time()
sim.run()
print("Elapsed time: ", time() - start_time)
if __name__ == "__main__":
main()