Source code for eclypse.builders.application.anomaly_detection.rest_services.sensor
"""REST workflow for telemetry capture."""
from eclypse.remote.service import Service
from eclypse.utils import format_log_kv
[docs]
class SensorService(Service):
"""Generate telemetry windows and start the anomaly pipeline."""
[docs]
def __init__(self, service_id: str, store_step: bool = False):
"""Initialise the sensor with a rolling telemetry window counter."""
super().__init__(
service_id,
communication_interface="rest",
store_step=store_step,
)
self.window_id = 0
[docs]
async def step(self):
"""Drive one telemetry window through the REST inference pipeline."""
self.window_id += 1
feature_r = await self.rest.post(
"FeatureService/features",
window_id=self.window_id,
samples=[0.8, 1.2, 4.5],
)
self.logger.info(
"Received response | "
+ format_log_kv(source="FeatureService", body=feature_r.body)
)
inference_r = await self.rest.post(
"InferenceService/score",
window_id=self.window_id,
features=feature_r.body["features"],
)
self.logger.info(
"Received response | "
+ format_log_kv(source="InferenceService", body=inference_r.body)
)
alert_r = await self.rest.post(
"AlertService/alert",
window_id=self.window_id,
score=inference_r.body["score"],
)
self.logger.info(
"Received response | "
+ format_log_kv(source="AlertService", body=alert_r.body)
)
return alert_r