Source code for eclypse.report.metrics.defaults

# pylint: disable=protected-access
"""Default metrics to be reported by the ECLYPSE Reporter."""

from __future__ import annotations

import os
from time import time
from typing import (
    TYPE_CHECKING,
    Any,
)

import networkx as nx

from eclypse.utils.constants import (
    DRIVING_EVENT,
    MAX_LATENCY,
    MIN_BANDWIDTH,
    MIN_FLOAT,
    MIN_LATENCY,
    RND_SEED,
    STOP_EVENT,
)
from eclypse.utils.defaults import GML_REPORT_DIR

from . import metric

if TYPE_CHECKING:
    from eclypse.graph import (
        Application,
        Infrastructure,
    )
    from eclypse.placement import (
        Placement,
        PlacementView,
    )
    from eclypse.remote.service import Service


@metric.application
def response_time(
    app: Application,
    placement: Placement,
    infr: Infrastructure,
) -> float | None:
    """Return the response time for each application.

    Args:
        app (Application): The application.
        placement (Placement): The placement of the application.
        infr (Infrastructure): The infrastructure.

    Returns:
        float | None: The maximum response time for the application, if any,
            'inf' otherwise.
    """
    response_times = []
    if placement.mapping:
        for flow in app.flows:
            rt = 0.0

            for service, next_service in nx.utils.pairwise(flow):
                p_service = placement.service_placement(service)
                p_next_service = placement.service_placement(next_service)
                service_processing_time = app.nodes[service].get(
                    "processing_time", MIN_FLOAT
                )
                node_processing_time = infr.nodes[p_service].get(
                    "processing_time", MIN_FLOAT
                )
                link_latency = infr.path_resources(p_service, p_next_service).get(
                    "latency", MIN_LATENCY
                )
                rt += service_processing_time + node_processing_time + link_latency

                # Add the last service and the last node processing time
            last_service = flow[-1]
            rt += app.nodes[last_service].get("processing_time", MIN_FLOAT)
            rt += infr.nodes[placement.service_placement(last_service)].get(
                "processing_time", MIN_FLOAT
            )

            # Store response time for the flow
            response_times.append(rt)

    return max(response_times) if response_times else float("inf")


@metric.service(name="placement")
def placement_mapping(
    service_id: str,
    _: dict[str, Any],
    placement: Placement,
    __: Infrastructure,
) -> str:
    """Return the placement of each service in each application.

    Args:
        service_id (str): The service ID.
        _: The requirements of the service.
        placement (Placement): The placement of the applications.
        __: The infrastructure.

    Returns:
        str: The placement of the service in the application, or "EMPTY" if not placed.
    """
    return placement.mapping.get(service_id, "EMPTY")


@metric.service
def required_cpu(
    _: str,
    requirements: dict[str, Any],
    __: Placement,
    ___: Infrastructure,
) -> float:
    """Return the required CPU for each service in each application.

    Args:
        _: the service ID.
        requirements (dict[str, Any]): The requirements of the service.
        __: The placement of the application the service belongs to.
        ___: The infrastructure.

    Returns:
        float: The required CPU for each service in each application.
    """
    return requirements.get("cpu", MIN_FLOAT)


@metric.service
def required_ram(
    _: str,
    requirements: dict[str, Any],
    __: Placement,
    ___: Infrastructure,
) -> float:
    """Return the required RAM for each service in each application.

    Args:
        _: the service ID.
        requirements (dict[str, Any]): The requirements of the service.
        __: The placement of the application the service belongs to.
        ___: The infrastructure.

    Returns:
        float: The required RAM for each service in each application.
    """
    return requirements.get("ram", MIN_FLOAT)


@metric.service
def required_storage(
    _: str,
    requirements: dict[str, Any],
    __: Placement,
    ___: Infrastructure,
) -> float:
    """Return the required storage for each service in each application.

    Args:
        _: the service ID.
        requirements (dict[str, Any]): The requirements of the service.
        __: The placement of the application the service belongs to.
        ___: The infrastructure.

    Returns:
        float: The required storage for each service in each application.
    """
    return requirements.get("storage", MIN_FLOAT)


