Echo#

The Echo example showcases a small service-based application in which messages are exchanged repeatedly among neighbouring services.

Use it when you want to understand:

  • how to build an application and an infrastructure from Python code,

  • how to run a local simulation end to end,

  • how MPI-style communication behaves in a simple topology.

The full code lives in the examples/echo directory.

Run it from the repository root with:

uv run echo

Application#

The Echo Application consists of several identical services, each of which receives a message and echoes it back to all of its neighbors. This symmetrical architecture ensures that each service behaves identically.

Each service echoes messages in two ways: by unicasting a separate message to each neighbor and by broadcasting to all neighbors at once. It is therefore useful to compare the expected results of these two communication methods. Broadcasting is expected to be faster, because the messages are sent to all neighbors simultaneously. Unicasting, on the other hand, sends a separate message to each neighbor in turn, which can result in a longer total communication time.

Application code
from .echo import EchoService

from eclypse.graph import Application

# Build the Application object (named "EchoApp") that hosts the Echo services.
echo_app = Application("EchoApp", include_default_assets=True)

# One entry per service, in registration order: the service id followed by
# the keyword arguments handed to add_service() for that service.
_SERVICES = (
    (
        "Gateway",
        {
            "cpu": 1,
            "gpu": 0,
            "ram": 0.5,
            "storage": 0.5,
            "availability": 0.9,
            "processing_time": 0.1,
        },
    ),
    (
        "SecurityService",
        {
            "cpu": 2,
            "gpu": 0,
            "ram": 4.0,
            "storage": 2.0,
            "availability": 0.8,
            "processing_time": 2.0,
        },
    ),
    (
        "LightingService",
        {
            "cpu": 1,
            "gpu": 0,
            "ram": 2.0,
            "storage": 5.0,
            "availability": 0.8,
            "processing_time": 1.0,
        },
    ),
    (
        "ClimateControlService",
        {
            "cpu": 2,
            "gpu": 0,
            "ram": 3.0,
            "storage": 8.0,
            "availability": 0.85,
            "processing_time": 1.5,
        },
    ),
    (
        "EntertainmentService",
        {
            "cpu": 3,
            "gpu": 1,
            "ram": 4.0,
            "storage": 10.0,
            "availability": 0.9,
            "processing_time": 5.0,
        },
    ),
)

for _service_id, _requirements in _SERVICES:
    echo_app.add_service(EchoService(_service_id), **_requirements)

# Interactions between services, in registration order:
# (source, target, latency, bandwidth). Every edge is added with symmetric=True.
_INTERACTIONS = (
    ("Gateway", "LightingService", 100.0, 20.0),
    ("Gateway", "ClimateControlService", 100.0, 10.0),
    ("Gateway", "SecurityService", 50.0, 5.0),
    ("SecurityService", "EntertainmentService", 50.0, 10.0),
)

for _source, _target, _latency, _bandwidth in _INTERACTIONS:
    echo_app.add_edge(
        _source,
        _target,
        latency=_latency,
        bandwidth=_bandwidth,
        symmetric=True,
    )

Echo Service#

The EchoService class is the runtime component responsible for sending messages to neighbour services and measuring the difference between unicast and broadcast communication.

Service code
 1import asyncio
 2import time
 3
 4from eclypse.remote.service import Service
 5
 6
 7class EchoService(Service):
 8    def __init__(self, id: str):
 9        super().__init__(id, store_step=True)
10        self.i = 0
11
12    async def step(self):
13        message = {"message": f"Hello from {self.id}!"}
14
15        neigh = await self.mpi.get_neighbors()
16        expected_wait_unicast = 0
17        t_init_unicast = time.time()
18        for n in neigh:
19            req = await self.mpi.send(n, message)
20            expected_wait_unicast += req.route.cost(message) if req.route else 0
21        t_final_unicast = time.time()
22        t_unicast = t_final_unicast - t_init_unicast
23        self.logger.info(
24            f"Service {self.id}, {self.i} -  Unicasts in: {t_unicast}, expected = {expected_wait_unicast}"
25        )
26        t_init_broadcast = time.time()
27        req = await self.mpi.send(neigh, message)
28        expected_wait_broadcast = max(
29            [r.cost(message) for r in req.routes if r], default=0
30        )
31        t_final_broadcast = time.time()
32        t_broadcast = t_final_broadcast - t_init_broadcast
33        self.logger.info(
34            f"Service {self.id}, {self.i} - Broadcasts in: {t_broadcast}, expected = {expected_wait_broadcast}"
35        )
36        self.i += 1
37        await asyncio.sleep(1)
38        return (
39            self.i,
40            t_unicast,
41            expected_wait_unicast,
42            t_broadcast,
43            expected_wait_broadcast,
44        )

The service inherits from Service and implements step(), which is the unit of behaviour executed during emulation.

Infrastructure#

The Echo application is deployed on a small infrastructure made of heterogeneous nodes connected through links with different latency and bandwidth values.

