Source code for eclypse.builders.application.anomaly_detection.mpi_services.alert
"""MPI workflow for anomaly alerting."""
from eclypse.remote.communication import mpi
from eclypse.remote.service import Service
from eclypse.utils import format_log_kv
ANOMALY_THRESHOLD = 2.5
[docs]
class AlertService(Service):
"""Turn anomaly scores into responses."""
[docs]
async def step(self):
"""Handle the next inference result emitted by the pipeline."""
await self.inference_request()
@mpi.exchange(receive=True, send=True)
def inference_request(self, _sender_id, body):
"""Map the anomaly score to a status and respond to the sensor."""
self.logger.info("Received request | " + format_log_kv(request=body))
return "SensorService", {
"response_type": "anomaly_response",
"window_id": body["window_id"],
"score": body["score"],
"status": "alert" if body["score"] >= ANOMALY_THRESHOLD else "normal",
}