Source code for eclypse.remote.service.service

"""Module for the Service class, which is the base class for services.

Services are the basic building blocks of ECLYPSE remote applications.

In its lifecycle, a `Service` object has the following capabilities:
    1. To be included in an `Application`, implementing the business
        logic of a given service. This is done by overriding the `run`
        method, which must be asynchronous.
    2. To be deployed in a `RemoteEngine` application after a successful placement. This
        will execute the business logic of the service.
    3. To be started and stopped. This will start and stop the execution of the business
        logic of the service.
    4. To be undeployed from a `RemoteEngine` object. This will stop the execution of
        the business logic of the service and remove it from the `RemoteEngine` object.
    5. To communicate with other services through the given communication interfaces.
        Currently, only the MPI interface is supported, which is accessed via the `.mpi`
        property.
"""

from __future__ import annotations

import asyncio
import threading
from abc import (
    ABC,
    abstractmethod,
)
from collections import deque
from typing import (
    TYPE_CHECKING,
    Any,
    cast,
)

from eclypse.remote.communication.mpi import EclypseMPI
from eclypse.remote.communication.request import RouteNotFoundError
from eclypse.remote.communication.rest import EclypseREST
from eclypse.utils._logging import (
    logger,
    print_exception,
)
from eclypse.utils.defaults import (
    DEFAULT_STEP_QUEUE_SIZE,
    SUPPORTED_COMMUNICATION_INTERFACES,
)

if TYPE_CHECKING:
    from collections.abc import (
        Callable,
    )

    from eclypse.remote._node import RemoteNode
    from eclypse.remote.communication import EclypseCommunicationInterface
    from eclypse.utils._logging import Logger
    from eclypse.utils.types import CommunicationInterface