Infrastructure code
from .update_policy import random_update

from eclypse.graph import Infrastructure


# Build and return the Infrastructure instance used by the Echo example
def get_infrastructure(seed: int = 2) -> Infrastructure:
    """Build the infrastructure on which the Echo example runs.

    Args:
        seed: Seed forwarded to the ``Infrastructure`` constructor.

    Returns:
        An ``Infrastructure`` named ``"EchoInfrastructure"`` that uses
        ``random_update`` as its update policy and contains five nodes
        joined in a chain by four links.
    """
    node_ids = (
        "CloudServer",
        "EdgeGateway",
        "IoTDevice",
        "CloudStorage",
        "EdgeSensor",
    )
    # (source, target, latency, bandwidth); every link is added with
    # symmetric=True.
    links = (
        ("CloudServer", "EdgeGateway", 5.0, 80.0),
        ("EdgeGateway", "IoTDevice", 8.0, 50.0),
        ("IoTDevice", "CloudStorage", 15.0, 100.0),
        ("CloudStorage", "EdgeSensor", 9.0, 70.0),
    )

    infrastructure = Infrastructure(
        "EchoInfrastructure",
        update_policies=random_update,
        include_default_assets=True,
        seed=seed,
    )

    for node_id in node_ids:
        infrastructure.add_node(node_id)

    for source, target, latency, bandwidth in links:
        infrastructure.add_edge(
            source,
            target,
            latency=latency,
            bandwidth=bandwidth,
            symmetric=True,
        )

    return infrastructure

The infrastructure is also updated at each iteration through a graph update policy that mutates both nodes and links to simulate changing runtime conditions.

Update policy code
import random as rnd

from eclypse.graph import AssetGraph


def random_update(graph: AssetGraph):
    """Randomly perturb the resources of every node and edge of ``graph``.

    The resource dictionaries yielded by ``graph.nodes.data()`` and
    ``graph.edges.data()`` are modified in place; nothing is returned.

    For each node, exactly one of the following happens:

    * with probability 0.02 its availability is set to 0;
    * otherwise, a second draw below 0.5 resets the availability to 1 if it
      is currently 0;
    * otherwise cpu, gpu, ram and storage are scaled by a random factor,
      clamped at 0 and rounded to an integer, and the availability is scaled
      by a factor in [0.995, 1.005] and clamped to [0, 1].

    For each edge, latency and bandwidth are scaled by a random factor,
    clamped at 0 and rounded to an integer.

    NOTE(review): the draws come from the module-level ``random`` state, not
    from a generator owned by ``graph`` -- confirm that state is seeded
    elsewhere if runs are meant to be reproducible.

    Args:
        graph: Graph whose node and edge resources are updated.
    """
    # (resource key, lowest factor, highest factor), in the order the random
    # numbers are drawn.
    node_jitter = (
        ("cpu", 0.95, 1.05),
        ("gpu", 0.9, 1.1),
        ("ram", 0.8, 1.2),
        ("storage", 0.9, 1.1),
    )
    edge_jitter = (
        ("latency", 0.9, 1.1),
        ("bandwidth", 0.95, 1.05),
    )

    for _, node in graph.nodes.data():
        if rnd.random() < 0.02:
            node["availability"] = 0
            continue

        # The second number is drawn before the availability is inspected.
        if rnd.random() < 0.5 and node["availability"] == 0:
            node["availability"] = 1
            continue

        for key, low, high in node_jitter:
            node[key] = round(max(0, node[key] * rnd.uniform(low, high)))

        drifted = node["availability"] * rnd.uniform(0.995, 1.005)
        node["availability"] = min(1, max(0, drifted))

    for _, _, link in graph.edges.data():
        for key, low, high in edge_jitter:
            link[key] = round(max(0, link[key] * rnd.uniform(low, high)))

Simulation#

The example configures a seeded run (seed 2, at most 30 steps) with the default metrics enabled, and stores the results in an EchoApp directory under the default simulation path.

Simulation code
from .application import echo_app as app
from .infrastructure import get_infrastructure

from eclypse.placement.strategies import RandomStrategy
from eclypse.simulation import (
    Simulation,
    SimulationConfig,
)
from eclypse.utils.defaults import get_default_sim_path


def main() -> None:
    """Run the Echo example and print the application report."""
    seed = 2

    # Results go into an "EchoApp" folder inside the default simulation path.
    output_dir = get_default_sim_path() / "EchoApp"
    # Optional settings left disabled here: remote=True, log_level="TRACE".
    config = SimulationConfig(
        seed=seed,
        max_steps=30,
        step_every_ms=500,
        log_to_file=True,
        path=output_dir,
        include_default_metrics=True,
    )

    infrastructure = get_infrastructure(seed=seed)
    simulation = Simulation(infrastructure, simulation_config=config)

    placement = RandomStrategy(seed=seed)
    simulation.register(app, placement)
    simulation.run()

    application_report = simulation.report.application()
    print(application_report)


if __name__ == "__main__":
    main()