Source code for eclypse.builders.application.crud_api.mpi_services.item
"""MPI workflow for item management."""
from eclypse.remote.communication import mpi
from eclypse.remote.service import Service
from eclypse.utils import format_log_kv
[docs]
class ItemService(Service):
"""Create an item and emit an audit event."""
[docs]
def __init__(self, service_id: str, store_step: bool = False):
"""Initialise the item store used by the CRUD workflow."""
super().__init__(service_id, store_step=store_step)
self.items: dict[str, dict[str, str]] = {}
[docs]
async def step(self):
"""Create the item, then wait for the audit confirmation."""
await self.gateway_request()
return await self.audit_request()
@mpi.exchange(receive=True, send=True)
def gateway_request(self, _sender_id, body):
"""Store the item and forward an audit event."""
self.logger.info("Received request | " + format_log_kv(request=body))
item = body["item"]
self.items[item["id"]] = item
return "AuditService", {
"request_type": "record_event",
"item_id": item["id"],
"action": "create",
}
@mpi.exchange(receive=True, send=True)
def audit_request(self, _sender_id, body):
"""Return the updated item list after the audit succeeds."""
self.logger.info("Received request | " + format_log_kv(request=body))
return "GatewayService", {
"response_type": "crud_response",
"status": body["status"],
"items": list(self.items.values()),
}