Source code for eclypse.builders.application.anomaly_detection.rest_services.alert
"""REST endpoints for anomaly alerting."""
from eclypse.remote.communication import rest
from eclypse.remote.service import RESTService
from eclypse.utils import format_log_kv
# Score cutoff used by the alert endpoint: a score greater than or equal to
# this value is reported with status "alert"; any other score is "normal".
ANOMALY_THRESHOLD = 2.5
[docs]
class AlertService(RESTService):
    """REST service that labels anomaly scores as alerts or normal readings."""

    @rest.endpoint("/alert", "POST")
    def alert(self, window_id: int, score: float, **_):
        """Build the alert response for a single scored window.

        The request is logged first; the score is then compared against
        ``ANOMALY_THRESHOLD`` to choose the status label.

        Args:
            window_id: Identifier of the window, echoed back in the response.
            score: Anomaly score to compare against ``ANOMALY_THRESHOLD``.
            **_: Any additional keyword arguments; accepted and ignored.

        Returns:
            A ``(200, body)`` tuple, where ``body`` is a dict with the keys
            ``"window_id"``, ``"score"`` and ``"status"``. ``"status"`` is
            ``"alert"`` when ``score >= ANOMALY_THRESHOLD`` and ``"normal"``
            otherwise.
        """
        log = self.logger
        summary = format_log_kv(window_id=window_id, score=score)
        log.info("Received request | " + summary)

        # The comparison is inclusive: a score equal to the threshold alerts.
        if score >= ANOMALY_THRESHOLD:
            status = "alert"
        else:
            status = "normal"

        body = {"window_id": window_id, "score": score, "status": status}
        return 200, body