Grid analysis#
The grid analysis example runs a Ray Tune sweep over infrastructure topology, load, failure policy, random seed, and placement strategy choices.
Use it when you want to understand:
how to wrap an ECLYPSE simulation in a parameter-search function,
how to compare placement strategies across many generated infrastructures,
how to write custom infrastructure assets and policies for experiments.
The full code lives in the examples/grid_analysis directory.
Run it from the repository root with:
uv run grid-analysis
Simulation sweep#
Main sweep code
from pathlib import Path
from time import time
import ray
from ray import (
train,
tune,
)
from .applications import get_apps
from .infrastructure import get_infrastructure
from .metrics import get_metrics
from .strategy import EnergyMinimizationStrategy
from eclypse.placement.strategies import (
BestFitStrategy,
FirstFitStrategy,
RandomStrategy,
)
from eclypse.simulation import Simulation
from eclypse.simulation.config import SimulationConfig
from eclypse.utils.defaults import get_default_sim_path
def eclypse_grid(config):
print("Running with config: ", config)
stg = tune.get_context().get_storage()
path = (
Path(stg.storage_fs_path)
/ stg.experiment_dir_name
/ str(stg.trial_dir_name)
/ "output"
)
print("PATH: ", path)
sim_config = SimulationConfig(
step_every_ms="auto",
seed=config["seed"],
max_steps=config["max_steps"],
path=path,
include_default_metrics=False,
events=get_metrics(),
log_level="CRITICAL",
)
apps = get_apps(seed=config["seed"])
infr = get_infrastructure(config)
sim = Simulation(infrastructure=infr, simulation_config=sim_config)
for app in apps:
sim.register(app, get_strategy(config))
sim.run()
print("End of simulation")
def get_strategy(config):
if config["strategy"] == "random":
return RandomStrategy(config["seed"])
elif config["strategy"] == "best-fit":
return BestFitStrategy()
elif config["strategy"] == "first-fit":
return FirstFitStrategy()
elif config["strategy"] == "min-energy":
return EnergyMinimizationStrategy()
raise ValueError(f"Invalid strategy '{config['strategy']}'")
# Define the search space
search_space = {
"max_steps": 600,
"load": tune.grid_search(
[
0,
0.25,
0.5,
0.75,
]
),
"nodes": tune.grid_search(
[
50,
100,
300,
]
),
"seed": tune.grid_search(
[
42,
3997,
151195,
]
),
"policy": tune.grid_search(
[
("degrade", 0.4),
("degrade", 0.5),
("degrade", 0.6),
("kill", 0.01),
("kill", 0.05),
("kill", 0.1),
]
),
"strategy": tune.grid_search(
[
# "random",
# "first-fit",
# "best-fit",
"min-energy",
]
),
"topology": tune.grid_search(
[
("hierarchical",),
("star",),
# ("random", 0.25),
("random", 0.5),
# ("random", 0.75),
]
),
}
def main() -> None:
"""Run the Grid Analysis Ray Tune example."""
ray.init(address="auto")
start_time = time()
run_config = train.RunConfig(storage_path=get_default_sim_path().resolve())
tuner = tune.Tuner(
# tune.with_resources(eclypse_grid, {"cpu": 0.5}),
eclypse_grid,
param_space=search_space,
run_config=run_config,
)
tuner.fit()
print("Elapsed time: ", time() - start_time)
if __name__ == "__main__":
main()
Infrastructure#
Infrastructure code
from __future__ import annotations
from typing import (
Any,
Callable,
Optional,
Union,
)
from .policies import (
degrade_policy,
kill_policy,
)
from eclypse.builders.infrastructure import (
get_hierarchical,
get_random,
get_star,
)
from eclypse.graph import Infrastructure
from eclypse.graph.assets import Concave
from eclypse.graph.assets.space import (
AssetSpace,
Choice,
)
from eclypse.utils.constants import (
MAX_FLOAT,
MIN_FLOAT,
)
from eclypse.utils.types import PrimitiveType
def get_infrastructure(config) -> Infrastructure:
update_policies = get_policy(config)
common_config = {
"update_policies": update_policies,
"resource_init": "max",
"symmetric": True,
"seed": config["seed"],
"node_assets": {"energy": idle_energy_consumption()},
"include_default_assets": True,
}
if config["topology"][0] == "star":
infr = get_star(
infrastructure_id="star",
n_clients=config["nodes"],
**common_config,
)
elif config["topology"][0] == "random":
infr = get_random(
infrastructure_id="random",
n=config["nodes"],
p=config["topology"][1],
**common_config,
)
elif config["topology"][0] == "hierarchical":
infr = get_hierarchical(
infrastructure_id="hierarchical",
n=config["nodes"],
**common_config,
)
else:
raise ValueError(f"Unknown topology {config['topology']}")
apply_load(infr, config["load"])
return infr
def get_policy(config):
if config["policy"][0] == "degrade":
return degrade_policy(config["policy"][1], config["max_steps"])
return kill_policy(config["policy"][1])
def apply_load(infr: Infrastructure, load: float):
if load != 0:
for _, attr in infr.nodes(data=True):
for key in ["cpu", "gpu", "ram", "storage"]:
attr[key] = int(attr[key] * (1 - load))
for _, _, attr in infr.edges(data=True):
for key in ["bandwidth"]:
attr[key] = attr[key] * (1 - load)
def idle_energy_consumption(
lower_bound: float = MAX_FLOAT,
upper_bound: float = MIN_FLOAT,
init_fn_or_value: Optional[
Union[PrimitiveType, AssetSpace, Callable[[], Any]]
] = None,
) -> Concave:
"""Create a new additive asset for idle energy consumption.
Args:
lower_bound (float): The lower bound of the asset.
upper_bound (float): The upper bound of the asset.
init_fn_or_value (Optional[Union[PrimitiveType, AssetSpace, Callable[[], Any]]]):
The function/scalar to initialize the idle energy consumption value.
Returns:
Concave: The idle energy consumption asset.
"""
_init_fn = (
Choice([20, 50, 80, 150]) if init_fn_or_value is None else init_fn_or_value
)
return Concave(lower_bound, upper_bound, _init_fn, functional=False)
Placement strategy#
Strategy code
from __future__ import annotations
import random as rnd
from typing import (
TYPE_CHECKING,
Any,
Dict,
Optional,
)
from eclypse.placement.strategies import PlacementStrategy
from eclypse.utils.constants import MAX_FLOAT
if TYPE_CHECKING:
from eclypse.graph import (
Application,
Infrastructure,
)
from eclypse.placement import (
Placement,
PlacementView,
)
class EnergyMinimizationStrategy(PlacementStrategy):
"""A placement strategy that minimizes energy consumption based on the allocated
CPU, GPU, RAM, and storage."""
def __init__(
self,
cpu_weight: float = 0.25,
gpu_weight: float = 0.25,
ram_weight: float = 0.25,
storage_weight: float = 0.25,
):
"""Initializes the strategy with specific weights for the energy function.
Args:
cpu_weight (float): Weight of the CPU energy consumption.
gpu_weight (float): Weight of the GPU energy consumption.
ram_weight (float): Weight of the RAM energy consumption.
storage_weight (float): Weight of the storage energy consumption.
"""
self.cpu_weight = cpu_weight
self.gpu_weight = gpu_weight
self.ram_weight = ram_weight
self.storage_weight = storage_weight
self.initial_resources = None
def _energy_consumption(
self,
idle: float,
cpu: float,
gpu: float,
ram: float,
storage: float,
) -> float:
"""Calculates the energy based on the allocated resources.
Args:
cpu (float): Allocated CPU.
gpu (float): Allocated GPU.
ram (float): Allocated RAM.
storage (float): Allocated storage.
Returns:
float: The calculated energy.
"""
return (
idle
+ self.cpu_weight * cpu
+ self.gpu_weight * gpu
+ self.ram_weight * ram
+ self.storage_weight * storage
)
def place(
self,
infrastructure: Infrastructure,
application: Application,
_: Dict[str, Placement],
placement_view: PlacementView,
) -> Dict[Any, Any]:
"""Places the services of an application on the infrastructure nodes to minimize
energy consumption.
Args:
infrastructure (Infrastructure): The infrastructure to place the application on.
application (Application): The application to place on the infrastructure.
Returns:
Dict[str, str]: A mapping of services to infrastructure nodes.
"""
if self.initial_resources is None:
self.initial_resources = infrastructure.nodes(data=True)
if not self.is_feasible(infrastructure, application):
return {}
mapping = {}
infrastructure_nodes = list(infrastructure.available.nodes(data=True))
rnd.shuffle(infrastructure_nodes)
for service, sattr in application.nodes(data=True):
min_energy: Optional[float] = MAX_FLOAT
best_fit: Optional[str] = None
best_nattr: Optional[Dict[str, Any]] = None
for node, nattr in infrastructure_nodes:
if infrastructure.node_assets.satisfies(nattr, sattr):
allocated_nattr = infrastructure.node_assets.consume(
self.initial_resources[node], nattr
)
energy = self._energy_consumption(
allocated_nattr.get("energy", 0),
allocated_nattr.get("cpu", 0),
allocated_nattr.get("gpu", 0),
allocated_nattr.get("ram", 0),
allocated_nattr.get("storage", 0),
)
if energy < min_energy and energy > 0:
min_energy = energy
best_fit = node
best_nattr = nattr
mapping[service] = best_fit
if best_fit is None or best_nattr is None:
continue
new_res = infrastructure.node_assets.consume(best_nattr, sattr)
infrastructure_nodes.remove((best_fit, best_nattr))
infrastructure_nodes.append((best_fit, new_res))
return mapping