# Source code for eclypse.policies.compose.conditional

"""Conditional policy composition."""

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from collections.abc import Callable

    from eclypse.graph.asset_graph import AssetGraph
    from eclypse.utils.types import UpdatePolicy


def conditional(
    predicate: Callable[[AssetGraph], bool],
    policy: UpdatePolicy,
) -> UpdatePolicy:
    """Run ``policy`` only when ``predicate(graph)`` is true.

    Args:
        predicate (Callable[[AssetGraph], bool]): Callable receiving the graph
            and returning a truthy value.
        policy (UpdatePolicy): Wrapped policy to call when ``predicate`` passes.

    Returns:
        UpdatePolicy: Policy that evaluates ``predicate`` on every call and
        forwards the graph to ``policy`` only when the result is truthy.
    """

    def wrapped(graph: AssetGraph) -> None:
        # The predicate is evaluated on each invocation, so the gate follows
        # whatever graph is passed in rather than a decision made up front.
        if predicate(graph):
            policy(graph)

    return wrapped