# Source code for eclypse.builders.application.video_analytics_serving.mpi_services.camera_gateway

"""MPI workflow for the camera gateway service."""

from eclypse.remote.communication import mpi
from eclypse.remote.service import Service
from eclypse.utils import format_log_kv


class CameraGatewayService(Service):
    """Gateway service that kicks off the video analytics pipeline.

    Each call to :meth:`step` advances a frame counter, emits one synthetic
    frame description addressed to ``"DetectionService"``, and then waits
    for a single incoming message, which it logs and returns.
    """

    def __init__(self, service_id: str, store_step: bool = False):
        """Set up the service and reset the frame counter to zero.

        Args:
            service_id: Identifier forwarded to the ``Service`` base class.
            store_step: Forwarded unchanged to the ``Service`` base class.
        """
        super().__init__(service_id, store_step=store_step)
        # Incremented at the start of every step, so the first frame is 1.
        self.frame_id = 0

    async def step(self):
        """Emit the next frame and return the message received in reply.

        Returns:
            Whatever ``self.mpi.recv()`` yields after the frame was sent.
        """
        self.frame_id = self.frame_id + 1

        # NOTE(review): start_pipeline is a plain ``def``; the
        # ``mpi.exchange`` decorator presumably makes it awaitable and
        # performs the actual send -- confirm against the decorator docs.
        await self.start_pipeline()

        summary = await self.mpi.recv()
        details = format_log_kv(response=summary)
        self.logger.info("Received response | " + details)
        return summary

    @mpi.exchange(send=True)
    def start_pipeline(self):
        """Build the synthetic frame message for the detection service.

        Returns:
            A ``(recipient, payload)`` pair: the recipient name
            ``"DetectionService"`` and a dict describing the current frame.
        """
        payload = {
            "request_type": "analyse_frame",
            "frame_id": self.frame_id,
            "stream_id": "camera-a",
            "objects": ["person", "forklift"],
        }
        return "DetectionService", payload