Source code for eclypse.builders.application.deathstarbench.hotel_reservation.mpi_services.reservation
"""MPI workflow for reservation orchestration."""
from eclypse.remote.communication import mpi
from eclypse.remote.service import Service
from eclypse.utils import format_log_kv
[docs]
class ReservationService(Service):
"""Reserve a hotel room and coordinate payment."""
[docs]
def __init__(self, service_id: str, store_step: bool = False):
"""Initialise the reservation orchestrator state."""
super().__init__(service_id, store_step=store_step)
self.pending_reservation: dict[str, object] = {}
[docs]
async def step(self):
"""Create a reservation and wait for the payment response."""
await self.frontend_request()
return await self.payment_request()
@mpi.exchange(receive=True, send=True)
def frontend_request(self, _sender_id, body):
"""Store the reservation context and trigger payment."""
self.logger.info("Received request | " + format_log_kv(request=body))
self.pending_reservation = {
"hotel": body["hotel"],
"user": body["user"],
"reservation_id": "rsv-2001",
}
return "PaymentService", {
"request_type": "charge_card",
"reservation_id": "rsv-2001",
"amount": body["hotel"]["price"],
}
@mpi.exchange(receive=True, send=True)
def payment_request(self, _sender_id, body):
"""Return the completed reservation once payment succeeds."""
self.logger.info("Received request | " + format_log_kv(request=body))
return "FrontendService", {
"response_type": "reservation_response",
"reservation_id": self.pending_reservation["reservation_id"],
"hotel_name": self.pending_reservation["hotel"]["name"],
"transaction_id": body["transaction_id"],
"status": body["status"],
}