Source code for eclypse.builders.application.deathstarbench.media_service.mpi_services.movie_review
"""MPI workflow for per-movie review indexing."""
from eclypse.remote.communication import mpi
from eclypse.remote.service import Service
from eclypse.utils import format_log_kv
[docs]
class MovieReviewService(Service):
"""Index reviews by movie and serve review lookups."""
[docs]
def __init__(self, service_id: str, store_step: bool = False):
"""Initialise the movie review index."""
super().__init__(service_id, store_step=store_step)
self.by_movie: dict[str, list[dict[str, object]]] = {}
[docs]
async def step(self):
"""Handle the next movie-review request."""
await self.handle_request() # pylint: disable=no-value-for-parameter
@mpi.exchange(receive=True, send=True)
def handle_request(self, sender_id, body):
"""Index reviews by movie or return stored movie reviews."""
self.logger.info("Received request | " + format_log_kv(request=body))
if body["request_type"] == "read_movie_reviews":
return sender_id, {
"response_type": "read_movie_reviews_response",
"reviews": self.by_movie.get(body["movie_id"], []),
}
self.by_movie.setdefault(body["review"]["movie_id"], []).append(body["review"])
return body["reply_to"], {
"response_type": "compose_review_response",
"review_id": body["review"]["review_id"],
"movie_id": body["review"]["movie_id"],
"movie_title": body["review"]["movie_title"],
"status": "stored",
"review_count": len(self.by_movie[body["review"]["movie_id"]]),
}