"""Module for the EclypseMPI class.
It implements the MPI communication protocol among services in the same application.
"""
from __future__ import annotations
import inspect
from asyncio import Queue
from datetime import datetime
from typing import (
TYPE_CHECKING,
Any,
)
from eclypse.remote.communication.interface import (
EclypseCommunicationInterface,
)
from eclypse.remote.utils import ResponseCode
from .requests import (
BroadcastRequest,
MulticastRequest,
UnicastRequest,
)
from .response import Response
if TYPE_CHECKING:
from collections.abc import Coroutine
from eclypse.remote.communication import Route
from eclypse.remote.service import Service
[docs]
class EclypseMPI(EclypseCommunicationInterface):
    """MPI-style communication interface for services of one application.

    Services use it to exchange messages with each other: a message can
    be sent to one recipient (unicast), to several recipients
    (multicast), or to all neighbours (broadcast). Outgoing messages are
    represented by request objects that must be awaited; incoming
    messages are buffered in an internal asyncio queue until the owning
    service consumes them with :meth:`recv`.
    """

    def __init__(self, service: Service):
        """Initialize the MPI interface.

        Args:
            service (Service): The service that uses the MPI interface.
        """
        super().__init__(service)
        # Buffer of received message bodies, filled by `_execute_request`
        # and drained by `recv`.
        self._input_queue: Queue = Queue()

    def send(
        self,
        recipient_ids: str | list[str],
        body: dict[str, Any],
        timestamp: datetime | None = None,
    ) -> UnicastRequest | MulticastRequest:
        """Send a message to a single recipient or to multiple recipients.

        When awaited, the total wait time is the communication cost
        between the sender and the recipient in the case of a unicast,
        and the maximum communication cost among the interactions with
        the recipients in the case of a multicast. The result of this
        method **must be awaited**.

        Args:
            recipient_ids (str | list[str]): The id(s) of the recipients.
                A single string produces a unicast request, a list of
                strings produces a multicast request.
            body (dict[str, Any]): The data to be sent. It must be a
                dictionary.
            timestamp (datetime | None, optional): The timestamp of the
                message. Defaults to ``datetime.now()``.

        Returns:
            UnicastRequest | MulticastRequest: The MPI request.

        Raises:
            ValueError: If ``body`` is not a dictionary, or if
                ``recipient_ids`` is neither a string nor a list.
        """
        if timestamp is None:
            timestamp = datetime.now()
        if not isinstance(body, dict):
            raise ValueError("body must be a dictionary")

        # The timestamp is forwarded to the request (as `bcast` does);
        # previously it was computed here and then silently discarded.
        if isinstance(recipient_ids, str):
            return UnicastRequest(
                recipient_ids, body=body, _mpi=self, timestamp=timestamp
            )
        if isinstance(recipient_ids, list):
            return MulticastRequest(
                recipient_ids, body=body, _mpi=self, timestamp=timestamp
            )
        raise ValueError("recipient_ids must be a string or a list of strings")

    def bcast(
        self,
        body: dict[str, Any],
        timestamp: datetime | None = None,
    ) -> BroadcastRequest:
        """Broadcast a message to all neighbour services.

        When awaited, the total wait time is the maximum communication
        cost among the interactions with neighbours. The result of this
        method **must be awaited**.

        Args:
            body (dict[str, Any]): The data to be sent. It must be a
                dictionary.
            timestamp (datetime | None, optional): The timestamp of the
                message. Defaults to ``datetime.now()``.

        Returns:
            BroadcastRequest: The broadcast MPI request.

        Raises:
            ValueError: If ``body`` is not a dictionary.
        """
        if not isinstance(body, dict):
            raise ValueError("body must be a dictionary")
        # Apply the documented default here, consistently with `send`.
        if timestamp is None:
            timestamp = datetime.now()
        return BroadcastRequest(body=body, _mpi=self, timestamp=timestamp)

    def recv(self) -> Coroutine[Any, Any, dict[str, Any]]:
        """Receive a message from the input queue.

        The result of this method **must be awaited**; awaiting it waits
        until a message is available in the queue.

        Returns:
            Coroutine[Any, Any, dict[str, Any]]: A coroutine resolving to
            the next message body in the input queue. The body carries a
            ``"sender_id"`` entry added on delivery.
        """
        return self._input_queue.get()

    async def _not_connected_response(self) -> Any:
        """Return an error response (``ResponseCode.ERROR``)."""
        return Response(ResponseCode.ERROR)

    async def _execute_request(  # pylint: disable=arguments-differ
        self, route: Route, **body
    ) -> Response:
        """Deliver an incoming message to this interface's input queue.

        Args:
            route (Route): The route of the interaction; its
                ``sender_id`` is attached to the message.
            **body: The message content.

        Returns:
            Response: A default-constructed response.
        """
        # Tag the message with its origin so the receiver can tell who
        # sent it (the `exchange` decorator pops this key back out).
        body["sender_id"] = route.sender_id
        await self._input_queue.put(body)
        return Response()
[docs]
def exchange(
    *,
    receive: bool = False,
    send: bool = False,
    broadcast: bool = False,
):
    """Decorator to receive and/or send a message in a Service method.

    The decorated function must receive, send, or broadcast a message.
    Sending and broadcasting are mutually exclusive.

    When ``receive`` is set, a message is awaited from ``self.mpi`` and
    the decorated function is called with ``(sender_id, message)``
    prepended to its positional arguments. When ``send`` is set, the
    function's return value is unpacked as the positional arguments of
    ``self.mpi.send``. When ``broadcast`` is set, the return value is
    passed as the body to ``self.mpi.bcast``. The decorated function may
    be either a plain function or a coroutine function; the resulting
    wrapper is always a coroutine function.

    Args:
        receive (bool, optional): True if the decorated function
            receives a message. Defaults to False.
        send (bool, optional): True if the decorated function sends a
            message. Defaults to False.
        broadcast (bool, optional): True if the decorated function
            broadcasts a message. Defaults to False.

    Returns:
        Callable: The decorator to apply to the Service method.

    Raises:
        ValueError: If both ``send`` and ``broadcast`` are set, or if
            none of the three flags is set.
    """
    # Local import: keeps the block self-contained without touching the
    # file-level import section.
    import functools

    if send and broadcast:
        raise ValueError(
            "The decorated function cannot send and broadcast at the same time"
        )
    if not (send or broadcast or receive):
        raise ValueError(
            "The decorated function must send, broadcast, or receive a message"
        )

    def decorator(func):
        # Decided once at decoration time rather than on every call.
        is_coroutine = inspect.iscoroutinefunction(func)

        # Preserve the decorated method's name and docstring.
        @functools.wraps(func)
        async def wrapper(self: Service, *args, **kwargs):
            if receive:
                message = await self.mpi.recv()
                sender_id: str = message.pop("sender_id")
                # Prepend the sender and the remaining message to the args.
                args = (sender_id, message, *args)

            if is_coroutine:
                next_args = await func(self, *args, **kwargs)
            else:
                next_args = func(self, *args, **kwargs)

            if send:
                return await self.mpi.send(*next_args)
            if broadcast:
                return await self.mpi.bcast(next_args)
            return next_args

        return wrapper

    return decorator