Grid analysis

The grid analysis example runs a Ray Tune sweep over infrastructure topology, load, failure policy, random seed, and placement strategy choices.

Use it when you want to understand:

  • how to wrap an ECLYPSE simulation in a parameter-search function,

  • how to compare placement strategies across many generated infrastructures,

  • how to write custom infrastructure assets and policies for experiments.

The full code lives in the examples/grid_analysis directory.

Run it from the repository root with:

uv run grid-analysis

Simulation sweep

Main sweep code
from pathlib import Path
from time import time

import ray
from ray import (
    train,
    tune,
)

from .applications import get_apps
from .infrastructure import get_infrastructure
from .metrics import get_metrics
from .strategy import EnergyMinimizationStrategy

from eclypse.placement.strategies import (
    BestFitStrategy,
    FirstFitStrategy,
    RandomStrategy,
)
from eclypse.simulation import Simulation
from eclypse.simulation.config import SimulationConfig
from eclypse.utils.defaults import get_default_sim_path


def eclypse_grid(config):
    """Run one ECLYPSE simulation for a single Ray Tune trial.

    Args:
        config: Trial parameters. ``"seed"`` and ``"max_steps"`` are read
            here; the whole mapping is also forwarded to
            ``get_infrastructure`` and ``get_strategy``.
    """
    print("Running with config: ", config)

    # Simulation output goes to "<storage>/<experiment>/<trial>/output".
    storage = tune.get_context().get_storage()
    output_dir = Path(storage.storage_fs_path).joinpath(
        storage.experiment_dir_name,
        str(storage.trial_dir_name),
        "output",
    )
    print("PATH: ", output_dir)

    simulation_config = SimulationConfig(
        step_every_ms="auto",
        seed=config["seed"],
        max_steps=config["max_steps"],
        path=output_dir,
        include_default_metrics=False,
        events=get_metrics(),
        log_level="CRITICAL",
    )

    applications = get_apps(seed=config["seed"])
    infrastructure = get_infrastructure(config)
    simulation = Simulation(
        infrastructure=infrastructure,
        simulation_config=simulation_config,
    )

    # Each application is registered with its own strategy instance.
    for application in applications:
        simulation.register(application, get_strategy(config))
    simulation.run()

    print("End of simulation")


def get_strategy(config):
    """Build the placement strategy selected by ``config["strategy"]``.

    Args:
        config: Trial parameters. ``"strategy"`` must be one of ``"random"``,
            ``"best-fit"``, ``"first-fit"`` or ``"min-energy"``; ``"seed"`` is
            read only for the ``"random"`` strategy.

    Returns:
        A new strategy instance.

    Raises:
        ValueError: If the strategy name is not one of the names above.
    """
    name = config["strategy"]

    if name == "random":
        return RandomStrategy(config["seed"])
    if name == "best-fit":
        return BestFitStrategy()
    if name == "first-fit":
        return FirstFitStrategy()
    if name == "min-energy":
        return EnergyMinimizationStrategy()

    raise ValueError(f"Invalid strategy '{config['strategy']}'")


# Parameter space handed to ``tune.Tuner``. Every ``tune.grid_search`` axis is
# combined with all the others, so the active values below produce
# 4 * 3 * 3 * 6 * 1 * 3 = 648 trials.
search_space = {
    # Fixed for all trials; read by ``eclypse_grid`` and ``get_policy``.
    "max_steps": 600,
    # Fraction handed to ``apply_load``.
    "load": tune.grid_search([0, 0.25, 0.5, 0.75]),
    # Size argument for the infrastructure builders.
    "nodes": tune.grid_search([50, 100, 300]),
    "seed": tune.grid_search([42, 3997, 151195]),
    # (policy kind, policy parameter) pairs consumed by ``get_policy``.
    "policy": tune.grid_search(
        [
            ("degrade", 0.4),
            ("degrade", 0.5),
            ("degrade", 0.6),
            ("kill", 0.01),
            ("kill", 0.05),
            ("kill", 0.1),
        ]
    ),
    # Names understood by ``get_strategy``. The disabled alternatives are
    # "random", "first-fit" and "best-fit"; add them to the list to sweep them.
    "strategy": tune.grid_search(["min-energy"]),
    # (topology kind, optional parameter) tuples consumed by
    # ``get_infrastructure``. The disabled alternatives are ("random", 0.25)
    # and ("random", 0.75).
    "topology": tune.grid_search(
        [
            ("hierarchical",),
            ("star",),
            ("random", 0.5),
        ]
    ),
}

def main() -> None:
    """Run the Grid Analysis Ray Tune example.

    Connects to Ray, sweeps ``search_space`` with ``eclypse_grid`` as the
    trainable, and prints the elapsed wall-clock time.
    """
    # NOTE(review): per the Ray docs, address="auto" attaches to an already
    # running cluster rather than starting a local one - confirm a cluster is
    # expected to be up before this script runs.
    ray.init(address="auto")

    started_at = time()
    tuner = tune.Tuner(
        # To cap per-trial resources, wrap the trainable instead:
        # tune.with_resources(eclypse_grid, {"cpu": 0.5}),
        eclypse_grid,
        param_space=search_space,
        run_config=train.RunConfig(
            storage_path=get_default_sim_path().resolve(),
        ),
    )
    tuner.fit()
    print("Elapsed time: ", time() - started_at)


