Source code for eclypse.remote.communication.request

# pylint: disable=protected-access
"""Module for Request class, which is the unit of communication between services."""

from __future__ import annotations

import asyncio
from datetime import datetime
from functools import cached_property
from typing import (
    TYPE_CHECKING,
    Any,
)

from eclypse.remote import ray_backend

if TYPE_CHECKING:
    from asyncio import Future
    from collections.abc import (
        Generator,
    )
    from datetime import timedelta

    from ray import ObjectRef

    from eclypse.remote._node import RemoteNode

    from .interface import EclypseCommunicationInterface
    from .route import Route


class RouteNotFoundError(RuntimeError):
    """Raised when no route can be found for a remote service request.

    Attributes:
        recipient_id (str): The id of the recipient that could not be reached.
    """

    def __init__(self, recipient_id: str):
        """Create an error describing the recipient that could not be reached.

        Args:
            recipient_id (str): The id of the recipient with no available route.
        """
        self.recipient_id = recipient_id
        super().__init__(f"Route to {recipient_id} not found")

    def __reduce__(self) -> tuple[type, tuple[str], dict]:
        """Describe how to rebuild this error when it is pickled or copied.

        The default ``BaseException.__reduce__`` replays ``self.args`` (the
        already formatted message) into ``__init__``, which would format it a
        second time ("Route to Route to X not found not found"). Rebuilding
        from ``recipient_id`` keeps the message intact across process
        boundaries.

        Returns:
            tuple: The class, the constructor arguments and the instance state.
        """
        # The instance ``__dict__`` is still passed as state so that any extra
        # attributes attached to the error survive the round trip.
        return (type(self), (self.recipient_id,), self.__dict__)
class EclypseRequest:
    """A request sent by a service to one or more recipients.

    Creating the object immediately asks the communication interface for a
    route to every recipient and schedules one processing coroutine per
    recipient on the node's engine loop. Awaiting the object waits for all of
    those coroutines to finish.
    """

    def __init__(
        self,
        recipient_ids: list[str],
        data: dict[str, Any],
        _comm: EclypseCommunicationInterface,
        timestamp: datetime | None = None,
    ):
        """Create a new EclypseRequest and start processing it.

        Args:
            recipient_ids (list[str]): The ids of the recipients.
            data (dict[str, Any]): The data of the request.
            _comm (EclypseCommunicationInterface): The communication interface
                used to look up routes and to reach the local node.
            timestamp (datetime | None, optional): The timestamp of the request.
                Defaults to None, in which case the current time is used.
        """
        self._data = data
        self._timestamp = timestamp or datetime.now()
        self._recipient_ids: list[str] = recipient_ids

        # First pass: request a route for every recipient before any
        # processing coroutine is scheduled.
        route_futures: list[Future[Route]] = []
        for target_id in self._recipient_ids:
            route_futures.append(
                asyncio.wrap_future(
                    _comm.request_route(target_id).future(),  # type: ignore[attr-defined]
                    loop=_comm.service.node.engine_loop,
                )
            )
        self._routes: list[Future[Route]] = route_futures

        # Second pass: schedule one processing coroutine per recipient on the
        # engine loop and keep an awaitable handle on each of them.
        response_futures = []
        for route_future, target_id in zip(
            self._routes, self._recipient_ids, strict=False
        ):
            scheduled = asyncio.run_coroutine_threadsafe(
                _process_request(
                    self.data, self._ref_args, route_future, target_id, _comm
                ),
                loop=_comm.service.node.engine_loop,
            )
            response_futures.append(asyncio.wrap_future(scheduled))
        self._futures = response_futures

    def __await__(self) -> Generator[Any, None, EclypseRequest]:
        """Wait until every recipient's processing future has completed.

        Returns:
            Awaitable: Resolves to this request once all futures are done.
        """

        async def _settle() -> EclypseRequest:
            await asyncio.gather(*self._futures)
            return self

        return _settle().__await__()

    @property
    def data(self) -> dict[str, Any]:
        """Get the data of the request.

        Returns:
            dict[str, Any]: The data.
        """
        return self._data

    @property
    def timestamp(self) -> datetime:
        """Get the timestamp of the request.

        Returns:
            datetime: The timestamp.
        """
        return self._timestamp

    @property
    def recipient_ids(self) -> list[str]:
        """The ids of the recipients.

        Returns:
            list[str]: The ids.
        """
        return self._recipient_ids

    @property
    def routes(self) -> list[Route | None]:
        """Snapshot of the routes computed so far, one entry per recipient.

        This does not block: a route whose lookup has not completed yet is
        reported as None. A lookup that completed with an error re-raises that
        error from ``result()``.

        Returns:
            list[Route | None]: The routes, in recipient order.
        """
        snapshot: list[Route | None] = []
        for route_future in self._routes:
            if route_future.done():
                snapshot.append(route_future.result())
            else:
                snapshot.append(None)
        return snapshot

    @property
    def responses(self) -> list[Any | None]:
        """Snapshot of the responses received so far, one entry per recipient.

        This does not block: a recipient whose processing future has not
        completed yet is reported as None. Await the `EclypseRequest` object
        to wait for all of them.

        Returns:
            list[Any | None]: The responses, in recipient order.
        """
        snapshot: list[Any | None] = []
        for response_future in self._futures:
            if response_future.done():
                snapshot.append(response_future.result()["future"])
            else:
                snapshot.append(None)
        return snapshot

    @property
    def elapsed_times(self) -> list[timedelta | None]:
        """The elapsed times until the responses were received.

        Returns:
            list[timedelta | None]: For each recipient, the difference between
                the response timestamp and the request timestamp, or None if
                the response is not yet available.
        """
        return [
            (
                response_future.result()["timestamp"] - self.timestamp
                if response_future.done()
                else None
            )
            for response_future in self._futures
        ]

    @cached_property
    def _ref_args(self) -> dict[str, ObjectRef]:
        """Pass each data value to `ray_backend.put` once and cache the result.

        Returns:
            dict[str, ObjectRef]: The data keys mapped to the returned references.
        """
        references: dict[str, ObjectRef] = {}
        for key, value in self.data.items():
            references[key] = ray_backend.put(value)
        return references
