Source code for eclypse.builders.application.video_analytics_serving.rest_services.analytics

"""REST endpoints for the analytics service."""

from eclypse.remote.communication import rest
from eclypse.remote.service import RESTService
from eclypse.utils import format_log_kv


[docs] class AnalyticsService(RESTService): """Aggregate tracked events into a compact result."""
[docs] @rest.endpoint("/analyse", "POST") def analyse(self, frame_id: int, stream_id: str, tracks: list[dict], **_): """Summarise tracked objects for the requested frame.""" self.logger.info( "Received request | " + format_log_kv(frame_id=frame_id, stream_id=stream_id, tracks=tracks) ) labels = [track["label"] for track in tracks] return 200, { "frame_id": frame_id, "stream_id": stream_id, "object_count": len(tracks), "summary": ", ".join(labels), }