Source code for helix_ir.sources.parquet_source

"""Parquet file source."""

from __future__ import annotations

from typing import Any, Iterable


[docs] class ParquetSource: """Read documents from a Parquet file using PyArrow.""" def __init__(self, path: str, batch_size: int = 1000) -> None: self.path = path self.batch_size = batch_size
[docs] def read(self) -> Iterable[dict[str, Any]]: import pyarrow.parquet as pq pf = pq.ParquetFile(self.path) for batch in pf.iter_batches(batch_size=self.batch_size): tbl = batch.to_pydict() n_rows = len(next(iter(tbl.values()), [])) for i in range(n_rows): yield {k: v[i] for k, v in tbl.items()}
[docs] def schema_hint(self) -> dict[str, Any] | None: import pyarrow.parquet as pq pf = pq.ParquetFile(self.path) schema = pf.schema_arrow return {"arrow_schema": schema}