Source code for eclypse.builders.application.anomaly_detection.mpi_services.sensor

"""MPI workflow for telemetry capture."""

from eclypse.remote.communication import mpi
from eclypse.remote.service import Service
from eclypse.utils import format_log_kv


[docs] class SensorService(Service): """Generate telemetry windows and start the anomaly pipeline."""
[docs] def __init__(self, service_id: str, store_step: bool = False): """Initialise the sensor with a rolling telemetry window counter.""" super().__init__(service_id, store_step=store_step) self.window_id = 0
[docs] async def step(self): """Create the next telemetry window and wait for the alert result.""" self.window_id += 1 await self.capture_window() response = await self.mpi.recv() self.logger.info("Received response | " + format_log_kv(response=response)) return response
@mpi.exchange(send=True) def capture_window(self): """Send a synthetic telemetry window to the feature extractor.""" return "FeatureService", { "request_type": "extract_features", "window_id": self.window_id, "samples": [0.8, 1.2, 4.5], }