Image prediction

The image prediction example shows a remote application with trainer, predictor, and end-user services. It combines service implementations, custom metrics, and a degradation policy in emulation mode.

Use it when you want to understand:

  • how to attach concrete service implementations to an application,

  • how to configure a remote simulation,

  • how to collect custom metrics from remote service behaviour.

The full code lives in the examples/image_prediction directory.

Run it from the repository root with:

uv run image-prediction

Application

Application code
from .services import (
    EndService,
    PredictorService,
    TrainerService,
)

from eclypse.graph import Application
from eclypse.utils.constants import MAX_LATENCY

def _build_application() -> Application:
    """Assemble the ImagePrediction application graph.

    Services are registered in this order: EndService, PredictorService,
    TrainerService. They are then chained by two symmetric links:
    EndService <-> PredictorService <-> TrainerService.
    """
    app = Application("ImagePrediction", include_default_assets=True)

    # (service class, service id, requirements passed to add_service).
    # Only EndService declares a processing_time requirement.
    service_specs = (
        (
            EndService,
            "EndService",
            dict(
                cpu=1,
                gpu=0,
                ram=0.5,
                storage=0.5,
                availability=0.9,
                processing_time=5,
            ),
        ),
        (
            PredictorService,
            "PredictorService",
            dict(cpu=1, gpu=1, ram=16.0, storage=64.0, availability=0.9),
        ),
        (
            TrainerService,
            "TrainerService",
            dict(cpu=16, gpu=4, ram=16.0, storage=2.0, availability=0.9),
        ),
    )
    for service_cls, service_id, requirements in service_specs:
        # Each service is instantiated immediately before it is added, as in
        # a sequence of explicit add_service calls.
        app.add_service(service_cls(service_id), **requirements)

    # (source, target, bandwidth). Every link uses latency=MAX_LATENCY and
    # is declared symmetric.
    link_specs = (
        ("EndService", "PredictorService", 20.0),
        ("PredictorService", "TrainerService", 100.0),
    )
    for source, target, bandwidth in link_specs:
        app.add_edge(
            source,
            target,
            latency=MAX_LATENCY,
            bandwidth=bandwidth,
            symmetric=True,
        )

    return app


image_app = _build_application()

Simulation

Simulation code
from .application import image_app as app
from .metrics import get_metrics
from .services.utils import (
    BASE_PATH,
    STEP_EVERY_MS,
    STEPS,
)
from .update_policy import DegradePolicy

from eclypse.builders.infrastructure import get_star
from eclypse.placement.strategies import RandomStrategy
from eclypse.simulation import (
    Simulation,
    SimulationConfig,
)


def main() -> None:
    """Entry point for the Image Prediction example.

    Steps:
    1. Configure a remote simulation that collects only this example's
       metrics and logs to files under BASE_PATH.
    2. Build a star infrastructure with 5 clients and a DegradePolicy
       update policy.
    3. Register the application with a spreading RandomStrategy.
    4. Run the simulation.
    """
    rng_seed = 2

    config = SimulationConfig(
        seed=rng_seed,
        max_steps=STEPS,
        step_every_ms=STEP_EVERY_MS,
        # Skip the built-in metrics; only those from get_metrics() are reported.
        include_default_metrics=False,
        events=get_metrics(),
        log_to_file=True,
        path=BASE_PATH,
        # Pass a RemoteBootstrap instance here to request specific Ray resources.
        remote=True,
    )

    infrastructure = get_star(
        infrastructure_id="IPInfr",
        n_clients=5,
        seed=rng_seed,
        update_policies=DegradePolicy(epochs=STEPS),
        include_default_assets=True,
        resource_init="max",
        symmetric=True,
    )

    simulation = Simulation(infrastructure, simulation_config=config)
    simulation.register(app, RandomStrategy(spread=True))
    simulation.run()


if __name__ == "__main__":
    main()

Metrics

Metrics code
import os

import psutil

from eclypse.report.metrics import metric
from eclypse.report.metrics.defaults import featured_latency
from eclypse.utils.constants import DRIVING_EVENT


@metric.service(name="img_counter", remote=True)
def get_img_counter(self):
    """Report the EndService image counter.

    ``self`` is presumably the service the metric is evaluated on (TODO
    confirm against eclypse's metric API). Returns None for every service
    other than EndService. When the counter is 0, a warning is logged and 0
    is returned. This metric only reads the counter; it never resets it.
    """
    if self.id != "EndService":
        return None
    if self.img_counter != 0:
        return self.img_counter
    self.logger.warning("No images were processed")
    return 0