class Service(ABC):
    """Base class for services in ECLYPSE remote applications.

    Subclasses must implement the asynchronous :meth:`step` method. The
    default :meth:`run` coroutine calls :meth:`step` repeatedly while the
    service is running. The service executes its coroutine on a private
    asyncio event loop driven by a dedicated thread.
    """

    def __init__(
        self,
        service_id: str,
        communication_interface: CommunicationInterface = "mpi",
        store_step: bool = False,
    ):
        """Initializes a Service object.

        Args:
            service_id (str): The name of the service.
            communication_interface (CommunicationInterface, optional): The
                communication interface of the service. Defaults to "mpi".
            store_step (bool, optional): Whether to store the results of each
                step. Defaults to False.

        Raises:
            ValueError: If the communication interface is not one of the
                supported ones.
        """
        if communication_interface not in SUPPORTED_COMMUNICATION_INTERFACES:
            raise ValueError("Invalid communication interface.")

        self._service_id: str = service_id
        self._communication_interface: CommunicationInterface = communication_interface
        self._store_step: bool = store_step
        self._application_id: str | None = None

        # Deployment/runtime state; populated by _deploy/_start and reset by
        # _undeploy.
        self._node: RemoteNode | None = None
        self._thread: threading.Thread | None = None
        self._comm: EclypseCommunicationInterface | None = None
        self._loop: asyncio.AbstractEventLoop | None = None
        self._run_task: asyncio.Task | None = None
        self._run_task_fn: Callable[[], asyncio.Task] | None = None

        self._running: bool = False
        self._step_count: int = 0
        # Bounded buffer of step results (only filled when store_step is True).
        self._step_queue: deque[Any] = deque(maxlen=DEFAULT_STEP_QUEUE_SIZE)

    async def run(self):
        """Runs the service.

        It provides a default behaviour where the service runs the `step`
        method in a loop until the service is stopped. This method can be
        overridden by the user to provide a custom behaviour.

        A step that raises `RouteNotFoundError` is logged as a warning and
        skipped; the step counter is still incremented for it.
        """
        while self.running:
            self._step_count += 1
            try:
                step_result = await self.step()
            except RouteNotFoundError as error:
                self.logger.warning(
                    "Skipping service step "
                    f"{self.step_count} because route to {error.recipient_id} "
                    "was not found."
                )
                continue
            if step_result is not None and self._store_step:
                self._step_queue.append(step_result)

    @abstractmethod
    async def step(self):
        """The service's main loop.

        Subclasses must implement this method with their service logic.

        Returns:
            Any: The result of the step (if any).
        """

    def on_deploy(self):
        """Hook called when the service is deployed on a node."""
        return None

    def on_undeploy(self):
        """Hook called when the service is undeployed from a node."""
        return None

    def _init_thread(self):
        """Initializes the thread for the service.

        Creates the run task through the factory installed by `_deploy` and
        prepares (without starting) the thread that will drive the event loop.
        """
        self._run_task = self._run_task_fn()
        self._thread = threading.Thread(target=_start_loop, args=(self,))

    def _deploy(self, node: RemoteNode):
        """Deploys the service on a node.

        Args:
            node (RemoteNode): The node hosting the service.

        Raises:
            RuntimeError: If the service is already deployed.
        """
        if self.deployed:
            raise RuntimeError(f"Service {self.id} is already deployed.")

        self.attach_node(node)
        self.on_deploy()
        self._loop = asyncio.new_event_loop()
        self._run_task_fn = lambda: self.event_loop.create_task(
            self.run(),
            name=f"task-{self.id}",
        )
        self._init_thread()

    def _start(self):
        """Starts the service.

        Builds and connects the communication interface, marks the service as
        running and starts the thread driving the event loop.

        Raises:
            RuntimeError: If the service is not deployed.
        """
        if not self.deployed:
            raise RuntimeError(f"Service {self.id} is not deployed on any node")

        if self._communication_interface == "mpi":
            self._comm = EclypseMPI(self)
        elif self._communication_interface == "rest":
            self._comm = EclypseREST(self)

        self._comm.connect()
        self._running = True
        self._thread.start()

    def _stop(self):
        """Stops the service.

        Does nothing if the service is not running. Otherwise clears the
        running flag, cancels the run task, asks the event loop to stop and
        waits for the service thread to terminate.

        Raises:
            RuntimeError: If the service is not deployed.
        """
        if not self.deployed:
            raise RuntimeError(f"Service {self.id} is not deployed on any node.")

        if not self.running:
            return

        self._running = False
        # NOTE(review): cancel() is invoked from the caller's thread while the
        # task belongs to the service loop; asyncio tasks are documented as
        # not thread-safe -- confirm this ordering is intentional.
        self._run_task.cancel()
        try:
            self._loop.call_soon_threadsafe(self._loop.stop)
        except RuntimeError:
            # The service thread closes the loop on its way out (e.g. `run`
            # returned or failed). A closed loop needs no stop request; any
            # other RuntimeError is unexpected and must surface.
            if not self._loop.is_closed():
                raise
        self._thread.join()

    def _undeploy(self):
        """Undeploys the service from the node.

        Calls the `on_undeploy` hook, disconnects the communication interface
        (if one was ever created) and resets the deployment state.

        Raises:
            RuntimeError: If the service is not deployed, or is still running.
        """
        if not self.deployed:
            raise RuntimeError(f"Service {self.id} is not deployed on any node.")
        if self.running:
            raise RuntimeError(
                f"Service {self.id} is running and cannot be undeployed."
            )

        self.on_undeploy()
        # The interface only exists once `_start` has run; a service that was
        # deployed but never started has nothing to disconnect.
        if self._comm is not None:
            self._comm.disconnect()
        # self._step_queue.clear()
        self._comm = None
        self._loop = None
        self._run_task_fn = None
        self._run_task = None
        self._thread = None
        self.detach_node()

    @property
    def mpi(self) -> EclypseMPI:
        """Returns the EclypseMPI interface of the service.

        Raises:
            RuntimeError: If the service is not deployed or does not use the
                "mpi" communication interface.
        """
        if not self.deployed:
            raise RuntimeError(f"Service {self.id} is not deployed on any node.")
        if self._communication_interface != "mpi":
            raise RuntimeError(
                f"Service {self.id} implements "
                f"{self._communication_interface}, not mpi."
            )
        return cast("EclypseMPI", self._comm)

    @property
    def rest(self) -> EclypseREST:
        """Returns the EclypseREST interface of the service.

        Raises:
            RuntimeError: If the service is not deployed or does not use the
                "rest" communication interface.
        """
        if not self.deployed:
            raise RuntimeError(f"Service {self.id} is not deployed on any node.")
        if self._communication_interface != "rest":
            raise RuntimeError(
                f"Service {self.id} implements "
                f"{self._communication_interface}, not rest."
            )
        return cast("EclypseREST", self._comm)

    @property
    def event_loop(self) -> asyncio.AbstractEventLoop:
        """Returns the asyncio event loop of the service.

        Raises:
            RuntimeError: If no event loop has been created for the service.
        """
        if self._loop is None:
            raise RuntimeError(f"Service {self.id} is not deployed on any node.")
        return self._loop

    @property
    def node(self) -> RemoteNode:
        """Return the remote node hosting the service.

        Raises:
            RuntimeError: If the service is not attached to any node.
        """
        if self._node is None:
            raise RuntimeError(f"Service {self.id} is not deployed on any node.")
        return self._node

    @property
    def infrastructure_id(self) -> str:
        """Return the infrastructure identifier of the hosting node."""
        return self.node.infrastructure_id

    @property
    def full_id(self) -> str:
        """Return the fully-qualified service identifier.

        Raises:
            ValueError: If the application ID has not been set.
        """
        if self._application_id is None:
            raise ValueError("Application ID not set.")
        return f"{self._application_id}/{self._service_id}"

    @property
    def id(self):
        """Return the local service identifier inside its application."""
        return self._service_id

    @property
    def application_id(self):
        """Returns the ID of the application the service belongs to."""
        return self._application_id

    @application_id.setter
    def application_id(self, application_id: str):
        """Sets the ID of the application the service belongs to."""
        self._application_id = application_id

    def attach_node(self, node: RemoteNode):
        """Attach the service to a remote node."""
        self._node = node

    def detach_node(self):
        """Detach the service from its remote node."""
        self._node = None

    @property
    def deployed(self):
        """Returns True if the service is deployed on a node."""
        return self._node is not None

    @property
    def running(self):
        """Returns True if the service is running."""
        return self._running

    @property
    def step_count(self) -> int:
        """Return the number of attempted service loop iterations."""
        return self._step_count

    @property
    def logger(self) -> Logger:
        """Returns the logger of the service, binding the service ID in the logs.

        Returns:
            Logger: The logger of the Service.
        """
        return self.node._logger.bind(id=self.id)
