Source code for eclypse.policies.replay.replay_with_mapping

"""Replay records after applying column and id mappings."""

from __future__ import annotations

from typing import TYPE_CHECKING

from eclypse.policies.replay._helpers import normalise_records
from eclypse.policies.replay.from_records import from_records

if TYPE_CHECKING:
    from eclypse.utils.types import (
        ReplayTarget,
        UpdatePolicy,
    )


[docs] def replay_with_mapping( record_source, *, target: ReplayTarget, column_mapping: dict[str, str] | None = None, id_mapping: dict[str, str] | None = None, **kwargs, ) -> UpdatePolicy: """Replay records after renaming columns and external graph ids. Args: record_source (Any): Iterable of mapping records to replay. target (ReplayTarget): Replay target, either ``"nodes"`` or ``"edges"``. column_mapping (dict[str, str] | None): Optional mapping from input column names to replay columns. id_mapping (dict[str, str] | None): Optional mapping from external graph ids to local graph ids. kwargs (Any): Additional keyword arguments forwarded to ``from_records``. Returns: Stateful replay policy. """ mapped_records = [] for record in normalise_records(record_source): mapped = { (column_mapping or {}).get(column, column): value for column, value in record.items() } for key in ("node_id", "source", "target"): if key in mapped: mapped[key] = (id_mapping or {}).get(mapped[key], mapped[key]) mapped_records.append(mapped) return from_records(mapped_records, target=target, **kwargs)