Source code for helix_ir.lineage.graph
"""Data lineage graph: track field-level transformations."""
from __future__ import annotations
from dataclasses import dataclass, field
from helix_ir.schema.path import Path
[docs]
@dataclass(frozen=True)
class LineageEdge:
"""A directed edge in the lineage graph."""
source: Path
target: Path
transform: str | None = None
confidence: float = 1.0
[docs]
class Lineage:
"""A lineage graph tracking field-level data flow."""
def __init__(self) -> None:
self._edges: list[LineageEdge] = []
[docs]
def record(
self,
source: Path | str,
target: Path | str,
transform: str | None = None,
confidence: float = 1.0,
) -> None:
"""Record a lineage edge from source to target."""
if isinstance(source, str):
source = Path.parse(source)
if isinstance(target, str):
target = Path.parse(target)
self._edges.append(LineageEdge(
source=source,
target=target,
transform=transform,
confidence=confidence,
))
[docs]
def upstream(self, path: Path | str) -> list[LineageEdge]:
"""Return all edges where target == path."""
if isinstance(path, str):
path = Path.parse(path)
return [e for e in self._edges if e.target == path]
[docs]
def downstream(self, path: Path | str) -> list[LineageEdge]:
"""Return all edges where source == path."""
if isinstance(path, str):
path = Path.parse(path)
return [e for e in self._edges if e.source == path]
[docs]
def all_edges(self) -> list[LineageEdge]:
"""Return all recorded edges."""
return list(self._edges)
[docs]
def to_openlineage(self) -> list[dict]:
"""Export lineage as OpenLineage-compatible dicts."""
result = []
for edge in self._edges:
result.append(
{
"eventType": "COMPLETE",
"inputs": [
{
"namespace": "helix",
"name": str(edge.source),
"facets": {},
}
],
"outputs": [
{
"namespace": "helix",
"name": str(edge.target),
"facets": {},
}
],
"job": {
"namespace": "helix",
"name": edge.transform or "identity",
},
"run": {"runId": "00000000-0000-0000-0000-000000000000"},
"producer": "helix_ir",
"schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json",
}
)
return result
[docs]
def to_dot(self) -> str:
"""Export lineage as a Graphviz DOT string."""
lines = ["digraph lineage {"]
lines.append(' rankdir="LR";')
seen_nodes: set[str] = set()
for edge in self._edges:
src_str = str(edge.source)
tgt_str = str(edge.target)
for node in (src_str, tgt_str):
if node not in seen_nodes:
escaped = node.replace('"', '\\"')
lines.append(f' "{escaped}";')
seen_nodes.add(node)
src_escaped = src_str.replace('"', '\\"')
tgt_escaped = tgt_str.replace('"', '\\"')
label = f' [label="{edge.transform or ""}"]' if edge.transform else ""
lines.append(f' "{src_escaped}" -> "{tgt_escaped}"{label};')
lines.append("}")
return "\n".join(lines)
def __len__(self) -> int:
return len(self._edges)
def __repr__(self) -> str:
return f"Lineage({len(self._edges)} edges)"