# Script entry point: start the sweep only when the module is executed directly.
if __name__ == "__main__":
    main()

Infrastructure

Infrastructure code
from __future__ import annotations

from typing import (
    Any,
    Callable,
    Optional,
    Union,
)

from .policies import (
    degrade_policy,
    kill_policy,
)

from eclypse.builders.infrastructure import (
    get_hierarchical,
    get_random,
    get_star,
)
from eclypse.graph import Infrastructure
from eclypse.graph.assets import Concave
from eclypse.graph.assets.space import (
    AssetSpace,
    Choice,
)
from eclypse.utils.constants import (
    MAX_FLOAT,
    MIN_FLOAT,
)
from eclypse.utils.types import PrimitiveType


def get_infrastructure(config) -> Infrastructure:
    """Build the infrastructure for one trial and apply the configured load.

    Args:
        config: Trial parameters. Reads ``"topology"`` (a tuple whose first
            item is ``"star"``, ``"random"`` or ``"hierarchical"``; for
            ``"random"`` the second item is passed as ``p``), ``"nodes"``,
            ``"seed"`` and ``"load"``. The mapping is also passed to
            ``get_policy``.

    Returns:
        Infrastructure: The generated infrastructure, after ``apply_load``.

    Raises:
        ValueError: If the topology kind is not one of the three above.
    """
    # Keyword arguments shared by all three builders.
    shared_kwargs = dict(
        update_policies=get_policy(config),
        resource_init="max",
        symmetric=True,
        seed=config["seed"],
        node_assets={"energy": idle_energy_consumption()},
        include_default_assets=True,
    )

    kind = config["topology"][0]
    if kind == "star":
        infrastructure = get_star(
            infrastructure_id="star",
            n_clients=config["nodes"],
            **shared_kwargs,
        )
    elif kind == "random":
        infrastructure = get_random(
            infrastructure_id="random",
            n=config["nodes"],
            p=config["topology"][1],
            **shared_kwargs,
        )
    elif kind == "hierarchical":
        infrastructure = get_hierarchical(
            infrastructure_id="hierarchical",
            n=config["nodes"],
            **shared_kwargs,
        )
    else:
        raise ValueError(f"Unknown topology {config['topology']}")

    apply_load(infrastructure, config["load"])
    return infrastructure


def get_policy(config):
    """Build the update policy selected by ``config["policy"]``.

    Args:
        config: Trial parameters. ``"policy"`` is a ``(kind, value)`` pair
            with ``kind`` equal to ``"degrade"`` or ``"kill"``. For
            ``"degrade"``, ``"max_steps"`` is read as well.

    Returns:
        The result of ``degrade_policy(value, max_steps)`` or
        ``kill_policy(value)``.

    Raises:
        ValueError: If the policy kind is neither ``"degrade"`` nor
            ``"kill"``. A misspelled kind used to fall through to the kill
            policy silently; it is rejected so a sweep cannot run the wrong
            experiment unnoticed.
    """
    policy = config["policy"]
    kind = policy[0]

    if kind == "degrade":
        return degrade_policy(policy[1], config["max_steps"])
    if kind == "kill":
        return kill_policy(policy[1])

    raise ValueError(f"Unknown policy {policy}")


def apply_load(infr: Infrastructure, load: float):
    """Scale node and link resources in place by ``1 - load``.

    Node ``cpu``, ``gpu``, ``ram`` and ``storage`` values are scaled and
    truncated with ``int``; edge ``bandwidth`` is scaled without truncation.
    A ``load`` of 0 leaves the infrastructure untouched.

    Args:
        infr (Infrastructure): The infrastructure whose attributes are updated.
        load (float): The fraction of each resource to remove.
    """
    if load == 0:
        return

    remaining = 1 - load

    for _, node_attrs in infr.nodes(data=True):
        for resource in ("cpu", "gpu", "ram", "storage"):
            node_attrs[resource] = int(node_attrs[resource] * remaining)

    for _, _, edge_attrs in infr.edges(data=True):
        edge_attrs["bandwidth"] = edge_attrs["bandwidth"] * remaining


def idle_energy_consumption(
    lower_bound: float = MAX_FLOAT,
    upper_bound: float = MIN_FLOAT,
    init_fn_or_value: Optional[
        Union[PrimitiveType, AssetSpace, Callable[[], Any]]
    ] = None,
) -> Concave:
    """Create the ``Concave`` asset used for idle energy consumption.

    Args:
        lower_bound (float): The lower bound of the asset. Defaults to
            ``MAX_FLOAT``.
        upper_bound (float): The upper bound of the asset. Defaults to
            ``MIN_FLOAT``.
        init_fn_or_value (Optional[Union[PrimitiveType, AssetSpace, Callable[[], Any]]]):
            The function/scalar to initialize the idle energy consumption
            value. When omitted, ``Choice([20, 50, 80, 150])`` is used.

    Returns:
        Concave: The idle energy consumption asset.
    """
    # NOTE(review): the default bounds look inverted (lower=MAX, upper=MIN);
    # presumably that is the convention for Concave assets - confirm against
    # the ECLYPSE asset documentation.
    if init_fn_or_value is None:
        initializer = Choice([20, 50, 80, 150])
    else:
        initializer = init_fn_or_value

    return Concave(lower_bound, upper_bound, initializer, functional=False)

