Source code for eclypse.builders.application.deathstarbench.media_service.mpi_services.user_review
"""MPI workflow for per-user review indexing."""
from eclypse.remote.communication import mpi
from eclypse.remote.service import Service
from eclypse.utils import format_log_kv
[docs]
class UserReviewService(Service):
"""Index reviews by author."""
[docs]
def __init__(self, service_id: str, store_step: bool = False):
"""Initialise the user review index."""
super().__init__(service_id, store_step=store_step)
self.by_user: dict[int, list[int]] = {}
[docs]
async def step(self):
"""Handle the next user-review request."""
await self.handle_request() # pylint: disable=no-value-for-parameter
@mpi.exchange(receive=True, send=True)
def handle_request(self, _sender_id, body):
"""Index the review by user and forward it to the movie review index."""
self.logger.info("Received request | " + format_log_kv(request=body))
self.by_user.setdefault(body["review"]["user"]["user_id"], []).append(
body["review"]["review_id"],
)
return "MovieReviewService", {
**body,
"request_type": "write_movie_review",
}