@metric.service
def required_gpu(
    _: str,
    requirements: dict[str, Any],
    __: Placement,
    ___: Infrastructure,
) -> float:
    """Return the required GPU for each service in each application.

    Args:
        _: the service ID.
        requirements (dict[str, Any]): The requirements of the service.
        __: The placement of the application the service belongs to.
        ___: The infrastructure.

    Returns:
        float: The required GPU for each service in each application.
    """
    return requirements.get("gpu", MIN_FLOAT)


@metric.interaction
def required_latency(
    _: str,
    __: str,
    requirements: dict[str, Any],
    ___: Placement,
    ____: Infrastructure,
) -> float:
    """Return the required latency for each interaction in each application.

    Args:
        _: The source service ID.
        __: The destination service ID.
        requirements (dict[str, Any]): The requirements of the interaction.
        ___: The placement of the application the service belongs to.
        ____: The infrastructure.

    Returns:
        InteractionValue: The required latency for each interaction in each application.
    """
    return requirements.get("latency", MIN_LATENCY)


@metric.interaction
def required_bandwidth(
    _: str,
    __: str,
    requirements: dict[str, Any],
    ___: Placement,
    ____: Infrastructure,
) -> float:
    """Return the required bandwidth for each interaction in each application.

    Args:
        _: The source service ID.
        __: The destination service ID.
        requirements (dict[str, Any]): The requirements of the interaction.
        ___: The placement of the application the service belongs to.
        ____: The infrastructure.

    Returns:
        float: The required bandwidth for each interaction in each application.
    """
    return requirements.get("bandwidth", MIN_BANDWIDTH)

    ### Infrastructure


@metric.infrastructure
def alive_nodes(infr: Infrastructure, _: PlacementView) -> int:
    """Return the number of alive nodes in the infrastructure.

    Args:
        infr (Infrastructure): The infrastructure.
        _: The placement view.

    Returns:
        InfrastructureValue: The number of alive nodes in the infrastructure.
    """
    return len(infr.available.nodes)


@metric.node
def featured_cpu(
    _: str,
    resources: dict[str, Any],
    __: dict[str, Placement],
    ___: Infrastructure,
    ____: PlacementView,
) -> float:
    """Return the featured CPU of each node in the infrastructure.

    Args:
        _: The placement of the application the service belongs to.
        resources (dict[str, Any]): The resources of the node.
        __: The infrastructure.
        ___: The placement view.

    Returns:
        float: The featured CPU of each node.
    """
    return resources.get("cpu", MIN_FLOAT)


@metric.node
def featured_ram(
    _: str,
    resources: dict[str, Any],
    __: dict[str, Placement],
    ___: Infrastructure,
    ____: PlacementView,
) -> float:
    """Return the featured RAM of each node in the infrastructure.

    Args:
        _: The placement of the application the service belongs to.
        resources (dict[str, Any]): The resources of the node.
        __: The infrastructure.
        ___: The placement view.

    Returns:
        float: The featured RAM of each node.
    """
    return resources.get("ram", MIN_FLOAT)


@metric.node
def featured_storage(
    _: str,
    resources: dict[str, Any],
    __: dict[str, Placement],
    ___: Infrastructure,
    ____: PlacementView,
) -> float:
    """Return the featured storage of each node in the infrastructure.

    Args:
        _: The placement of the application the service belongs to.
        resources (dict[str, Any]): The resources of the node.
        __: The infrastructure.
        ___: The placement view.

    Returns:
        float: The featured storage of each node.
    """
    return resources.get("storage", MIN_FLOAT)


@metric.node
def featured_gpu(
    _: str,
    resources: dict[str, Any],
    __: dict[str, Placement],
    ___: Infrastructure,
    ____: PlacementView,
) -> float:
    """Return the featured GPU of each node in the infrastructure.

    Args:
        _: The placement of the application the service belongs to.
        resources (dict[str, Any]): The resources of the node.
        __: The infrastructure.
        ___: The placement view.

    Returns:
        float: The featured GPU of each node.
    """
    return resources.get("gpu", MIN_FLOAT)


@metric.link
def featured_latency(
    _: str,
    __: str,
    resources: dict[str, Any],
    ___: dict[str, Placement],
    ____: Infrastructure,
    _____: PlacementView,
) -> float:
    """Return the featured latency of each link in the infrastructure.

    Args:
        _: The source node ID.
        __: The destination node ID.
        resources (dict[str, Any]): The resources of the link.
        ___: The placement of the application the service belongs to.
        ____: The infrastructure.
        _____: The placement view.

    Returns:
        LinkValue: The featured latency of each link.
    """
    return resources.get("latency", MAX_LATENCY)


