Source code for eclypse.builders.application.video_analytics_serving.rest_services.camera_gateway

"""REST workflow for the camera gateway service."""

from eclypse.remote.service import Service
from eclypse.utils import format_log_kv


[docs] class CameraGatewayService(Service): """Entry-point service that starts the video analytics pipeline."""
[docs] def __init__(self, service_id: str, store_step: bool = False): """Initialise the gateway with a rolling frame counter.""" super().__init__( service_id, communication_interface="rest", store_step=store_step, ) self.frame_id = 0
[docs] async def step(self): """Drive one frame through the REST analytics pipeline.""" self.frame_id += 1 detection_r = await self.rest.post( "DetectionService/detect", frame_id=self.frame_id, stream_id="camera-a", objects=["person", "forklift"], ) self.logger.info( "Received response | " + format_log_kv(source="DetectionService", body=detection_r.body) ) tracking_r = await self.rest.post( "TrackingService/track", frame_id=self.frame_id, stream_id="camera-a", detections=detection_r.body["detections"], ) self.logger.info( "Received response | " + format_log_kv(source="TrackingService", body=tracking_r.body) ) analytics_r = await self.rest.post( "AnalyticsService/analyse", frame_id=self.frame_id, stream_id="camera-a", tracks=tracking_r.body["tracks"], ) self.logger.info( "Received response | " + format_log_kv(source="AnalyticsService", body=analytics_r.body) ) return analytics_r