async def _process_request(
    args: dict[str, Any],
    args_ref: dict[str, ObjectRef],
    route: Future[Route],
    recipient_id: str,
    _comm: EclypseCommunicationInterface,
) -> dict[str, Any | datetime]:
    """Deliver a request to a single recipient once its route is known.

    Args:
        args (dict[str, Any]): The request data, passed as keyword arguments
            when the route has no hop.
        args_ref (dict[str, ObjectRef]): The same keys mapped to object
            references, passed as keyword arguments to a remote node.
        route (Future[Route]): The future resolving to the route.
        recipient_id (str): The id of the recipient, used in the error raised
            when no route is found.
        _comm (EclypseCommunicationInterface): The communication interface of
            the sending service.

    Returns:
        dict[str, Any | datetime]: The awaited response under ``"future"`` and
            the time it was received under ``"timestamp"``.

    Raises:
        RouteNotFoundError: If the route resolves to None.
    """
    if route.done():
        resolved = route.result()
    else:
        resolved = await route

    if resolved is None:
        raise RouteNotFoundError(recipient_id)

    if resolved.no_hop:
        # No hop: call the local node's entrypoint directly with the raw data.
        reply = await _comm.service.node.service_comm_entrypoint(
            resolved,
            _comm.__class__,
            **args,
        )
        return {"future": reply, "timestamp": datetime.now()}

    # Otherwise the recipient's node is looked up as an actor named
    # "<infrastructure id>/<recipient node id>".
    infrastructure_id = _comm.service.infrastructure_id
    actor_name = f"{infrastructure_id}/{resolved.recipient_node_id}"
    remote_node: RemoteNode = _comm.service.node.get_actor(actor_name)

    # Sleep for the route's cost before dispatching (presumably to emulate
    # the transmission delay - confirm against Route.cost).
    await asyncio.sleep(resolved.cost(args))

    pending_reply = remote_node.service_comm_entrypoint.remote(  # type: ignore[attr-defined]
        resolved,
        _comm.__class__,
        **args_ref,
    )
    reply = await pending_reply
    return {"future": reply, "timestamp": datetime.now()}