# Source code for eclypse.builders.application.video_analytics_serving.rest_services.tracking
"""REST endpoints for the tracking service."""
from eclypse.remote.communication import rest
from eclypse.remote.service import RESTService
from eclypse.utils import format_log_kv
# [docs]
class TrackingService(RESTService):
    """REST service that labels every detection in a frame with a track id."""

    @rest.endpoint("/track", "POST")
    def track(self, frame_id: int, stream_id: str, detections: list[str], **_):
        """Build one track entry per detection and echo the frame metadata.

        The incoming request is logged at INFO level before the response is
        assembled.

        Args:
            frame_id: Identifier of the frame; returned unchanged.
            stream_id: Identifier of the stream; returned unchanged.
            detections: Labels of the detected objects, in order.
            **_: Any additional keyword arguments are accepted and ignored.

        Returns:
            A ``(200, payload)`` tuple. ``payload`` carries ``frame_id``,
            ``stream_id`` and ``tracks``, a list of
            ``{"label": ..., "track_id": ...}`` dicts in which ``track_id`` is
            the 1-based position of the label within ``detections``.
        """
        request_summary = format_log_kv(
            frame_id=frame_id,
            stream_id=stream_id,
            detections=detections,
        )
        self.logger.info("Received request | " + request_summary)

        # Track ids are purely positional: the first detection gets id 1.
        tracks = []
        for track_id, label in enumerate(detections, start=1):
            tracks.append({"label": label, "track_id": track_id})

        payload = {
            "frame_id": frame_id,
            "stream_id": stream_id,
            "tracks": tracks,
        }
        return 200, payload