Source code for helix_ir.pii.classifier

"""PII detection: annotate a Schema with PII classes."""

from __future__ import annotations

from typing import Any

import pyarrow as pa

from helix_ir.pii.heuristics import detect_pii_from_field_name
from helix_ir.pii.regex_patterns import get_all_patterns, get_patterns
from helix_ir.schema.schema import Schema
from helix_ir.types.core import HelixType


def detect_pii(
    schema: Schema,
    sample_values: dict[str, list[Any]] | None = None,
    locale: str = "in",
    layers: list[str] | None = None,
    confidence_threshold: float = 0.8,
) -> Schema:
    """Return a copy of *schema* whose fields carry PII class annotations.

    Each field is examined by the enabled detection layers in order; the
    first layer that yields a class wins. A field that already has a
    ``pii_class`` keeps it untouched.

    Args:
        schema: The schema to annotate.
        sample_values: Optional mapping from field name to sample values,
            used by the ``'regex'`` layer. When missing or empty, the
            regex layer is skipped.
        locale: Locale for PII patterns ('in', 'us', 'eu', 'all').
            ``'all'`` loads every available pattern set.
        layers: Detection layers to run. Defaults to ``['name', 'regex']``.
        confidence_threshold: Minimum fraction of sampled string values
            that must match a pattern for its PII class to be assigned.

    Returns:
        A new Schema with the same name and fields, each field evolved
        with its resolved ``pii_class`` (which may still be ``None``).
    """
    active_layers = ["name", "regex"] if layers is None else layers
    patterns = get_all_patterns() if locale == "all" else get_patterns(locale)

    # These conditions do not vary per field, so evaluate them once.
    use_name_layer = "name" in active_layers
    use_regex_layer = "regex" in active_layers and bool(sample_values)

    annotated: list[tuple[str, HelixType]] = []
    for field_name, helix_type in schema.fields:
        label = helix_type.pii_class  # an existing annotation always wins

        # Layer 1: heuristics on the field name.
        if label is None and use_name_layer:
            label = detect_pii_from_field_name(field_name)

        # Layer 2: regex matching against the sampled values.
        if label is None and use_regex_layer:
            samples = sample_values.get(field_name, [])
            if samples:
                label = _detect_pii_from_values(
                    samples, patterns, confidence_threshold
                )

        annotated.append((field_name, helix_type.evolve(pii_class=label)))

    return Schema(name=schema.name, fields=tuple(annotated))
def _detect_pii_from_values(
    values: list[Any],
    patterns: dict,
    confidence_threshold: float,
) -> str | None:
    """Check sample values against regex patterns and return the best PII class.

    Args:
        values: Sample values for a single field. Only ``str`` instances
            are tested; ``None`` and non-string values are ignored and do
            not count toward the match ratio.
        patterns: Mapping from PII class name to an object exposing
            ``fullmatch(str)`` (presumably compiled ``re.Pattern`` objects
            -- confirm against ``get_patterns``).
        confidence_threshold: Minimum fraction of the string samples that
            must fully match the winning pattern.

    Returns:
        The PII class with the most full matches if its match ratio is at
        least ``confidence_threshold``; otherwise ``None``. Also ``None``
        when there are no string samples or nothing matches. On a tie, the
        class that appears first in ``patterns`` wins.
    """
    # isinstance() already excludes None, so no separate None check is needed.
    str_values = [v for v in values if isinstance(v, str)]
    if not str_values:
        return None

    # Count full matches per PII class, keeping only classes that matched.
    counts: dict[str, int] = {}
    for pii_class, pattern in patterns.items():
        matched = sum(1 for v in str_values if pattern.fullmatch(v))
        if matched:
            counts[pii_class] = matched
    if not counts:
        return None

    # max() returns the first maximal key, so ties follow `patterns` order.
    best_class = max(counts, key=counts.__getitem__)
    ratio = counts[best_class] / len(str_values)
    return best_class if ratio >= confidence_threshold else None