Source code for eclypse.remote.communication.interface

"""Module for the EclypseCommunicationInterface class.

It contains the implementation of the interface used by services to communicate.
"""

from __future__ import annotations

import asyncio
from typing import (
    TYPE_CHECKING,
    Any,
)

from eclypse.remote import ray_backend

if TYPE_CHECKING:
    from asyncio import (
        Future,
        Task,
    )
    from collections.abc import Coroutine

    from eclypse.remote.service import Service
    from eclypse.simulation._simulator.remote import RemoteSimulator

    from .route import Route


class EclypseCommunicationInterface:
    """Base communication interface used by services to interact.

    It is used to implement and simulate the interactions between services
    deployed and running in the same infrastructure. It talks to the
    `RemoteSimulator`, which provides the details regarding the current state
    of the infrastructure, so that the behaviour of the services can be
    simulated accordingly.

    Subclasses are expected to override `_not_connected_response` and
    `_execute_request`; both raise `NotImplementedError` here.
    """

    def __init__(self, service: Service):
        """Initialize the communication interface.

        The interface starts disconnected; call `connect` before requesting
        routes or neighbors.

        Args:
            service (Service): The service that uses the communication interface.
        """
        self._service: Service = service
        # Handle to the RemoteSimulator actor; None while disconnected.
        self._im: RemoteSimulator | None = None

    def connect(self):
        """Connect the communication interface to the `RemoteSimulator`.

        The actor is looked up by the name stored in
        `service.node.manager_actor_name`.
        """
        self._im = ray_backend.get_actor(name=self.service.node.manager_actor_name)

    def disconnect(self):
        """Disconnect the communication interface from the `RemoteSimulator`."""
        self._im = None

    def _require_simulator(self) -> RemoteSimulator:
        """Return the `RemoteSimulator` handle, or fail if disconnected.

        The check is an explicit `is None` test so that it always agrees with
        the `connected` property, regardless of the handle's truthiness.

        Returns:
            RemoteSimulator: The handle stored by `connect`.

        Raises:
            ValueError: If the interface is not connected.
        """
        if self._im is None:
            raise ValueError(
                "The communication interface is not connected to the RemoteSimulator."
            )
        return self._im

    def request_route(self, recipient_id: str) -> Future[Route]:
        """Ask the `RemoteSimulator` for the route to another service.

        The result of the function can be obtained by calling `ray.get` or by
        awaiting it.

        Args:
            recipient_id (str): The ID of the recipient service.

        Returns:
            Future[Route]: The pending route to the recipient service.

        Raises:
            ValueError: If the interface is not connected.
        """
        return self._require_simulator().route.remote(  # type: ignore[attr-defined]
            self.service.application_id,
            self.service.id,
            recipient_id,
        )

    def get_neighbors(self) -> Task[list[str]]:
        """Ask the `RemoteSimulator` for the neighbors of this service.

        The result of the function can be obtained by calling `ray.get` or by
        awaiting it.

        Returns:
            Task[list[str]]: The pending list of neighbor service IDs.

        Raises:
            ValueError: If the interface is not connected.
        """
        return self._require_simulator().get_neighbors.remote(  # type: ignore[attr-defined]
            self.service.application_id,
            self.service.id,
        )

    def _handle_request(
        self, *args, **kwargs
    ) -> Coroutine[Any, Any, Any] | Future[Any]:
        """Dispatch an incoming request to the service's event loop.

        This method is called internally by the communication interface.
        When disconnected, the coroutine produced by
        `_not_connected_response` is returned instead. Otherwise
        `_execute_request` is scheduled on `service.event_loop` and the
        resulting concurrent future is wrapped so the caller can await it.

        Args:
            *args: The arguments of the request.
            **kwargs: The keyword arguments of the request.

        Returns:
            Coroutine[Any, Any, Any] | Future[Any]: An awaitable yielding the
                response.
        """
        if not self.connected:
            return self._not_connected_response()

        # run_coroutine_threadsafe returns a concurrent.futures.Future;
        # wrap_future turns it into an awaitable asyncio future.
        return asyncio.wrap_future(
            asyncio.run_coroutine_threadsafe(
                self._execute_request(*args, **kwargs),
                self.service.event_loop,
            )
        )

    async def _not_connected_response(self) -> Any:
        """Return the response used when the interface is not connected.

        Returns:
            Any: The response.

        Raises:
            NotImplementedError: The method is not implemented here.
        """
        raise NotImplementedError

    async def _execute_request(self, *args, **kwargs) -> Any:
        """Execute a request on the service's event loop.

        This method is called internally by the communication interface,
        through `_handle_request`.

        Args:
            *args: The arguments of the request.
            **kwargs: The keyword arguments of the request.

        Returns:
            Any: The response.

        Raises:
            NotImplementedError: The method is not implemented here.
        """
        raise NotImplementedError

    @property
    def connected(self) -> bool:
        """Return True if the communication interface is connected to the simulator.

        Returns:
            bool: True if the communication interface is connected.
        """
        return self._im is not None

    @property
    def service(self) -> Service:
        """Return the service leveraging the communication interface.

        Returns:
            Service: The service leveraging the communication interface.
        """
        return self._service