Source code for eclypse.graph.application

"""Module for the Application class.

It extends the AssetGraph class to represent an application, with nodes representing
services and edges representing the interactions between them.
"""

from __future__ import annotations

from functools import cached_property
from typing import (
    TYPE_CHECKING,
)

import networkx as nx

from eclypse.graph import AssetGraph
from eclypse.graph.assets.defaults import (
    get_default_edge_assets,
    get_default_node_assets,
)
from eclypse.remote.service import Service

if TYPE_CHECKING:
    from eclypse.utils.types import (
        InitPolicy,
        UpdatePolicies,
    )

    from .assets import Asset


[docs] class Application(AssetGraph): # pylint: disable=too-few-public-methods """Class to represent a multi-service Application."""
[docs] def __init__( self, application_id: str, update_policies: UpdatePolicies = None, node_assets: dict[str, Asset] | None = None, edge_assets: dict[str, Asset] | None = None, include_default_assets: bool = True, requirement_init: InitPolicy = "min", flows: list[list[str]] | None = None, seed: int | None = None, ): """Create a new Application. Args: application_id (str): The ID of the application. update_policies (Callable | list[Callable] | None):\ Graph update policies executed during ``evolve()``. node_assets (dict[str, Asset] | None): The assets of the nodes. edge_assets (dict[str, Asset] | None): The assets of the edges. include_default_assets (bool): Whether to include the default assets. \ Defaults to True. requirement_init (InitPolicy): The initialization of the requirements. flows (list[list[str]] | None): The flows of the application. seed (int | None): The seed for the random number generator. """ _node_assets = get_default_node_assets() if include_default_assets else {} _edge_assets = get_default_edge_assets() if include_default_assets else {} _node_assets.update(node_assets or {}) _edge_assets.update(edge_assets or {}) super().__init__( graph_id=application_id, update_policies=update_policies, node_assets=_node_assets, edge_assets=_edge_assets, attr_init=requirement_init, seed=seed, flip_assets=True, ) self.services: dict[str, Service] = {} self.flows = flows if flows is not None else []
[docs] def add_service(self, service: Service, **assets): """Add a service to the application. Args: service (Service): The service to add. **assets : The assets to add to the service. """ if not isinstance(service, Service): raise TypeError(f"Expected Service instance, got {type(service).__name__}") # prevent silent "application_id" overwrite current_app_id = getattr(service, "application_id", None) if current_app_id is not None and current_app_id != self.id: raise ValueError( f"Service '{service.id}' is already assigned to " f"application '{current_app_id}'. " f"Cannot reassign to '{self.id}'." ) service.application_id = self.id self.services[service.id] = service self.add_node(service.id, **assets)
[docs] def set_flows(self, ingress: str = "gateway"): """Set the flows of the application, using the following rules. - If the flows are already set, do nothing. - If the flows are not set, use the `ingress` as the source and all the other nodes as the target. - If there is no ingress, set the flows to an empty list. Args: ingress (str): The name of the ingress node. Defaults to "gateway". """ if self.flows == []: gateway_name = next((s for s in self.nodes if s == ingress), None) if gateway_name is not None: self.flows = [] for target in self.nodes: if target == gateway_name: continue try: self.flows.append( nx.shortest_path(self, source=gateway_name, target=target) ) except nx.NetworkXNoPath: continue
@cached_property def has_service_implementations(self) -> bool: """Check if the application has a logic for each service. This property requires to be True for the remote execution. """ return all(x in self.services for x in self.nodes)