Source code for eclypse.builders.application.deathstarbench.media_service.rest_services.movie_review
"""REST endpoints for per-movie review indexing."""
from eclypse.remote.communication import rest
from eclypse.remote.communication.rest import HTTPStatusCode
from eclypse.remote.service import RESTService
from eclypse.utils import format_log_kv
[docs]
class MovieReviewService(RESTService):
"""Index reviews by movie and serve review lookups."""
[docs]
def __init__(self, service_id: str, store_step: bool = False):
"""Initialise the movie review index."""
super().__init__(service_id, store_step=store_step)
self.by_movie: dict[str, list[dict[str, object]]] = {}
[docs]
@rest.endpoint("/write", "POST")
def write(self, review: dict, reply_to: str, **_):
"""Index the review by movie and return the compose-review result."""
self.logger.info(
"Received request | "
+ format_log_kv(review_id=review["review_id"], movie_id=review["movie_id"])
)
self.by_movie.setdefault(review["movie_id"], []).append(review)
return HTTPStatusCode.CREATED, {
"reply_to": reply_to,
"review_id": review["review_id"],
"movie_id": review["movie_id"],
"movie_title": review["movie_title"],
"status": "stored",
"review_count": len(self.by_movie[review["movie_id"]]),
}
[docs]
@rest.endpoint("/read", "GET")
def read(self, movie_id: str, **_):
"""Return the indexed reviews for a movie."""
self.logger.info("Received request | " + format_log_kv(movie_id=movie_id))
return 200, {"reviews": self.by_movie.get(movie_id, [])}