# Source code for df_eval.parquet
from __future__ import annotations
from collections.abc import Iterable, Iterator, Sequence
from pathlib import Path
from typing import Any
import pandas as pd
def _import_pyarrow_dataset() -> Any:
"""Import pyarrow.dataset lazily to keep the dependency optional."""
try:
import pyarrow.dataset as ds
except ImportError as exc: # pragma: no cover - environment dependent
raise ImportError(
"Parquet helpers require the optional dependency. "
"Install with: pip install 'df-eval[parquet]'"
) from exc
return ds
def _import_pyarrow_parquet() -> Any:
"""Import pyarrow.parquet lazily to keep the dependency optional."""
try:
import pyarrow.parquet as pq
except ImportError as exc: # pragma: no cover - environment dependent
raise ImportError(
"Parquet helpers require the optional dependency. "
"Install with: pip install 'df-eval[parquet]'"
) from exc
return pq
def _import_pyarrow() -> Any:
"""Import pyarrow lazily for table conversion support."""
try:
import pyarrow as pa
except ImportError as exc: # pragma: no cover - environment dependent
raise ImportError(
"Parquet helpers require the optional dependency. "
"Install with: pip install 'df-eval[parquet]'"
) from exc
return pa
def _normalize_columns(columns: Sequence[str] | None) -> list[str] | None:
"""Validate and normalize an optional list of projected column names."""
if columns is None:
return None
if isinstance(columns, (str, bytes)):
raise TypeError("columns must be a sequence of strings, not a string")
normalized = list(columns)
if not all(isinstance(name, str) for name in normalized):
raise TypeError("columns must only contain strings")
return normalized
# [docs]
def iter_parquet_row_chunks(
    path: str | Path,
    *,
    chunk_size: int = 100_000,
    columns: Sequence[str] | None = None,
) -> Iterator[pd.DataFrame]:
    """Yield Parquet rows as pandas DataFrame chunks.

    This treats a Parquet file or directory-backed Parquet dataset as an
    out-of-memory DataFrame and streams it into manageable in-memory chunks.

    Argument validation and the optional ``pyarrow`` import happen
    immediately when this function is called, so the errors listed below are
    raised at the call site rather than on the first ``next()``. Only opening
    the dataset and scanning it is deferred to iteration.

    Args:
        path: Path to a Parquet file or a directory containing a Parquet dataset.
        chunk_size: Maximum number of rows to include per yielded chunk.
        columns: Optional subset of columns to project while scanning.

    Returns:
        An iterator of DataFrame chunks with at most ``chunk_size`` rows each.

    Raises:
        FileNotFoundError: If ``path`` does not exist.
        TypeError: If ``path``, ``chunk_size``, or ``columns`` have invalid types.
        ValueError: If ``chunk_size`` is less than 1.
        ImportError: If ``pyarrow`` is not installed.
    """
    if not isinstance(path, (str, Path)):
        raise TypeError("path must be a str or pathlib.Path")
    parquet_path = Path(path).expanduser()
    if not parquet_path.exists():
        raise FileNotFoundError(f"Parquet path does not exist: {parquet_path}")

    # bool is a subclass of int; reject it so True/False are not read as 1/0.
    if isinstance(chunk_size, bool) or not isinstance(chunk_size, int):
        raise TypeError("chunk_size must be an integer")
    if chunk_size < 1:
        raise ValueError("chunk_size must be >= 1")

    projected_columns = _normalize_columns(columns)
    ds = _import_pyarrow_dataset()

    def _stream_chunks() -> Iterator[pd.DataFrame]:
        # Kept in an inner generator so the checks above run eagerly; a
        # `yield` in the outer function would defer them to first iteration.
        dataset = ds.dataset(parquet_path, format="parquet")
        for batch in dataset.to_batches(
            columns=projected_columns, batch_size=chunk_size
        ):
            yield batch.to_pandas()

    return _stream_chunks()
# [docs]
def write_parquet_row_chunks(
    chunks: Iterable[pd.DataFrame],
    output_path: str | Path,
    *,
    compression: str = "snappy",
) -> Path:
    """Stream DataFrame chunks into a single Parquet file.

    The writer is created from the schema of the first chunk and every chunk
    is appended through that same writer. Missing parent directories of
    ``output_path`` are created before writing starts.

    Args:
        chunks: DataFrame chunks to write sequentially.
        output_path: Destination Parquet file path.
        compression: Parquet compression codec.

    Returns:
        The normalized (user-expanded) output path.

    Raises:
        TypeError: If ``output_path``, ``compression``, or chunk values are invalid.
        ValueError: If ``compression`` is empty or no chunks are provided.
        ImportError: If ``pyarrow`` is not installed.
    """
    if not isinstance(output_path, (str, Path)):
        raise TypeError("output_path must be a str or pathlib.Path")
    if not isinstance(compression, str):
        raise TypeError("compression must be a string")
    if not compression:
        raise ValueError("compression must not be empty")

    destination = Path(output_path).expanduser()
    destination.parent.mkdir(parents=True, exist_ok=True)

    parquet_module = _import_pyarrow_parquet()
    arrow = _import_pyarrow()

    # The writer needs a schema, so it is opened lazily on the first chunk.
    sink = None
    try:
        for frame in chunks:
            if not isinstance(frame, pd.DataFrame):
                raise TypeError("chunks must contain pandas DataFrame values")
            arrow_table = arrow.Table.from_pandas(frame, preserve_index=False)
            if sink is None:
                sink = parquet_module.ParquetWriter(
                    str(destination),
                    arrow_table.schema,
                    compression=compression,
                )
            sink.write_table(arrow_table)
    finally:
        # Close the writer even when a chunk fails part-way through.
        if sink is not None:
            sink.close()

    # Reaching this point without a writer means the iterable was empty.
    if sink is None:
        raise ValueError("chunks did not yield any DataFrame values")
    return destination
# Public API of this module: the names exported by a star-import. The
# underscore-prefixed import and validation helpers are not listed.
__all__ = ["iter_parquet_row_chunks", "write_parquet_row_chunks"]