# Source code for helix_ir.transform.table

"""Lazy Table class — the primary user-facing transform API."""

from __future__ import annotations

from typing import TYPE_CHECKING, Any

import pyarrow as pa

from helix_ir.schema.schema import Schema
from helix_ir.transform.compiler.logical import (
    Aggregate,
    Filter,
    Join,
    Limit,
    Project,
    RawSQL,
    Scan,
    Sort,
    Union,
)
from helix_ir.transform.expression import Expression, SortExpr, col, star

if TYPE_CHECKING:
    # Annotation-only import; Table.group_by imports this lazily at runtime,
    # presumably to avoid a circular import.
    from helix_ir.transform.operators import GroupedTable


class Table:
    """A lazy relational table backed by a LogicalPlan.

    Operations return new Table instances without executing queries.
    Call .to_sql() to compile or .to_arrow() to execute via DuckDB.
    """

    def __init__(self, source_name: str, schema: Schema | None = None) -> None:
        self._plan: object = Scan(source_name=source_name, schema=schema)
        self._schema = schema

    @classmethod
    def _from_plan(cls, plan: object, schema: Schema | None = None) -> "Table":
        """Wrap an existing logical plan in a Table without running ``__init__``."""
        t = cls.__new__(cls)
        t._plan = plan
        t._schema = schema
        return t

    # -------------------------------------------------------------------------
    # Transformations (all lazy)
    # -------------------------------------------------------------------------

    def filter(self, predicate: Expression) -> "Table":
        """Filter rows by a predicate expression.

        The column set is unchanged, so the known schema (if any) is kept.
        """
        return Table._from_plan(
            Filter(input=self._plan, predicate=predicate), self._schema
        )

    def where(self, predicate: Expression) -> "Table":
        """Alias for filter()."""
        return self.filter(predicate)

    def select(self, *exprs: Expression) -> "Table":
        """Select specific columns."""
        return Table._from_plan(Project(input=self._plan, columns=list(exprs)))

    def with_column(self, name: str, expr: Expression) -> "Table":
        """Add or replace a column.

        ``expr`` is evaluated against the columns of this table. When the plan
        ends in a projection whose output names are all known, ``name`` is
        replaced in place (or appended when new). Otherwise the result is
        ``SELECT *, expr AS name``.
        """
        aliased = expr.alias(name)
        names = self._projected_names()
        if names is None:
            new_cols: list[Expression] = [star(), aliased]
        else:
            # The new Project sits on top of the current one, so existing
            # columns are referenced by their output names. Re-emitting their
            # defining expressions (e.g. ``a AS b``) would refer to columns
            # the inner projection no longer exposes.
            new_cols = [aliased if n == name else col(n) for n in names]
            if name not in names:
                new_cols.append(aliased)
        return Table._from_plan(Project(input=self._plan, columns=new_cols))

    def drop(self, *columns: str) -> "Table":
        """Drop columns by (output) name.

        With no arguments this is a no-op. When the current output names are
        unknown, a star-exclude expression is emitted instead.
        """
        if not columns:
            # ``* EXCEPT ()`` / ``* EXCLUDE ()`` with an empty list is invalid SQL.
            return self
        names = self._projected_names()
        if names is None:
            new_cols: list[Expression] = [self._star_except(list(columns))]
        else:
            dropped = set(columns)
            new_cols = [col(n) for n in names if n not in dropped]
        return Table._from_plan(Project(input=self._plan, columns=new_cols))

    def rename(self, **mapping: str) -> "Table":
        """Rename columns: rename(old_name='new_name').

        When the current output names are unknown, the renamed columns are
        moved to the end of the column list
        (``SELECT * EXCLUDE (old), old AS new``).
        """
        if not mapping:
            return self
        names = self._projected_names()
        if names is None:
            # Exclude the old names so the result really renames rather than
            # keeping both the old and the new column.
            new_cols: list[Expression] = [self._star_except(list(mapping))]
            new_cols.extend(col(old).alias(new) for old, new in mapping.items())
        else:
            new_cols = [
                col(n).alias(mapping[n]) if n in mapping else col(n) for n in names
            ]
        return Table._from_plan(Project(input=self._plan, columns=new_cols))

    def sort(self, *exprs: Expression | SortExpr) -> "Table":
        """Sort by expressions (keeps the known schema)."""
        return Table._from_plan(Sort(input=self._plan, by=list(exprs)), self._schema)

    def order_by(self, *exprs: Expression | SortExpr) -> "Table":
        """Alias for sort()."""
        return self.sort(*exprs)

    def limit(self, n: int, offset: int = 0) -> "Table":
        """Limit to n rows, with optional offset (keeps the known schema)."""
        return Table._from_plan(
            Limit(input=self._plan, n=n, offset=offset), self._schema
        )

    def head(self, n: int = 5) -> "Table":
        """Return first n rows."""
        return self.limit(n)

    def group_by(self, *exprs: Expression) -> GroupedTable:
        """Group by expressions — returns a GroupedTable for .agg()."""
        from helix_ir.transform.operators import GroupedTable

        return GroupedTable(self, list(exprs))

    def join(
        self,
        other: "Table",
        on: Expression,
        how: str = "inner",
    ) -> "Table":
        """Join with another table."""
        return Table._from_plan(
            Join(left=self._plan, right=other._plan, on=on, how=how)
        )

    def union(self, other: "Table", all: bool = True) -> "Table":
        """Union with another table (``all=True`` keeps duplicates)."""
        return Table._from_plan(
            Union(left=self._plan, right=other._plan, all=all)
        )

    def distinct(self) -> "Table":
        """Return distinct rows (wraps in a SELECT DISTINCT)."""
        # NOTE(review): the inner query is compiled eagerly with the default
        # "duckdb" dialect, so a later to_sql(dialect=...) cannot change it.
        # ``distinct`` is a reserved SQL keyword, so it is not used as the alias.
        return Table._from_plan(
            RawSQL(
                sql=f"SELECT DISTINCT * FROM ({self.to_sql()})",
                alias="distinct_rows",
            ),
            self._schema,
        )

    # -------------------------------------------------------------------------
    # Compilation / execution
    # -------------------------------------------------------------------------

    def to_sql(self, dialect: str = "duckdb") -> str:
        """Compile this table to a SQL SELECT statement."""
        from helix_ir.transform.compiler.emitters import get_emitter
        from helix_ir.transform.compiler.optimizer import optimize

        optimized = optimize(self._plan)
        emitter = get_emitter(dialect)
        return emitter.emit(optimized)

    def to_arrow(self) -> pa.Table:
        """Execute this query on a fresh in-memory DuckDB connection.

        Returns:
            The full result as a PyArrow Table. The connection is closed
            before returning.
        """
        import duckdb

        sql = self.to_sql(dialect="duckdb")
        with duckdb.connect() as conn:
            result = conn.execute(sql).arrow()
            # Some DuckDB versions may hand back a streaming reader. Drain it
            # while the connection is still open.
            if isinstance(result, pa.RecordBatchReader):
                result = result.read_all()
            return result

    # -------------------------------------------------------------------------
    # Internal helpers
    # -------------------------------------------------------------------------

    def _current_columns(self) -> list[Expression]:
        """Return current projected columns if a Project node is at top."""
        if isinstance(self._plan, Project):
            return list(self._plan.columns)
        return []

    @staticmethod
    def _output_name(expr: Expression) -> str | None:
        """Return the name ``expr`` exposes in a projection, or None if unknown.

        The name is the alias if one is set, otherwise a plain column's name.
        Stars and qualified (``t.a``) names count as unknown, because they
        cannot be safely re-referenced over a subquery.
        """
        alias = getattr(expr, "_alias", None)
        if alias:
            return alias
        from helix_ir.transform.expression import Column

        if isinstance(expr, Column):
            name = expr.name
            if isinstance(name, str) and name != "*" and "." not in name:
                return name
        return None

    def _projected_names(self) -> list[str] | None:
        """Return the output names of a top-level Project, if all are known.

        Returns None when the top node is not a Project, the Project is
        empty, or any column's output name cannot be determined.
        """
        columns = self._current_columns()
        if not columns:
            return None
        names: list[str] = []
        for c in columns:
            name = self._output_name(c)
            if name is None:
                return None
            names.append(name)
        return names

    @staticmethod
    def _star_except(cols_to_drop: list[str]) -> Expression:
        """Build a star expression that excludes ``cols_to_drop``.

        Emits ``* EXCLUDE (...)`` for the duckdb dialect and
        ``* EXCEPT (...)`` for any other dialect.
        """

        class _StarExcept(Expression):
            def __init__(self, cols: list[str]) -> None:
                super().__init__()
                self.cols_to_drop = cols

            def _copy(self) -> "_StarExcept":
                return type(self)(list(self.cols_to_drop))

            def to_sql(self, dialect: str = "duckdb") -> str:
                # Double any embedded quote so identifiers stay well-formed.
                drops = ", ".join(
                    '"' + c.replace('"', '""') + '"' for c in self.cols_to_drop
                )
                keyword = "EXCLUDE" if dialect == "duckdb" else "EXCEPT"
                return f"* {keyword} ({drops})"

        return _StarExcept(list(cols_to_drop))

    def __repr__(self) -> str:
        # repr must never raise, so fall back to the plan node's type name if
        # compilation fails for any reason.
        try:
            sql = self.to_sql()
            return f"Table(\n{sql}\n)"
        except Exception:
            return f"Table({type(self._plan).__name__})"