from pathlib import Path
from typing import Optional

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
class LazyLocIndexer:
def __init__(self, parent):
self.parent = parent
    def __getitem__(self, key):
        # to_pandas() materializes and caches the full frame, so a separate
        # per-column preload is unnecessary (and breaks for list-valued column
        # keys, which are unhashable and cannot be looked up in the cache).
        return self.parent.to_pandas().loc[key]
def __setitem__(self, key, value):
df = self.parent.to_pandas()
df.loc[key] = value
# noinspection PyProtectedMember
self.parent._update_from_pandas(df)
class LazyParquetDataFrame:
    def __init__(self, path, index_cols: Optional[list[str]] = None):
        """Initialize a LazyParquetDataFrame.

        Args:
            path (str or Path): Path to the Parquet file.
            index_cols (list[str], optional): Column names, if any, to use as index columns.
        """
        self.path = path
        self._schema = pq.read_schema(path)
        self._loaded_columns = {}   # columns read from the file, or overridden in memory
        self._extra_columns = {}    # columns added in memory, not yet in the file
        self._column_order = list(self._schema.names)
        self._pandas_cache = None   # memoized result of to_pandas()
        self._index_cols = []
        self._index = None
        file_meta = pq.read_metadata(path)
        meta = file_meta.metadata or {}
        if index_cols is not None:
            self._index_cols = list(index_cols)
            index_df = pq.read_table(path, columns=self._index_cols).to_pandas()
            if len(self._index_cols) == 1:
                col = self._index_cols[0]
                self._index = pd.Index(index_df[col], name=col)
            else:
                self._index = pd.MultiIndex.from_frame(index_df)
            self._column_order = [c for c in self._column_order if c not in self._index_cols]
        elif b'pandas' in meta:
            # The file carries pandas metadata; materialize it once to recover the stored index.
            df = pq.read_table(path).to_pandas()
            self._index = df.index
            self._column_order = list(df.columns)
        else:
            # The row count is available from the footer; no need to read the data.
            self._index = pd.RangeIndex(file_meta.num_rows)
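    # A minimal usage sketch (file name and column names are hypothetical):
    #
    #     ldf = LazyParquetDataFrame("events.parquet")
    #     ids = ldf["event_id"]        # reads only that column from disk
    #     ldf["flag"] = ids > 0        # kept in memory until save()
    #     full = ldf.to_pandas()       # materializes (and caches) the whole frame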
    def set_index(self, columns):
        """Set the index of the DataFrame to the specified columns."""
        if not all(col in self._column_order for col in columns):
            raise KeyError(f"One or more columns {columns} are not in the DataFrame.")
        try:
            index_df = self.to_pandas()[columns]
            self._index = pd.MultiIndex.from_frame(index_df) if len(columns) > 1 else pd.Index(index_df[columns[0]])
        except Exception as e:
            raise ValueError(f"Failed to set index: {e}") from e
        # Mirror pandas semantics: columns promoted to the index leave the column set.
        self._index_cols = list(columns)
        self._column_order = [c for c in self._column_order if c not in columns]
        self._invalidate_cache()
    def reset_index(self, drop=False):
        """Reset the index of the DataFrame, optionally dropping it."""
        if drop:
            self._index = pd.RangeIndex(len(self._index))
        else:
            if isinstance(self._index, pd.MultiIndex):
                index_df = self._index.to_frame(index=False)
                index_cols = list(index_df.columns)
            else:
                # Fall back to pandas' default name for an unnamed index.
                name = self._index.name if self._index.name is not None else "index"
                index_df = pd.DataFrame({name: self._index})
                index_cols = [name]
            for col in index_cols:
                if col in self._column_order:
                    raise ValueError(f"Cannot reset index: column '{col}' already exists.")
            # Prepend the former index columns to the column order.
            for col in index_cols:
                self._extra_columns[col] = index_df[col]
            self._column_order = index_cols + self._column_order
            self._index = pd.RangeIndex(len(index_df))
        self._invalidate_cache()
    def to_pandas(self):
        """Convert the Parquet file to a pandas DataFrame, caching the result."""
        if self._pandas_cache is not None:
            return self._pandas_cache
        df = pq.read_table(self.path).to_pandas()
        # Apply in-memory state on top of the file contents: overridden
        # file-backed columns first, then columns that exist only in memory.
        for k, v in self._loaded_columns.items():
            df[k] = v
        for k, v in self._extra_columns.items():
            df[k] = v
        df = df[self._column_order]
        df.index = self._index
        self._pandas_cache = df
        return df
    def iter_chunks(self, batch_size=100_000, columns=None):
        """Yield pandas DataFrames in row-wise chunks, including extra columns."""
        pf = pq.ParquetFile(self.path)
        start = 0
        columns = columns or self._column_order
        parquet_columns = [c for c in columns if c in self._schema.names]
        extra_columns = [c for c in columns if c in self._extra_columns]
        for batch in pf.iter_batches(batch_size=batch_size, columns=parquet_columns):
            df = batch.to_pandas()
            stop = start + len(df)
            # Overlay in-memory overrides of file-backed columns, sliced to the chunk.
            for col in parquet_columns:
                if col in self._loaded_columns:
                    df[col] = pd.Series(self._loaded_columns[col][start:stop]).reset_index(drop=True)
            # Add extra (memory-only) columns, sliced to the current chunk.
            for col in extra_columns:
                df[col] = pd.Series(self._extra_columns[col][start:stop]).reset_index(drop=True)
            # Reorder columns and set the index to the corresponding slice.
            df = df[columns]
            df.index = self._index[start:stop]
            start = stop
            yield df
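    # Streaming sketch (batch size and the process() callback are illustrative):
    #
    #     for chunk in ldf.iter_chunks(batch_size=50_000):
    #         process(chunk)           # each chunk is a regular pandas DataFrame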
    def _invalidate_cache(self):
        self._pandas_cache = None
    def __getattr__(self, name):
        # Delegate unknown attributes to pandas; this materializes the full
        # frame, so delegated methods are not lazy.
        if hasattr(pd.DataFrame, name):
            return getattr(self.to_pandas(), name)
        raise AttributeError(f"'LazyParquetDataFrame' object has no attribute '{name}'")
    def __getitem__(self, key):
        if key in self._loaded_columns:
            return self._loaded_columns[key]
        elif key in self._schema.names:
            col = pq.read_table(self.path, columns=[key]).to_pandas()[key]
            # If the column is empty, take the dtype from the schema,
            # defaulting to float64 for null-typed fields.
            if col.empty:
                field_type = self._schema.field(key).type
                if pa.types.is_null(field_type):
                    dtype = "float64"
                else:
                    dtype = field_type.to_pandas_dtype()
                col = pd.Series([], dtype=dtype, name=key)
            self._loaded_columns[key] = col
            return col
        elif key in self._extra_columns:
            return self._extra_columns[key]
        else:
            raise KeyError(f"Column '{key}' not found.")
    def __setitem__(self, key, value):
        # Overrides of file-backed columns go to _loaded_columns; brand-new
        # columns go to _extra_columns until save() writes them out.
        if key in self._schema.names or key in self._loaded_columns:
            self._loaded_columns[key] = value
        else:
            self._extra_columns[key] = value
        if key not in self._column_order:
            self._column_order.append(key)
        self._invalidate_cache()
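    # Assignment semantics sketch (column names are hypothetical):
    #
    #     ldf["price"] = ldf["price"] * 1.1   # overrides a file-backed column in memory
    #     ldf["discounted"] = True            # new memory-only column, appended last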
    def add_column(self, name: str, data, position=None):
        """Add a new column to the DataFrame."""
        if name in self._column_order:
            raise ValueError(f"Column '{name}' already exists.")
        self._extra_columns[name] = data
        if position is None:
            self._column_order.append(name)
        else:
            self._column_order.insert(position, name)
        self._invalidate_cache()
    def head(self, n: int = 5):
        """Return the first n rows of the DataFrame."""
        # Stream only the first batch rather than reading the whole file.
        for chunk in self.iter_chunks(batch_size=n):
            return chunk.head(n)
        return pd.DataFrame(columns=self._column_order)
def to_parquet(self, path: Path):
"""Write the DataFrame to a Parquet file."""
df = self.to_pandas()
df.to_parquet(path)
    def save(self, path=None, batch_size=100_000):
        """Save the DataFrame to Parquet in chunks to reduce memory usage."""
        target = Path(path or self.path)
        # Stage the output in a temporary file: the default target is the
        # source file itself, which is still being streamed from.
        tmp = target.with_suffix(target.suffix + ".tmp")
        writer = None
        try:
            for chunk in self.iter_chunks(batch_size=batch_size):
                table = pa.Table.from_pandas(chunk)
                if writer is None:
                    writer = pq.ParquetWriter(tmp, table.schema)
                writer.write_table(table)
        finally:
            if writer is not None:
                writer.close()
        if writer is None:
            # No batches were yielded (empty file); write the frame directly.
            pq.write_table(pa.Table.from_pandas(self.to_pandas()), tmp)
        tmp.replace(target)
        self._invalidate_cache()
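    # Persisting sketch (paths are illustrative):
    #
    #     ldf.save("events_enriched.parquet")   # file columns + in-memory columns
    #     ldf.save()                            # overwrites the source file in place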
    def _update_from_pandas(self, df):
        """Update the internal state from a pandas DataFrame."""
        for col in df.columns:
            if col in self._schema.names:
                self._loaded_columns[col] = df[col]
            else:
                self._extra_columns[col] = df[col]
        self._column_order = list(df.columns)
        self._index = df.index
        # Cache the materialized frame: row-level edits (e.g. dropped rows)
        # cannot be reconstructed from the Parquet file alone.
        self._pandas_cache = df
@property
def loc(self):
return LazyLocIndexer(self)
@property
def index(self):
return self._index
@property
def columns(self):
return self._column_order
@property
def shape(self):
return len(self._index), len(self._column_order)
    @property
    def dtypes(self):
        dtypes = {}
        for name in self._schema.names:
            field = self._schema.field(name)
            # Null-typed fields carry no values; pandas reads them as float64.
            if pa.types.is_null(field.type):
                dtypes[name] = "float64"
                continue
            try:
                dtype = field.type.to_pandas_dtype()
            except Exception:
                dtype = "object"
            # Map nullable integer/float fields to pandas extension dtypes.
            if field.nullable:
                if pd.api.types.is_unsigned_integer_dtype(dtype):
                    dtypes[name] = f"UInt{np.dtype(dtype).itemsize * 8}"
                    continue
                if pd.api.types.is_integer_dtype(dtype):
                    dtypes[name] = f"Int{np.dtype(dtype).itemsize * 8}"
                    continue
                if pd.api.types.is_float_dtype(dtype):
                    dtypes[name] = f"Float{np.dtype(dtype).itemsize * 8}"
                    continue
            dtypes[name] = pd.Series([], dtype=dtype, name=name).dtype.name
        for name, col in self._extra_columns.items():
            dtypes[name] = pd.Series(col).dtype.name
        return pd.Series(dtypes)
def assign(self, **kwargs):
"""Assign new columns to the DataFrame."""
df = self.to_pandas().assign(**kwargs)
new_df = LazyParquetDataFrame(self.path)
new_df._update_from_pandas(df)
return new_df
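    # assign() sketch: the result is a new LazyParquetDataFrame backed by the
    # same file; assigned columns exist only in memory until save() is called.
    # (Column names below are hypothetical.)
    #
    #     enriched = ldf.assign(total=lambda d: d["a"] + d["b"])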
def insert(self, loc, column, value, allow_duplicates=False):
"""Insert a new column at a specific location."""
if column in self._column_order and not allow_duplicates:
raise ValueError(f"Column '{column}' already exists.")
self._extra_columns[column] = value
self._column_order.insert(loc, column)
self._invalidate_cache()
    def drop(self, labels=None, axis=0, index=None, columns=None, level=None, inplace=False, errors='raise'):
        """Drop specified labels from the DataFrame."""
        df = self.to_pandas().drop(labels=labels, axis=axis, index=index, columns=columns, level=level,
                                   inplace=False, errors=errors)
        if inplace:
            # _update_from_pandas installs the new state and cache; do not invalidate it here.
            self._update_from_pandas(df)
            return None
        new_df = LazyParquetDataFrame(self.path)
        new_df._update_from_pandas(df)
        return new_df
    def rename(self, mapper=None, index=None, columns=None, axis=None, copy=True, inplace=False, level=None,
               errors='ignore'):
        """Rename the columns or index of the DataFrame."""
        df = self.to_pandas().rename(mapper=mapper, index=index, columns=columns, axis=axis, copy=copy,
                                     inplace=False, level=level, errors=errors)
        if inplace:
            self._update_from_pandas(df)
            return None
        new_df = LazyParquetDataFrame(self.path)
        new_df._update_from_pandas(df)
        return new_df
def __len__(self):
return len(self.to_pandas())
def __repr__(self):
return repr(self.to_pandas())
def __str__(self):
return str(self.to_pandas())
def __iter__(self):
return iter(self.to_pandas())
def __contains__(self, item):
return item in self._column_order
    def __eq__(self, other):
        # Elementwise comparison, consistent with the other comparison operators.
        return self.to_pandas() == other
    def __ne__(self, other):
        return self.to_pandas() != other
def __add__(self, other):
return self.to_pandas() + other
def __sub__(self, other):
return self.to_pandas() - other
def __mul__(self, other):
return self.to_pandas() * other
def __truediv__(self, other):
return self.to_pandas() / other
def __floordiv__(self, other):
return self.to_pandas() // other
def __mod__(self, other):
return self.to_pandas() % other
def __pow__(self, other):
return self.to_pandas() ** other
def __and__(self, other):
return self.to_pandas() & other
def __or__(self, other):
return self.to_pandas() | other
def __xor__(self, other):
return self.to_pandas() ^ other
def __lt__(self, other):
return self.to_pandas() < other
def __le__(self, other):
return self.to_pandas() <= other
def __gt__(self, other):
return self.to_pandas() > other
def __ge__(self, other):
return self.to_pandas() >= other
def __neg__(self):
return -self.to_pandas()
def __abs__(self):
return abs(self.to_pandas())
def __invert__(self):
return ~self.to_pandas()
def __round__(self, n=None):
return self.to_pandas().round(n)
    def __floor__(self):
        # pandas DataFrames have no .floor()/.ceil()/.trunc(); use numpy ufuncs.
        return np.floor(self.to_pandas())
    def __ceil__(self):
        return np.ceil(self.to_pandas())
    def __trunc__(self):
        return np.trunc(self.to_pandas())
def __radd__(self, other):
return other + self.to_pandas()
def __rsub__(self, other):
return other - self.to_pandas()
def __rmul__(self, other):
return other * self.to_pandas()
def __rtruediv__(self, other):
return other / self.to_pandas()
def __rfloordiv__(self, other):
return other // self.to_pandas()
def __rmod__(self, other):
return other % self.to_pandas()
def __rpow__(self, other):
return other ** self.to_pandas()
def __rand__(self, other):
return other & self.to_pandas()
def __ror__(self, other):
return other | self.to_pandas()
def __rxor__(self, other):
return other ^ self.to_pandas()
def __iadd__(self, other):
self._update_from_pandas(self.to_pandas() + other)
return self
def __isub__(self, other):
self._update_from_pandas(self.to_pandas() - other)
return self
def __imul__(self, other):
self._update_from_pandas(self.to_pandas() * other)
return self
def __itruediv__(self, other):
self._update_from_pandas(self.to_pandas() / other)
return self
def __ifloordiv__(self, other):
self._update_from_pandas(self.to_pandas() // other)
return self
def __imod__(self, other):
self._update_from_pandas(self.to_pandas() % other)
return self
def __ipow__(self, other):
self._update_from_pandas(self.to_pandas() ** other)
return self
def __iand__(self, other):
self._update_from_pandas(self.to_pandas() & other)
return self
def __ior__(self, other):
self._update_from_pandas(self.to_pandas() | other)
return self
def __ixor__(self, other):
self._update_from_pandas(self.to_pandas() ^ other)
return self
    def __ilshift__(self, other):
        # pandas defines no << / >> operators; apply the numpy ufuncs elementwise.
        self._update_from_pandas(np.left_shift(self.to_pandas(), other))
        return self
    def __irshift__(self, other):
        self._update_from_pandas(np.right_shift(self.to_pandas(), other))
        return self
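    # In-place operator sketch: each one materializes the frame, applies the
    # pandas operation, and re-caches the result, e.g.
    #
    #     ldf += 1        # equivalent to ldf._update_from_pandas(ldf.to_pandas() + 1)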