# Source code for eclypse.builders.application.keyword_spotting.mpi_services.action
"""MPI workflow for acting on a detected keyword."""
from eclypse.remote.communication import mpi
from eclypse.remote.service import Service
from eclypse.utils import format_log_kv
# [docs]
class ActionService(Service):
    """Turn a detected keyword into the command returned to the sensor."""

    async def step(self):
        """Process the next inference result coming from the keyword model."""
        # Called without arguments on purpose; the parameters are presumably
        # supplied by the ``mpi.exchange`` wrapper (hence the pylint waiver).
        await self.inference_request()  # pylint: disable=no-value-for-parameter

    @mpi.exchange(receive=True, send=True)
    def inference_request(self, _sender_id, body):
        """Build the sensor command that corresponds to a detected keyword.

        Args:
            _sender_id: Identifier of the message sender (unused here).
            body: Incoming request; must provide the ``"window_id"`` and
                ``"keyword"`` keys.

        Returns:
            A ``(recipient, payload)`` pair: the recipient is the string
            ``"SensorService"`` and the payload is a ``keyword_response``
            dict echoing the window id, with ``"command"`` set to ``"wake"``
            when the keyword is ``"eclypse"`` and ``"idle"`` otherwise.

        Raises:
            KeyError: If ``body`` lacks ``"window_id"`` or ``"keyword"``.
        """
        details = format_log_kv(request=body)
        self.logger.info("Received request | " + details)

        # Read the fields in the same order the response lists them.
        window_id = body["window_id"]
        if body["keyword"] == "eclypse":
            command = "wake"
        else:
            command = "idle"

        payload = {
            "response_type": "keyword_response",
            "window_id": window_id,
            "command": command,
        }
        return "SensorService", payload