Source code for eclypse.builders.application.keyword_spotting.rest_services.sensor
"""REST workflow for audio window capture."""
from eclypse.remote.service import Service
from eclypse.utils import format_log_kv
[docs]
class SensorService(Service):
"""Generate audio windows and start the spotting pipeline."""
[docs]
def __init__(self, service_id: str, store_step: bool = False):
"""Initialise the sensor with a rolling audio window counter."""
super().__init__(
service_id,
communication_interface="rest",
store_step=store_step,
)
self.window_id = 0
[docs]
async def step(self):
"""Drive one audio window through the REST spotting pipeline."""
self.window_id += 1
preprocess_r = await self.rest.post(
"PreprocessService/preprocess",
window_id=self.window_id,
samples=[0.1, 0.3, 0.2],
)
self.logger.info(
"Received response | "
+ format_log_kv(source="PreprocessService", body=preprocess_r.body)
)
inference_r = await self.rest.post(
"InferenceService/infer",
window_id=self.window_id,
features=preprocess_r.body["features"],
)
self.logger.info(
"Received response | "
+ format_log_kv(source="InferenceService", body=inference_r.body)
)
action_r = await self.rest.post(
"ActionService/action",
window_id=self.window_id,
keyword=inference_r.body["keyword"],
)
self.logger.info(
"Received response | "
+ format_log_kv(source="ActionService", body=action_r.body)
)
return action_r