Echo#
The Echo example showcases a small service-based application in which messages are exchanged repeatedly among neighbouring services.
Use it when you want to understand:
how to build an application and an infrastructure from Python code,
how to run a local simulation end to end,
how MPI-style communication behaves in a simple topology.
The full code lives in the examples/echo directory.
Run it from the repository root with:
uv run echo
Application#
The Echo Application consists of several identical services, each of which receives a message and echoes it back to all of its neighbors. This symmetrical architecture ensures that each service behaves identically.
As each service echoes messages both by broadcasting to all neighbors and unicasting individually to each neighbour, it is useful to compare the expected results of these two communication methods. Broadcasting is expected to be faster than unicasting, as it involves sending messages to all neighbors simultaneously. Unicasting, on the other hand, requires sending a separate message to each neighbor, which can result in a longer total communication time.
Application code
from .echo import EchoService
from eclypse.graph import Application
# Creating an instance of the EchoApp class
echo_app = Application("EchoApp", include_default_assets=True)
echo_app.add_service(
EchoService("Gateway"),
cpu=1,
gpu=0,
ram=0.5,
storage=0.5,
availability=0.9,
processing_time=0.1,
)
echo_app.add_service(
EchoService("SecurityService"),
cpu=2,
gpu=0,
ram=4.0,
storage=2.0,
availability=0.8,
processing_time=2.0,
)
echo_app.add_service(
EchoService("LightingService"),
cpu=1,
gpu=0,
ram=2.0,
storage=5.0,
availability=0.8,
processing_time=1.0,
)
echo_app.add_service(
EchoService("ClimateControlService"),
cpu=2,
gpu=0,
ram=3.0,
storage=8.0,
availability=0.85,
processing_time=1.5,
)
echo_app.add_service(
EchoService("EntertainmentService"),
cpu=3,
gpu=1,
ram=4.0,
storage=10.0,
availability=0.9,
processing_time=5.0,
)
echo_app.add_edge(
"Gateway",
"LightingService",
latency=100.0,
bandwidth=20.0,
symmetric=True,
)
echo_app.add_edge(
"Gateway",
"ClimateControlService",
latency=100.0,
bandwidth=10.0,
symmetric=True,
)
echo_app.add_edge(
"Gateway",
"SecurityService",
latency=50.0,
bandwidth=5.0,
symmetric=True,
)
echo_app.add_edge(
"SecurityService",
"EntertainmentService",
latency=50.0,
bandwidth=10.0,
symmetric=True,
)
Echo Service#
The EchoService class is the runtime component responsible for sending
messages to neighbour services and measuring the difference between unicast and
broadcast communication.
Service code
1import asyncio
2import time
3
4from eclypse.remote.service import Service
5
6
7class EchoService(Service):
8 def __init__(self, id: str):
9 super().__init__(id, store_step=True)
10 self.i = 0
11
12 async def step(self):
13 message = {"message": f"Hello from {self.id}!"}
14
15 neigh = await self.mpi.get_neighbors()
16 expected_wait_unicast = 0
17 t_init_unicast = time.time()
18 for n in neigh:
19 req = await self.mpi.send(n, message)
20 expected_wait_unicast += req.route.cost(message) if req.route else 0
21 t_final_unicast = time.time()
22 t_unicast = t_final_unicast - t_init_unicast
23 self.logger.info(
24 f"Service {self.id}, {self.i} - Unicasts in: {t_unicast}, expected = {expected_wait_unicast}"
25 )
26 t_init_broadcast = time.time()
27 req = await self.mpi.send(neigh, message)
28 expected_wait_broadcast = max(
29 [r.cost(message) for r in req.routes if r], default=0
30 )
31 t_final_broadcast = time.time()
32 t_broadcast = t_final_broadcast - t_init_broadcast
33 self.logger.info(
34 f"Service {self.id}, {self.i} - Broadcasts in: {t_broadcast}, expected = {expected_wait_broadcast}"
35 )
36 self.i += 1
37 await asyncio.sleep(1)
38 return (
39 self.i,
40 t_unicast,
41 expected_wait_unicast,
42 t_broadcast,
43 expected_wait_broadcast,
44 )
The service inherits from
Service and implements
step(), which is the unit of
behaviour executed during emulation.
Infrastructure#
The Echo application is deployed on a small infrastructure made of heterogeneous nodes connected through links with different latency and bandwidth values.
Infrastructure code
from .update_policy import random_update
from eclypse.graph import Infrastructure
# Creating an instance of the Infrastructure class
def get_infrastructure(seed: int = 2) -> Infrastructure:
echo_infra = Infrastructure(
"EchoInfrastructure",
update_policies=random_update,
include_default_assets=True,
seed=seed,
)
echo_infra.add_node("CloudServer")
echo_infra.add_node("EdgeGateway")
echo_infra.add_node("IoTDevice")
echo_infra.add_node("CloudStorage")
echo_infra.add_node("EdgeSensor")
echo_infra.add_edge(
"CloudServer", "EdgeGateway", latency=5.0, bandwidth=80.0, symmetric=True
)
echo_infra.add_edge(
"EdgeGateway", "IoTDevice", latency=8.0, bandwidth=50.0, symmetric=True
)
echo_infra.add_edge(
"IoTDevice", "CloudStorage", latency=15.0, bandwidth=100.0, symmetric=True
)
echo_infra.add_edge(
"CloudStorage", "EdgeSensor", latency=9.0, bandwidth=70.0, symmetric=True
)
return echo_infra
The infrastructure is also updated at each iteration through a graph update policy that mutates both nodes and links to simulate changing runtime conditions.
Update policy code
import random as rnd
from eclypse.graph import AssetGraph
def random_update(graph: AssetGraph):
for _, resources in graph.nodes.data():
if rnd.random() < 0.02:
resources["availability"] = 0
elif rnd.random() < 0.5 and resources["availability"] == 0:
resources["availability"] = 1
else:
resources["cpu"] = round(max(0, resources["cpu"] * rnd.uniform(0.95, 1.05)))
resources["gpu"] = round(max(0, resources["gpu"] * rnd.uniform(0.9, 1.1)))
resources["ram"] = round(max(0, resources["ram"] * rnd.uniform(0.8, 1.2)))
resources["storage"] = round(
max(0, resources["storage"] * rnd.uniform(0.9, 1.1))
)
resources["availability"] = min(
1, max(0, resources["availability"] * rnd.uniform(0.995, 1.005))
)
for _, _, resources in graph.edges.data():
resources["latency"] = round(
max(0, resources["latency"] * rnd.uniform(0.9, 1.1))
)
resources["bandwidth"] = round(
max(0, resources["bandwidth"] * rnd.uniform(0.95, 1.05))
)
Simulation#
The example configures a reproducible run with metrics enabled and stores the results under the default simulation path.
Simulation code
from .application import echo_app as app
from .infrastructure import get_infrastructure
from eclypse.placement.strategies import RandomStrategy
from eclypse.simulation import (
Simulation,
SimulationConfig,
)
from eclypse.utils.defaults import get_default_sim_path
def main() -> None:
"""Run the Echo example."""
seed = 2
sim_config = SimulationConfig(
seed=seed,
max_steps=30,
step_every_ms=500,
log_to_file=True,
path=get_default_sim_path() / "EchoApp",
# remote=True,
# log_level="TRACE",
include_default_metrics=True,
)
sim = Simulation(
get_infrastructure(seed=seed),
simulation_config=sim_config,
)
sim.register(app, RandomStrategy(seed=seed))
sim.run()
print(sim.report.application())
if __name__ == "__main__":
main()