@metric.service(name="accuracy", remote=True)
def get_acc(self):
    """Report EndService accuracy as correct / img_counter.

    Returns None for other services, or when no images were counted (this
    avoids a division by zero). After a value is produced, both ``correct``
    and ``img_counter`` are reset to 0, so each reading covers only the images
    seen since the previous one.

    NOTE(review): this reset also zeroes the counter read by the img_counter
    metric, so the two metrics depend on evaluation order.
    """
    if self.id != "EndService" or self.img_counter == 0:
        return None
    accuracy = self.correct / self.img_counter
    self.correct = self.img_counter = 0
    return accuracy


@metric.service(name="model_change", remote=True)
def is_changed(self):
    """Report 1 when PredictorService has a pending new-model signal.

    The signal is cleared as it is reported, so each raised signal yields a
    single 1. Returns None otherwise, including for every other service.
    """
    if self.id == "PredictorService" and self.new_model_signal:
        self.new_model_signal = False
        return 1
    return None


@metric.simulation(name="cpu_usage", activates_on=[DRIVING_EVENT, "stop"])
class CPUMonitor:
    """Simulation-level metric: CPU percentage of the evaluating process.

    The psutil handle is created on first call rather than in ``__init__``.
    Presumably this binds it to the process that actually evaluates the
    metric; TODO confirm.
    """

    def __init__(self):
        # psutil.Process handle; None until the first call.
        self.process = None

    def __call__(self, event):
        """Return this process's CPU percentage; ``event`` is unused.

        ``cpu_percent(interval=0.1)`` blocks for 0.1 s while psutil samples
        utilisation over that window.
        """
        proc = self.process
        if proc is None:
            proc = self.process = psutil.Process(os.getpid())
        return proc.cpu_percent(interval=0.1)


@metric.service(
    name="remote_cpu_usage",
    activates_on=[DRIVING_EVENT, "stop"],
    remote=True,
)
class RemoteCPUMonitor:
    """Service-level remote metric: CPU percentage of the evaluating process.

    The handle is created lazily on the first call. Because the metric is
    declared ``remote=True``, the PID is presumably that of the remote worker
    hosting the service, not the driver; TODO confirm.
    """

    def __init__(self):
        # psutil.Process handle; None until the first call.
        self.process = None

    def __call__(self, event):
        """Return this process's CPU percentage; ``event`` is unused.

        ``cpu_percent(interval=0.1)`` blocks for 0.1 s while psutil samples
        utilisation over that window.
        """
        proc = self.process
        if proc is None:
            proc = self.process = psutil.Process(os.getpid())
        return proc.cpu_percent(interval=0.1)


@metric.simulation(
    name="memory_usage",
    activates_on=[DRIVING_EVENT, "stop"],
)
class MemoryMonitor:
    """Simulation-level metric: resident set size of the evaluating process.

    The psutil handle is created on first call rather than in ``__init__``.
    """

    def __init__(self):
        # psutil.Process handle; None until the first call.
        self.process = None

    def __call__(self, event):
        """Return the process RSS in MB (2**20 bytes); ``event`` is unused."""
        proc = self.process
        if proc is None:
            proc = self.process = psutil.Process(os.getpid())
        rss_bytes = proc.memory_info().rss
        return rss_bytes / 2**20


@metric.service(
    name="remote_memory_usage",
    activates_on=[DRIVING_EVENT, "stop"],
    remote=True,
)
class RemoteMemoryMonitor:
    """Service-level remote metric: resident set size of the evaluating process.

    The handle is created lazily on the first call. Because the metric is
    declared ``remote=True``, the PID is presumably that of the remote worker
    hosting the service; TODO confirm.
    """

    def __init__(self):
        # psutil.Process handle; None until the first call.
        self.process = None

    def __call__(self, event):
        """Return the process RSS in MB (2**20 bytes); ``event`` is unused."""
        proc = self.process
        if proc is None:
            proc = self.process = psutil.Process(os.getpid())
        rss_bytes = proc.memory_info().rss
        return rss_bytes / 2**20


def get_metrics():
    """Return a fresh list of the metrics this example collects.

    The order is: image counter, accuracy, model-change flag, featured
    latency, then four resource monitors (local CPU, local memory, remote CPU,
    remote memory). The monitor instances are newly created on each call.

    NOTE(review): img_counter is listed before accuracy, and get_acc resets
    img_counter. This presumably relies on metrics being evaluated in list
    order; confirm before reordering.
    """
    report_metrics = [get_img_counter, get_acc, is_changed, featured_latency]
    resource_monitors = [
        CPUMonitor(),
        MemoryMonitor(),
        RemoteCPUMonitor(),
        RemoteMemoryMonitor(),
    ]
    return report_metrics + resource_monitors