# Image prediction
The image prediction example shows a remote application with trainer, predictor, and end-user services. It combines service implementations, custom metrics, and a degradation policy in emulation mode.
Use it when you want to understand:
- how to attach concrete service implementations to an application,
- how to configure a remote simulation,
- how to collect custom metrics from remote service behaviour.
The full code lives in the examples/image_prediction directory.
Run it from the repository root with:
uv run image-prediction
## Application
Application code
from .services import (
EndService,
PredictorService,
TrainerService,
)
from eclypse.graph import Application
from eclypse.utils.constants import MAX_LATENCY
def _build_image_app():
    """Assemble the ``ImagePrediction`` application graph.

    Registers the three services (``EndService``, ``PredictorService``,
    ``TrainerService``) with their resource requirements, then links them
    in a chain with symmetric edges that use ``MAX_LATENCY`` as the latency
    requirement.

    Returns:
        The populated ``Application`` instance.
    """
    app = Application("ImagePrediction", include_default_assets=True)

    # (service class, service id, requirements), in registration order.
    # Each service is instantiated right before it is added.
    service_specs = (
        (
            EndService,
            "EndService",
            {
                "cpu": 1,
                "gpu": 0,
                "ram": 0.5,
                "storage": 0.5,
                "availability": 0.9,
                "processing_time": 5,
            },
        ),
        (
            PredictorService,
            "PredictorService",
            {
                "cpu": 1,
                "gpu": 1,
                "ram": 16.0,
                "storage": 64.0,
                "availability": 0.9,
            },
        ),
        (
            TrainerService,
            "TrainerService",
            {
                "cpu": 16,
                "gpu": 4,
                "ram": 16.0,
                "storage": 2.0,
                "availability": 0.9,
            },
        ),
    )
    for service_cls, service_id, requirements in service_specs:
        app.add_service(service_cls(service_id), **requirements)

    # (source id, target id, bandwidth requirement) for each interaction.
    link_specs = (
        ("EndService", "PredictorService", 20.0),
        ("PredictorService", "TrainerService", 100.0),
    )
    for source_id, target_id, bandwidth in link_specs:
        app.add_edge(
            source_id,
            target_id,
            latency=MAX_LATENCY,
            bandwidth=bandwidth,
            symmetric=True,
        )

    return app


image_app = _build_image_app()
## Simulation
Simulation code
from .application import image_app as app
from .metrics import get_metrics
from .services.utils import (
BASE_PATH,
STEP_EVERY_MS,
STEPS,
)
from .update_policy import DegradePolicy
from eclypse.builders.infrastructure import get_star
from eclypse.placement.strategies import RandomStrategy
from eclypse.simulation import (
Simulation,
SimulationConfig,
)
def main() -> None:
    """Run the Image Prediction example.

    Builds a ``SimulationConfig`` with a fixed seed, the custom metrics
    returned by ``get_metrics()`` (default metrics disabled), file logging
    under ``BASE_PATH`` and ``remote=True``; creates a star infrastructure
    with five clients governed by ``DegradePolicy``; registers the
    application with a ``RandomStrategy`` placement and runs the simulation.
    """
    rng_seed = 2

    config = SimulationConfig(
        seed=rng_seed,
        max_steps=STEPS,
        step_every_ms=STEP_EVERY_MS,
        include_default_metrics=False,
        events=get_metrics(),
        log_to_file=True,
        path=BASE_PATH,
        # Pass a RemoteBootstrap instance here to request specific Ray resources.
        remote=True,
    )

    # The infrastructure shares the simulation seed.
    infrastructure = get_star(
        infrastructure_id="IPInfr",
        n_clients=5,
        seed=rng_seed,
        update_policies=DegradePolicy(epochs=STEPS),
        include_default_assets=True,
        resource_init="max",
        symmetric=True,
    )

    simulation = Simulation(infrastructure, simulation_config=config)
    simulation.register(app, RandomStrategy(spread=True))
    simulation.run()


# Only start the simulation when this module is executed as a script.
if __name__ == "__main__":
    main()
## Metrics
Metrics code
import os
import psutil
from eclypse.report.metrics import metric
from eclypse.report.metrics.defaults import featured_latency
from eclypse.utils.constants import DRIVING_EVENT
@metric.service(name="img_counter", remote=True)
def get_img_counter(self):
    """Report the ``img_counter`` value of the ``EndService``.

    Returns:
        ``None`` for any service whose id is not ``"EndService"``;
        ``0`` (after logging a warning) when the counter is zero;
        otherwise the current ``img_counter``.
    """
    # Only the end-user service is reported by this metric.
    if self.id != "EndService":
        return None

    if self.img_counter == 0:
        self.logger.warning("No images were processed")
        return 0

    return self.img_counter
