Source code for eclypse.builders.application.video_analytics_serving.mpi_services.analytics
"""MPI workflow for the analytics service."""
from eclypse.remote.communication import mpi
from eclypse.remote.service import Service
from eclypse.utils import format_log_kv
[docs]
class AnalyticsService(Service):
"""Aggregate tracked events into a compact result."""
[docs]
async def step(self):
"""Handle the next tracking result emitted by the pipeline."""
await self.tracking_request() # pylint: disable=no-value-for-parameter
@mpi.exchange(receive=True, send=True)
def tracking_request(self, _sender_id, body):
"""Summarise tracked objects for the camera gateway."""
self.logger.info("Received request | " + format_log_kv(request=body))
labels = [track["label"] for track in body["tracks"]]
return "CameraGatewayService", {
"response_type": "analytics_result",
"frame_id": body["frame_id"],
"stream_id": body["stream_id"],
"object_count": len(body["tracks"]),
"summary": ", ".join(labels),
}