Source code for eclypse.builders.application.anomaly_detection.mpi_services.inference

"""MPI workflow for anomaly inference."""

from eclypse.remote.communication import mpi
from eclypse.remote.service import Service
from eclypse.utils import format_log_kv


[docs] class InferenceService(Service): """Compute a simple anomaly score."""
[docs] async def step(self): """Handle the next feature payload emitted by the extractor.""" await self.feature_request() # pylint: disable=no-value-for-parameter
@mpi.exchange(receive=True, send=True) def feature_request(self, _sender_id, body): """Estimate an anomaly score from the extracted features.""" self.logger.info("Received request | " + format_log_kv(request=body)) score = body["features"]["max"] / max(body["features"]["mean"], 0.1) return "AlertService", { "request_type": "emit_alert", "window_id": body["window_id"], "score": round(score, 2), }