@metric.service(name="accuracy", remote=True)
def get_acc(self):
    """Report ``correct / img_counter`` for the ``EndService`` and reset both.

    Returns:
        The ratio ``self.correct / self.img_counter`` when the service id is
        ``"EndService"`` and ``img_counter`` is non-zero; ``None`` otherwise.

    Side effects:
        On a successful report, ``self.correct`` and ``self.img_counter``
        are both reset to ``0``.
    """
    # Skip other services, and avoid dividing by an empty counter.
    if self.id != "EndService" or self.img_counter == 0:
        return None

    ratio = self.correct / self.img_counter

    # Reset both counters after the ratio has been computed.
    self.correct = 0
    self.img_counter = 0
    return ratio
@metric.service(name="model_change", remote=True)
def is_changed(self):
    """Report and clear the ``new_model_signal`` flag of the ``PredictorService``.

    Returns:
        ``1`` when the service id is ``"PredictorService"`` and
        ``new_model_signal`` is truthy (the flag is then set to ``False``);
        ``None`` in every other case.
    """
    if self.id == "PredictorService" and self.new_model_signal:
        # Clear the flag so the same signal is reported only once.
        self.new_model_signal = False
        return 1
    return None
@metric.simulation(name="cpu_usage", activates_on=[DRIVING_EVENT, "stop"])
class CPUMonitor:
    """Simulation metric returning the CPU percentage of the current process."""

    def __init__(self):
        # The psutil handle is created lazily on the first call.
        self.process = None

    def __call__(self, event):
        """Return ``cpu_percent`` for this process, sampled over 0.1 s.

        Args:
            event: The triggering event; not used by this metric.
        """
        handle = self.process
        if handle is None:
            handle = self.process = psutil.Process(os.getpid())
        return handle.cpu_percent(interval=0.1)
@metric.service(
    name="remote_cpu_usage",
    activates_on=[DRIVING_EVENT, "stop"],
    remote=True,
)
class RemoteCPUMonitor:
    """Service metric (``remote=True``) returning the CPU percentage of the
    process in which it is called.

    NOTE(review): presumably evaluated inside each remote service's process;
    confirm against the metric decorator's semantics.
    """

    def __init__(self):
        # Filled in on first use with a handle for the calling process.
        self.process = None

    def __call__(self, event):
        """Return ``cpu_percent`` for this process, sampled over 0.1 s.

        Args:
            event: The triggering event; not used by this metric.
        """
        if self.process is not None:
            return self.process.cpu_percent(interval=0.1)

        self.process = psutil.Process(os.getpid())
        return self.process.cpu_percent(interval=0.1)
@metric.simulation(
    name="memory_usage",
    activates_on=[DRIVING_EVENT, "stop"],
)
class MemoryMonitor:
    """Simulation metric returning the resident set size of the current
    process, expressed in MB."""

    def __init__(self):
        # The psutil handle is created lazily on the first call.
        self.process = None

    def __call__(self, event):
        """Return the RSS of this process divided by ``1024 * 1024``.

        Args:
            event: The triggering event; not used by this metric.
        """
        handle = self.process
        if handle is None:
            handle = self.process = psutil.Process(os.getpid())

        rss_bytes = handle.memory_info().rss
        bytes_per_mb = 1024 * 1024
        return rss_bytes / bytes_per_mb  # Convert to MB
@metric.service(
    name="remote_memory_usage",
    activates_on=[DRIVING_EVENT, "stop"],
    remote=True,
)
class RemoteMemoryMonitor:
    """Service metric (``remote=True``) returning the resident set size, in
    MB, of the process in which it is called.

    NOTE(review): presumably evaluated inside each remote service's process;
    confirm against the metric decorator's semantics.
    """

    def __init__(self):
        # Filled in on first use with a handle for the calling process.
        self.process = None

    def __call__(self, event):
        """Return the RSS of this process divided by ``1024 * 1024``.

        Args:
            event: The triggering event; not used by this metric.
        """
        if self.process is None:
            self.process = psutil.Process(os.getpid())

        # Bytes -> MB.
        return self.process.memory_info().rss / (1024 * 1024)
def get_metrics():
    """Return the metrics used by the Image Prediction simulation.

    The list contains, in order, the three function-based service metrics,
    ``featured_latency``, and one fresh instance of each CPU and memory
    monitor class.
    """
    function_metrics = [
        get_img_counter,
        get_acc,
        is_changed,
        featured_latency,
    ]
    # Monitors are instantiated on every call, so no state is shared
    # between separate invocations of get_metrics().
    monitor_metrics = [
        CPUMonitor(),
        MemoryMonitor(),
        RemoteCPUMonitor(),
        RemoteMemoryMonitor(),
    ]
    return function_metrics + monitor_metrics