# pylint: disable=protected-access
def _start_loop(service: Service):
    """Drive the service's event loop in the current thread until it ends.

    Installs the service's loop as the current event loop, runs the service's
    run task to completion (or the loop forever when no task is set), then
    disconnects the communication interface and closes the loop.

    Cancellation of the task and the loop being stopped before the task
    completes are treated as a normal shutdown; any other exception is
    reported through `print_exception`.

    Args:
        service (Service): The service whose event loop must be run.
    """
    asyncio.set_event_loop(service.event_loop)
    try:
        if service._run_task is not None:
            service.event_loop.run_until_complete(service._run_task)
        else:
            service.event_loop.run_forever()
    except asyncio.CancelledError:
        pass
    except RuntimeError as e:
        # Raised by run_until_complete when the loop is stopped while the
        # task is still pending; this is how a stop request shows up here.
        if str(e) == "Event loop stopped before Future completed.":
            pass
        else:
            print_exception(e, f"{service.id}", _exception_logger(service))
    except Exception as e:
        print_exception(e, f"{service.id}", _exception_logger(service))

    try:
        if service._comm is not None:
            service._comm.disconnect()
    finally:
        # Always release the loop, even when disconnecting fails; the
        # disconnect error (if any) still propagates to the caller.
        service.event_loop.close()


def _exception_logger(service: Service) -> Logger:
    """Return a service-bound logger without masking the original exception.

    Falls back to the module-level logger bound to the service ID when the
    service's own logger cannot be obtained.
    """
    try:
        return service.logger
    except Exception:
        return logger.bind(id=service.id)