Source code for eclypse.builders.application.crud_api.mpi_services.auth
"""MPI workflow for authentication."""
from eclypse.remote.communication import mpi
from eclypse.remote.service import Service
from eclypse.utils import format_log_kv
[docs]
class AuthService(Service):
"""Validate an API key."""
[docs]
async def step(self):
"""Handle the next authentication request from the gateway."""
await self.gateway_request() # pylint: disable=no-value-for-parameter
@mpi.exchange(receive=True, send=True)
def gateway_request(self, sender_id, body):
"""Authorise the request and return a synthetic token."""
self.logger.info("Received request | " + format_log_kv(request=body))
return sender_id, {
"response_type": "auth_response",
"token": f"token:{body['api_key']}",
"status": "authorized",
}