Source code for eclypse.builders.application.anomaly_detection.mpi_services.feature
"""MPI workflow for feature extraction."""
from eclypse.remote.communication import mpi
from eclypse.remote.service import Service
from eclypse.utils import format_log_kv
[docs]
class FeatureService(Service):
"""Extract simple features from telemetry."""
[docs]
async def step(self):
"""Handle the next telemetry window produced by the sensor."""
await self.sensor_request() # pylint: disable=no-value-for-parameter
@mpi.exchange(receive=True, send=True)
def sensor_request(self, _sender_id, body):
"""Compute compact statistics for the received telemetry samples."""
self.logger.info("Received request | " + format_log_kv(request=body))
max_sample = max(body["samples"])
mean_sample = sum(body["samples"]) / len(body["samples"])
return "InferenceService", {
"request_type": "score_window",
"window_id": body["window_id"],
"features": {"max": max_sample, "mean": mean_sample},
}