Source code for helix_ir.diff.classifier

"""Schema diff: compare two schemas and classify changes."""

from __future__ import annotations

from dataclasses import dataclass, field

import pyarrow as pa

from helix_ir.schema.path import Path
from helix_ir.schema.schema import Schema
from helix_ir.types.core import HelixType
from helix_ir.types.lattice import subsumes


@dataclass(frozen=True)
class SchemaChange:
    """One classified difference between an old and a new schema version.

    Instances are immutable (frozen dataclass).
    """

    # Parsed location of the affected field.
    path: Path
    # Category of the change. Values enumerated by the original author:
    # 'added', 'removed', 'type_changed', 'nullable_changed',
    # 'semantic_changed', 'pii_changed', 'description_changed'.
    kind: str
    # Impact level: 'safe', 'risky' or 'breaking'.
    severity: str
    # Type of the field in the old schema; None when there is no old side.
    old_type: HelixType | None
    # Type of the field in the new schema; None when there is no new side.
    new_type: HelixType | None
    # Human-readable explanation of the change.
    description: str
@dataclass(frozen=True)
class SchemaDiff:
    """Immutable collection of the changes found between two schemas.

    ``old_name`` and ``new_name`` identify the compared schemas; ``changes``
    holds the individual change records in the order they were detected.
    """

    old_name: str
    new_name: str
    changes: tuple[SchemaChange, ...]

    @property
    def has_breaking_changes(self) -> bool:
        """True when at least one change has severity ``"breaking"``."""
        for change in self.changes:
            if change.severity == "breaking":
                return True
        return False

    @property
    def has_risky_changes(self) -> bool:
        """True when at least one change is ``"risky"`` or ``"breaking"``."""
        for change in self.changes:
            if change.severity == "breaking" or change.severity == "risky":
                return True
        return False

    def filter(self, severity: str) -> "SchemaDiff":
        """Return a new SchemaDiff keeping only changes with this severity."""
        kept = []
        for change in self.changes:
            if change.severity == severity:
                kept.append(change)
        return SchemaDiff(self.old_name, self.new_name, tuple(kept))

    def filter_kind(self, kind: str) -> "SchemaDiff":
        """Return a new SchemaDiff keeping only changes of this kind."""
        kept = []
        for change in self.changes:
            if change.kind == kind:
                kept.append(change)
        return SchemaDiff(self.old_name, self.new_name, tuple(kept))

    def summary(self) -> dict[str, int]:
        """Count changes per severity.

        The three standard severities are always present (possibly as 0);
        any other severity value encountered gets its own key.
        """
        tally: dict[str, int] = dict.fromkeys(("safe", "risky", "breaking"), 0)
        for change in self.changes:
            level = change.severity
            tally[level] = tally.get(level, 0) + 1
        return tally

    def __len__(self) -> int:
        """Number of recorded changes."""
        return len(self.changes)

    def __bool__(self) -> bool:
        """A diff is truthy exactly when it contains at least one change."""
        return bool(self.changes)
def diff(old: Schema, new: Schema) -> SchemaDiff:
    """Compare two schemas and return a SchemaDiff.

    Fields are visited in the order they appear in ``old``, followed by the
    fields that exist only in ``new`` (in ``new``'s order).

    Classification rules (as implemented):

    - Field removed: BREAKING (downstream consumers break)
    - Field added (nullable): SAFE
    - Field added (non-nullable): RISKY (existing data won't have this field)
    - Type widened (new type subsumes old, e.g. int32 → int64): SAFE
    - Type narrowed (old type subsumes new): RISKY
    - String → non-string: RISKY (may fail parsing)
    - Type changed incompatibly: BREAKING
    - Nullable → non-nullable: RISKY
    - Non-nullable → nullable: SAFE
    - Semantic changed: RISKY
    - PII class added or changed to a different class: RISKY
    - PII class removed: SAFE (becoming less sensitive)

    Args:
        old: The baseline schema.
        new: The schema to compare against the baseline.

    Returns:
        A SchemaDiff named after both schemas and holding every detected
        change. A field present in both schemas may contribute several
        changes (type, nullability, semantic, PII).
    """
    old_fields = dict(old.fields)
    new_fields = dict(new.fields)

    # Ordered union: old names first, then names that only exist in `new`.
    all_names = [*old_fields, *(n for n in new_fields if n not in old_fields)]

    changes: list[SchemaChange] = []
    for name in all_names:
        path = Path.parse(name)
        old_ht = old_fields.get(name)
        new_ht = new_fields.get(name)

        if old_ht is None and new_ht is not None:
            # Field added: only risky when existing data cannot satisfy it.
            is_nullable = new_ht.null_ratio > 0 or new_ht.is_nullable()
            nullability = "nullable" if is_nullable else "non-nullable"
            changes.append(
                SchemaChange(
                    path=path,
                    kind="added",
                    severity="safe" if is_nullable else "risky",
                    old_type=None,
                    new_type=new_ht,
                    description=f"Field '{name}' added ({nullability})",
                )
            )
        elif old_ht is not None and new_ht is None:
            # Field removed — always breaking.
            changes.append(
                SchemaChange(
                    path=path,
                    kind="removed",
                    severity="breaking",
                    old_type=old_ht,
                    new_type=None,
                    description=f"Field '{name}' removed",
                )
            )
        else:
            # Every name comes from one of the two mappings, so reaching this
            # branch means the field exists on both sides.
            assert old_ht is not None and new_ht is not None
            changes.extend(_classify_type_change(path, name, old_ht, new_ht))

    return SchemaDiff(
        old_name=old.name,
        new_name=new.name,
        changes=tuple(changes),
    )
