Source code for eclypse.builders.application.video_analytics_serving.mpi_services.detection

"""MPI workflow for the detection service."""

from eclypse.remote.communication import mpi
from eclypse.remote.service import Service
from eclypse.utils import format_log_kv


[docs] class DetectionService(Service): """Detect objects in incoming frames."""
[docs] async def step(self): """Handle the next frame produced by the camera gateway.""" await self.gateway_request() # pylint: disable=no-value-for-parameter
@mpi.exchange(receive=True, send=True) def gateway_request(self, _sender_id, body): """Convert the incoming frame payload into detections.""" self.logger.info("Received request | " + format_log_kv(request=body)) return "TrackingService", { "request_type": "track_objects", "frame_id": body["frame_id"], "stream_id": body["stream_id"], "detections": body["objects"], }