# Source code for eclypse.builders.application.video_analytics_serving.mpi_services.tracking
"""MPI workflow for the tracking service."""
from eclypse.remote.communication import mpi
from eclypse.remote.service import Service
from eclypse.utils import format_log_kv
[docs]
class TrackingService(Service):
    """Service that tags every detection in a frame with a track identifier.

    Each incoming payload is expected to carry ``detections``, ``frame_id``
    and ``stream_id`` keys; the result is addressed to ``"AnalyticsService"``.
    """

    async def step(self):
        """Process a single detection payload through ``detection_request``."""
        # NOTE(review): the call passes no arguments; presumably the
        # ``mpi.exchange`` decorator supplies the sender and body - confirm.
        await self.detection_request()  # pylint: disable=no-value-for-parameter

    @mpi.exchange(receive=True, send=True)
    def detection_request(self, _sender_id, body):
        """Build one track entry per detection and forward them for aggregation.

        Track identifiers are synthetic: they are simply the 1-based position
        of each label inside ``body["detections"]``.

        Args:
            _sender_id: Identifier of the message sender (unused).
            body: Mapping read for ``detections``, ``frame_id`` and
                ``stream_id``.

        Returns:
            A ``(name, payload)`` tuple: the string ``"AnalyticsService"`` and
            a dict holding the request type, frame/stream ids and the tracks.
            Presumably the first element is the recipient used by
            ``mpi.exchange`` - TODO confirm.
        """
        details = format_log_kv(request=body)
        self.logger.info("Received request | " + details)

        # Number the labels starting from 1, in the order they were received.
        tracks = []
        for track_id, label in enumerate(body["detections"], start=1):
            tracks.append({"label": label, "track_id": track_id})

        payload = {
            "request_type": "aggregate_events",
            "frame_id": body["frame_id"],
            "stream_id": body["stream_id"],
            "tracks": tracks,
        }
        return "AnalyticsService", payload