def _classify_type_change(  # noqa: C901
    path: Path,
    name: str,
    old_ht: HelixType,
    new_ht: HelixType,
) -> list[SchemaChange]:
    """Classify differences between two HelixType instances for the same field.

    Four aspects are compared independently, in this order: Arrow type,
    nullability, semantic, and PII class. Each aspect that differs yields
    one SchemaChange.

    Args:
        path: Parsed path of the field, stored on every emitted change.
        name: Field name used in the human-readable descriptions.
        old_ht: Type of the field in the old schema.
        new_ht: Type of the field in the new schema.

    Returns:
        The detected changes (possibly empty), in the order listed above.
    """
    changes: list[SchemaChange] = []

    def record(kind: str, severity: str, description: str) -> None:
        # All changes for one field share path and the old/new type pair.
        changes.append(
            SchemaChange(
                path=path,
                kind=kind,
                severity=severity,
                old_type=old_ht,
                new_type=new_ht,
                description=description,
            )
        )

    # Arrow type: severity depends on how the two types relate.
    if old_ht.arrow_type != new_ht.arrow_type:
        record(
            "type_changed",
            _classify_arrow_type_change(old_ht.arrow_type, new_ht.arrow_type),
            f"Field '{name}' type changed from {old_ht.arrow_type} to {new_ht.arrow_type}",
        )

    # Nullability: a field counts as nullable here when nulls were observed.
    # NOTE(review): diff() additionally consults is_nullable() for added
    # fields; only null_ratio is used here — confirm this is intentional.
    old_nullable = old_ht.null_ratio > 0
    new_nullable = new_ht.null_ratio > 0
    if old_nullable != new_nullable:
        if old_nullable and not new_nullable:
            # Became non-nullable: RISKY (existing data may have nulls).
            record("nullable_changed", "risky", f"Field '{name}' became non-nullable")
        else:
            # Became nullable: SAFE.
            record("nullable_changed", "safe", f"Field '{name}' became nullable")

    # Semantic: any difference is risky.
    if old_ht.semantic != new_ht.semantic:
        record(
            "semantic_changed",
            "risky",
            f"Field '{name}' semantic changed from {old_ht.semantic!r} "
            f"to {new_ht.semantic!r}",
        )

    # PII class: gaining or switching a class is risky, losing it is safe.
    if old_ht.pii_class != new_ht.pii_class:
        if old_ht.pii_class is None and new_ht.pii_class is not None:
            severity = "risky"
            desc = f"Field '{name}' now classified as PII ({new_ht.pii_class!r})"
        elif old_ht.pii_class is not None and new_ht.pii_class is None:
            severity = "safe"
            desc = f"Field '{name}' PII classification removed (was {old_ht.pii_class!r})"
        else:
            severity = "risky"
            desc = (
                f"Field '{name}' PII class changed from {old_ht.pii_class!r} "
                f"to {new_ht.pii_class!r}"
            )
        record("pii_changed", severity, desc)

    return changes


def _classify_arrow_type_change(
    old_type: pa.DataType,
    new_type: pa.DataType,
) -> str:
    """Classify the severity of changing from old_type to new_type.

    Both Arrow types are wrapped in bare HelixType instances so the lattice
    ``subsumes`` relation can compare them.

    Args:
        old_type: Arrow type of the field in the old schema.
        new_type: Arrow type of the field in the new schema.

    Returns:
        ``"safe"`` when the new type subsumes the old one (widening),
        ``"risky"`` when the old type subsumes the new one (narrowing) or
        when a string field becomes a non-string field, and ``"breaking"``
        otherwise.
    """
    old_ht = HelixType(arrow_type=old_type)
    new_ht = HelixType(arrow_type=new_type)

    # New type subsumes old → safe widening.
    if subsumes(new_ht, old_ht):
        return "safe"

    # Old type subsumes new (and the reverse did not hold) → narrowing.
    if subsumes(old_ht, new_ht):
        return "risky"

    # String to non-string: risky (may fail parsing).
    if pa.types.is_string(old_type) and not pa.types.is_string(new_type):
        return "risky"

    # Incompatible types: breaking.
    return "breaking"