Placement strategy

Strategy code
from __future__ import annotations

import random as rnd
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    Optional,
)

from eclypse.placement.strategies import PlacementStrategy
from eclypse.utils.constants import MAX_FLOAT

if TYPE_CHECKING:
    from eclypse.graph import (
        Application,
        Infrastructure,
    )
    from eclypse.placement import (
        Placement,
        PlacementView,
    )


class EnergyMinimizationStrategy(PlacementStrategy):
    """Placement strategy that puts each service on the lowest-energy node.

    A node's energy score is its ``energy`` value plus a weighted sum of its
    ``cpu``, ``gpu``, ``ram`` and ``storage`` values, all taken from the
    result of consuming the node's current attributes from the attributes
    cached on the first ``place`` call.
    """

    def __init__(
        self,
        cpu_weight: float = 0.25,
        gpu_weight: float = 0.25,
        ram_weight: float = 0.25,
        storage_weight: float = 0.25,
    ):
        """Store the weights used by the energy function.

        Args:
            cpu_weight (float): Weight of the CPU energy consumption.
            gpu_weight (float): Weight of the GPU energy consumption.
            ram_weight (float): Weight of the RAM energy consumption.
            storage_weight (float): Weight of the storage energy consumption.
        """
        # Filled in lazily by the first call to ``place``.
        self.initial_resources = None

        self.cpu_weight = cpu_weight
        self.gpu_weight = gpu_weight
        self.ram_weight = ram_weight
        self.storage_weight = storage_weight

    def _energy_consumption(
        self,
        idle: float,
        cpu: float,
        gpu: float,
        ram: float,
        storage: float,
    ) -> float:
        """Return the idle energy plus the weighted resource terms.

        Args:
            idle (float): Energy term added without a weight.
            cpu (float): Allocated CPU.
            gpu (float): Allocated GPU.
            ram (float): Allocated RAM.
            storage (float): Allocated storage.

        Returns:
            float: The calculated energy.
        """
        total = idle
        total += self.cpu_weight * cpu
        total += self.gpu_weight * gpu
        total += self.ram_weight * ram
        total += self.storage_weight * storage
        return total

    def place(
        self,
        infrastructure: Infrastructure,
        application: Application,
        _: Dict[str, Placement],
        placement_view: PlacementView,
    ) -> Dict[Any, Any]:
        """Map each service of an application to the lowest-energy node.

        Candidate nodes are visited in an order shuffled with the
        module-level ``random`` generator. After a service is assigned, the
        chosen node's candidate attributes are replaced by what is left once
        the service's requirements are consumed.

        Args:
            infrastructure (Infrastructure): The infrastructure to place the
                application on.
            application (Application): The application to place on the
                infrastructure.
            _ (Dict[str, Placement]): Not used by this strategy.
            placement_view (PlacementView): Not used by this strategy.

        Returns:
            Dict[Any, Any]: A mapping of services to infrastructure nodes. It
            is empty when the placement is not feasible; a service for which
            no node qualifies is mapped to ``None``.
        """
        if self.initial_resources is None:
            # NOTE(review): if ``nodes(data=True)`` returns a live view (as
            # networkx graphs do), this is not a frozen snapshot - confirm.
            self.initial_resources = infrastructure.nodes(data=True)

        if not self.is_feasible(infrastructure, application):
            return {}

        candidates = list(infrastructure.available.nodes(data=True))
        rnd.shuffle(candidates)

        assets = infrastructure.node_assets
        mapping = {}

        for service, requirements in application.nodes(data=True):
            lowest_energy: Optional[float] = MAX_FLOAT
            chosen_node: Optional[str] = None
            chosen_attrs: Optional[Dict[str, Any]] = None

            for node, available in candidates:
                if not assets.satisfies(available, requirements):
                    continue

                allocated = assets.consume(self.initial_resources[node], available)
                energy = self._energy_consumption(
                    allocated.get("energy", 0),
                    allocated.get("cpu", 0),
                    allocated.get("gpu", 0),
                    allocated.get("ram", 0),
                    allocated.get("storage", 0),
                )

                # Only strictly positive scores can win.
                if 0 < energy < lowest_energy:
                    lowest_energy = energy
                    chosen_node = node
                    chosen_attrs = available

            mapping[service] = chosen_node
            if chosen_node is not None and chosen_attrs is not None:
                # Later services must see the node with this service's
                # requirements already consumed.
                leftover = assets.consume(chosen_attrs, requirements)
                candidates.remove((chosen_node, chosen_attrs))
                candidates.append((chosen_node, leftover))

        return mapping