@metric.link
def featured_bandwidth(
    _: str,
    __: str,
    resources: dict[str, Any],
    ___: dict[str, Placement],
    ____: Infrastructure,
    _____: PlacementView,
) -> float:
    """Return the featured bandwidth of each link in the infrastructure.

    Args:
        _: The source node ID.
        __: The destination node ID.
        resources (dict[str, Any]): The resources of the link.
        ___: The placement of the application the service belongs to.
        ____: The infrastructure.
        _____: The placement view.

    Returns:
        float: The featured bandwidth of each link.
    """
    return resources.get("bandwidth", MIN_BANDWIDTH)


@metric.simulation(activates_on=STOP_EVENT)
def seed(*_) -> str:
    """Return the seed used in the simulation.

    Args:
        _: The event triggering the reporting of the seed.

    Returns:
        SimulationValue: The seed used in the simulation.
    """
    return os.environ[RND_SEED]


@metric.simulation(name="step_number", activates_on=[DRIVING_EVENT, STOP_EVENT])
class StepNumber:
    """Return the current step number."""

    def __init__(self):
        """Initialize the step number to 0."""
        self.step = 0

    def __call__(self, event):
        """Increment the step number by 1 and return it.

        Args:
            event (EclypseEvent): The event triggering the reporting of the step number.

        Returns:
            int | None: The step number if the event is the DRIVING_EVENT or 'stop', \
                None otherwise.
        """
        if event.name == DRIVING_EVENT:
            self.step += 1
        if event.name == STOP_EVENT:
            return self.step
        return None


@metric.simulation
class SimulationTime:
    """Return the elapsed time since the simulation started."""

    def __init__(self):
        """Initialize the start time to the current time."""
        self.start = time()

    def __call__(self, _):
        """Return the elapsed time since the simulation started.

        Args:
            _ (EclypseEvent): The event triggering the reporting of the simulation time.

        Returns:
            float | None: The elapsed time since the simulation started \
                if the event is 'stop', None otherwise.
        """
        return time() - self.start


@metric.application(report=GML_REPORT_DIR, activates_on=STOP_EVENT)
def app_gml(app: Application, _: Placement, __: Infrastructure) -> Application:
    """Return the application graph to be saved in a GML file.

    Args:
        app (Application): The application.
        _: The placement of the application.
        __: The infrastructure.

    Returns:
        dict[str, Application]: The application graph to be saved in a GML file.
    """
    return app


@metric.infrastructure(report=GML_REPORT_DIR, activates_on=STOP_EVENT)
def infr_gml(infr: Infrastructure, __: PlacementView) -> Infrastructure:
    """Return the infrastructure graph to be saved in a GML file.

    Args:
        infr (Infrastructure): The infrastructure.
        __: The placement view.

    Returns:
        Infrastructure: The infrastructure graph to be saved in a GML file.
    """
    return infr


@metric.service(remote=True)
def step_result(service: Service) -> Any | None:
    """Return the result of the step executed by the service.

    Args:
        service (Service): The service.

    Returns:
        Any | None: The result of the step executed by the service.
    """
    return service._step_queue.popleft() if service._step_queue else None


[docs] def get_default_metrics(): """Return the default metrics for the simulation report. Returns: list[Callable]: The default metrics for the simulation report: - required assets - featured_assets - placement_mapping - response_time - alive_nodes - seed - step number - simulation time - application in GML format - infrastructure in GML format """ return [ # REQUIRED ASSETS required_cpu, required_ram, required_storage, required_gpu, required_latency, required_bandwidth, # FEATURED ASSETS featured_cpu, featured_ram, featured_storage, featured_gpu, featured_latency, featured_bandwidth, # APPLICATION placement_mapping, response_time, # INFRASTRUCTURE alive_nodes, # SIMULATION seed, StepNumber(), SimulationTime(), # GML app_gml, infr_gml, # REMOTE step_result, ]
__all__ = [ "SimulationTime", "StepNumber", "alive_nodes", "app_gml", "featured_bandwidth", "featured_cpu", "featured_gpu", "featured_latency", "featured_ram", "featured_storage", "infr_gml", "placement_mapping", "required_bandwidth", "required_cpu", "required_gpu", "required_latency", "required_ram", "required_storage", "response_time", "seed", "step_result", ]