Source code for eclypse.builders.application.thumbnailer.mpi_services.notification
"""MPI workflow for upload notification."""
from eclypse.remote.communication import mpi
from eclypse.remote.service import Service
from eclypse.utils import format_log_kv
[docs]
class NotificationService(Service):
"""Return the final thumbnail location."""
[docs]
async def step(self):
"""Handle the next storage confirmation emitted by the pipeline."""
await self.storage_request() # pylint: disable=no-value-for-parameter
@mpi.exchange(receive=True, send=True)
def storage_request(self, _sender_id, body):
"""Return the final storage location to the upload service."""
self.logger.info("Received request | " + format_log_kv(request=body))
return "UploadService", {
"response_type": "thumbnail_ready",
"image_id": body["image_id"],
"uri": body["uri"],
"status": "stored",
}