Source code for df_eval.pandera

"""Pandera integration helpers for df-eval.

This module keeps Pandera support optional and layered on top of the core
Engine API by translating Pandera column metadata into a df-eval schema map.
"""

from __future__ import annotations

import copy
from collections.abc import Mapping
from pathlib import Path
from typing import Any

import pandas as pd

from df_eval.engine import Engine
from df_eval.expr import Expression
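
# A minimal sketch of the metadata convention this module consumes; the
# column names and the expression are illustrative, not part of the API:
#
#     import pandera as pa
#
#     schema = pa.DataFrameSchema({
#         "price": pa.Column(float),
#         "qty": pa.Column(int),
#         "total": pa.Column(
#             float,
#             metadata={"df-eval": {"expr": "price * qty"}},
#         ),
#     })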


def _import_pandera() -> Any:
    """Import pandera lazily so df-eval works without the optional dependency."""
    try:
        import pandera as pa
    except ImportError as exc:  # pragma: no cover - environment dependent
        raise ImportError(
            "Pandera support requires the optional dependency. "
            "Install with: pip install 'df-eval[pandera]'"
        ) from exc
    return pa


def _is_schema_model_subclass(schema: Any, pa: Any) -> bool:
    """Return True when schema is a Pandera model class with to_schema()."""
    if not isinstance(schema, type):
        return False

    if callable(getattr(schema, "to_schema", None)):
        return True

    model_types = tuple(
        candidate
        for candidate in (
            getattr(pa, "SchemaModel", None),
            getattr(pa, "DataFrameModel", None),
        )
        if isinstance(candidate, type)
    )
    return bool(model_types) and issubclass(schema, model_types)


def _to_dataframe_schema(schema: Any, pa: Any) -> Any:
    """Normalize SchemaModel/DataFrameModel classes and DataFrameSchema objects."""
    if _is_schema_model_subclass(schema, pa):
        return schema.to_schema()

    has_schema_shape = (
        hasattr(schema, "columns")
        and callable(getattr(schema, "validate", None))
    )
    if has_schema_shape:
        return schema

    raise TypeError(
        "schema must be a pandera SchemaModel/DataFrameModel subclass "
        "or a pandera DataFrameSchema"
    )
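
# For instance, both forms below normalize to an equivalent DataFrameSchema
# (the model class and its field are illustrative):
#
#     class Products(pa.DataFrameModel):
#         price: float
#
#     _to_dataframe_schema(Products, pa)              # -> Products.to_schema()
#     _to_dataframe_schema(Products.to_schema(), pa)  # returned unchanged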


def _build_subset_schema(df_schema: Any, excluded_columns: set[str]) -> Any:
    """Create a schema with selected columns removed for pre-validation."""
    if not excluded_columns:
        return df_schema

    try:
        return df_schema.remove_columns(list(excluded_columns))
    except AttributeError:
        columns = {
            name: column
            for name, column in df_schema.columns.items()
            if name not in excluded_columns
        }
        kwargs = {
            "index": getattr(df_schema, "index", None),
            "dtype": getattr(df_schema, "dtype", None),
            "coerce": getattr(df_schema, "coerce", False),
            "strict": getattr(df_schema, "strict", False),
            "name": getattr(df_schema, "name", None),
            "ordered": getattr(df_schema, "ordered", False),
            "unique": getattr(df_schema, "unique", None),
            "checks": getattr(df_schema, "checks", None),
        }
        kwargs = {key: value for key, value in kwargs.items() if value is not None}
        return df_schema.__class__(columns=columns, **kwargs)
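
# For example, if "total" is a derived column, pre-validation can run against
# a schema without it (``remove_columns`` is the fast path on recent Pandera):
#
#     base_schema = _build_subset_schema(df_schema, {"total"})
#     base_schema.validate(df_without_total)  # hypothetical input frame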


def _validate_with_coerce(df_schema: Any, df: pd.DataFrame, coerce: bool) -> pd.DataFrame:
    """Validate across Pandera versions with/without validate(..., coerce=...)."""
    try:
        return df_schema.validate(df, coerce=coerce)
    except TypeError as exc:
        if "coerce" not in str(exc):
            raise

    # Newer Pandera versions removed the validate(..., coerce=...) kwarg.
    schema_copy = copy.deepcopy(df_schema)
    schema_copy.coerce = coerce
    for column in schema_copy.columns.values():
        column.coerce = coerce
    return schema_copy.validate(df)


def df_eval_schema_from_pandera(
    schema: Any,
    meta_key: str = "df-eval",
    expr_key: str = "expr",
) -> dict[str, str]:
    """Build a df-eval schema mapping from Pandera per-column metadata."""
    pa = _import_pandera()
    df_schema = _to_dataframe_schema(schema, pa)

    expr_map: dict[str, str] = {}
    for col_name, col_schema in df_schema.columns.items():
        metadata = col_schema.metadata or {}
        if not isinstance(metadata, Mapping):
            raise TypeError(f"metadata for column '{col_name}' must be a mapping")

        section = metadata.get(meta_key)
        if section is None:
            continue
        if not isinstance(section, Mapping):
            raise TypeError(
                f"metadata['{meta_key}'] for column '{col_name}' must be a mapping"
            )

        expr = section.get(expr_key)
        if expr is None:
            continue
        if not isinstance(expr, str):
            raise TypeError(
                f"metadata['{meta_key}']['{expr_key}'] for column '{col_name}' "
                "must be a string"
            )

        expr_map[col_name] = expr

    return expr_map
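

# Given the sketch schema defined after the imports above ("price", "qty",
# derived "total"), this would return:
#
#     df_eval_schema_from_pandera(schema)
#     # -> {"total": "price * qty"}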


def df_eval_operations_from_pandera(
    schema: Any,
    meta_key: str = "df-eval",
) -> dict[str, dict[str, Any]]:
    """Build a rich df-eval operations mapping from Pandera column metadata.

    Each column may define one of the following under ``metadata[meta_key]``::

        {"expr": "a + b"}
        {"lookup": {"resolver": "prices", "key": "product"}}
        {"function": {"name": "churn_model_v1", "inputs": ["age"]}}

    The returned mapping has the shape::

        {
            "column_name": {
                "kind": "expr" | "lookup" | "function",
                "expr": str | None,
                "lookup": dict | None,
                "function": dict | None,
            },
        }
    """
    pa = _import_pandera()
    df_schema = _to_dataframe_schema(schema, pa)

    ops: dict[str, dict[str, Any]] = {}
    for col_name, col_schema in df_schema.columns.items():
        metadata = col_schema.metadata or {}
        # Unlike df_eval_schema_from_pandera, non-mapping metadata is skipped
        # here rather than raising.
        if not isinstance(metadata, Mapping):
            continue

        section = metadata.get(meta_key)
        if section is None:
            continue
        if not isinstance(section, Mapping):
            raise TypeError(
                f"metadata['{meta_key}'] for column '{col_name}' must be a mapping"
            )

        # The if/elif chain gives ``expr`` precedence over ``lookup`` over
        # ``function`` when a section defines more than one key.
        if "expr" in section:
            expr = section["expr"]
            if not isinstance(expr, str):
                raise TypeError(
                    f"metadata['{meta_key}']['expr'] for column '{col_name}' "
                    "must be a string"
                )
            ops[col_name] = {
                "kind": "expr",
                "expr": expr,
                "lookup": None,
                "function": None,
            }
        elif "lookup" in section:
            lookup_spec = section["lookup"]
            if not isinstance(lookup_spec, Mapping):
                raise TypeError(
                    f"metadata['{meta_key}']['lookup'] for column '{col_name}' "
                    "must be a mapping"
                )
            ops[col_name] = {
                "kind": "lookup",
                "expr": None,
                "lookup": dict(lookup_spec),
                "function": None,
            }
        elif "function" in section:
            function_spec = section["function"]
            if not isinstance(function_spec, Mapping):
                raise TypeError(
                    f"metadata['{meta_key}']['function'] for column '{col_name}' "
                    "must be a mapping"
                )
            ops[col_name] = {
                "kind": "function",
                "expr": None,
                "lookup": None,
                "function": dict(function_spec),
            }

    return ops
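

# Given a lookup-style column such as (resolver/key names are illustrative):
#
#     "unit_price": pa.Column(
#         float,
#         metadata={"df-eval": {"lookup": {"resolver": "prices", "key": "product"}}},
#     )
#
# the returned mapping would contain:
#
#     {
#         "unit_price": {
#             "kind": "lookup",
#             "expr": None,
#             "lookup": {"resolver": "prices", "key": "product"},
#             "function": None,
#         },
#     }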


def load_pandera_schema_yaml(source: str | Path) -> Any:
    """Load a Pandera DataFrameSchema from YAML, preserving column metadata.

    This is a thin, public wrapper around df-eval's temporary fork of
    ``pandera.io.pandas_io``. It exists to work around
    https://github.com/unionai-oss/pandera/issues/1301, where column
    ``metadata`` is not round-tripped by Pandera's YAML/JSON IO helpers.

    Args:
        source: Path to a YAML schema file or a YAML string.

    Returns:
        A Pandera :class:`~pandera.api.pandas.container.DataFrameSchema`.
    """
    _import_pandera()  # ensure the optional dependency is present with a clear error
    from df_eval.utils import pandera_io_compat as _pa_io

    return _pa_io.from_yaml(source)


def dump_pandera_schema_yaml(schema: Any, stream: str | Path | None = None) -> str | None:
    """Dump a Pandera DataFrameSchema to YAML, preserving column metadata.

    This uses df-eval's temporary fork of ``pandera.io.pandas_io`` so that
    column ``metadata`` survives a full IO round-trip. Once Pandera fixes
    https://github.com/unionai-oss/pandera/issues/1301, this helper may be
    simplified to delegate directly to Pandera's built-in IO.

    Args:
        schema: A Pandera SchemaModel/DataFrameModel class or DataFrameSchema.
        stream: Optional path or file-like to write to. If ``None``, the YAML
            representation is returned as a string.

    Returns:
        The YAML string if ``stream`` is ``None``, otherwise ``None``.
    """
    _import_pandera()
    from df_eval.utils import pandera_io_compat as _pa_io

    return _pa_io.to_yaml(schema, stream=stream)
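

# Sketch of a YAML round-trip that preserves column metadata (the path is
# hypothetical; "total" refers to the illustrative schema above):
#
#     dump_pandera_schema_yaml(schema, "schema.yaml")
#     loaded = load_pandera_schema_yaml("schema.yaml")
#     assert loaded.columns["total"].metadata == {"df-eval": {"expr": "price * qty"}}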


def load_pandera_schema_json(source: str | Path) -> Any:
    """Load a Pandera DataFrameSchema from JSON, preserving column metadata.

    This mirrors :func:`load_pandera_schema_yaml` but for JSON input.
    """
    _import_pandera()
    from df_eval.utils import pandera_io_compat as _pa_io

    return _pa_io.from_json(source)


def dump_pandera_schema_json(
    schema: Any,
    target: str | Path | None = None,
    **kwargs: Any,
) -> str | None:
    """Dump a Pandera DataFrameSchema to JSON, preserving column metadata.

    Args:
        schema: A Pandera SchemaModel/DataFrameModel class or DataFrameSchema.
        target: Optional path or file-like to write to. If ``None``, the JSON
            representation is returned as a string.
        **kwargs: Extra keyword arguments forwarded to :func:`json.dump`.

    Returns:
        The JSON string if ``target`` is ``None``, otherwise ``None``.
    """
    _import_pandera()
    from df_eval.utils import pandera_io_compat as _pa_io

    return _pa_io.to_json(schema, target=target, **kwargs)
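

# JSON mirror of the YAML round-trip above (path is hypothetical; extra
# kwargs such as ``indent`` are forwarded to ``json.dump``):
#
#     dump_pandera_schema_json(schema, "schema.json", indent=2)
#     loaded = load_pandera_schema_json("schema.json")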


def _plan_pandera_parquet_projection(
    schema: Any,
    *,
    meta_key: str,
    expr_key: str,
) -> tuple[dict[str, str], list[str], list[str]]:
    """Return expression map, input projection, and schema-ordered outputs."""
    pa = _import_pandera()
    df_schema = _to_dataframe_schema(schema, pa)

    output_columns = list(df_schema.columns)
    expr_map = df_eval_schema_from_pandera(df_schema, meta_key=meta_key, expr_key=expr_key)
    derived_columns = set(expr_map)

    # Non-derived schema columns must come from the input file.
    required_input_columns = {
        column_name
        for column_name in output_columns
        if column_name not in derived_columns
    }
    # Derived expressions may also depend on schema columns; include those too.
    for expr in expr_map.values():
        dependencies = Expression(expr).dependencies
        required_input_columns.update(
            dependency
            for dependency in dependencies
            if dependency in df_schema.columns and dependency not in derived_columns
        )

    input_columns = [
        column_name
        for column_name in output_columns
        if column_name in required_input_columns
    ]
    return expr_map, input_columns, output_columns
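

# For the illustrative schema above ("price", "qty", derived "total"):
#
#     expr_map       == {"total": "price * qty"}
#     input_columns  == ["price", "qty"]            # projected Parquet scan
#     output_columns == ["price", "qty", "total"]   # Pandera schema order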


def apply_pandera_schema(
    df: pd.DataFrame,
    schema: Any,
    *,
    meta_key: str = "df-eval",
    coerce: bool = True,
    validate: bool = True,
    validate_post: bool = True,
    engine: Engine | None = None,
    error_on_overwrite: bool = True,
) -> pd.DataFrame:
    """Validate with Pandera, apply df-eval operations, then optionally revalidate.

    Columns that define df-eval metadata under ``meta_key`` are considered
    derived and are excluded from pre-validation. This allows input frames
    that do not yet include the derived columns.

    The df-eval metadata for each column may currently define exactly one of
    the following keys::

        {"expr": "a + b"}
        {"lookup": {"resolver": "prices", "key": "product"}}
        {"function": {"name": "my_fn", "inputs": ["a"], "outputs": ["y"]}}

    These are translated into an operations mapping consumed by
    :meth:`df_eval.engine.Engine.apply_operations`.
    """
    pa = _import_pandera()
    df_schema = _to_dataframe_schema(schema, pa)

    # Build a rich operations map (expr, lookup, function) from column metadata.
    operations = df_eval_operations_from_pandera(df_schema, meta_key=meta_key)
    derived_columns = set(operations)

    validated_df = df
    if validate:
        base_schema = _build_subset_schema(df_schema, derived_columns)
        validated_df = _validate_with_coerce(base_schema, df, coerce=coerce)

    # If there are no df-eval-driven columns, there is nothing for the engine
    # to do. Mirror previous behaviour and skip post-validation in this case.
    if not operations:
        return validated_df

    if error_on_overwrite:
        overlapping = derived_columns.intersection(validated_df.columns)
        if overlapping:
            overlap_text = ", ".join(sorted(overlapping))
            raise ValueError(
                "input DataFrame already contains derived columns marked by "
                f"Pandera metadata: {overlap_text}"
            )

    eval_engine = engine or Engine()
    result = eval_engine.apply_operations(validated_df, operations)

    if validate and validate_post:
        result = _validate_with_coerce(df_schema, result, coerce=coerce)
    return result
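

# Illustrative end-to-end call (the DataFrame contents are made up):
#
#     df = pd.DataFrame({"price": [2.0, 3.0], "qty": [1, 4]})
#     out = apply_pandera_schema(df, schema)
#     # "out" now carries a validated "total" column equal to price * qty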


def apply_pandera_schema_parquet_to_parquet(
    input_path: str | Path,
    output_path: str | Path,
    schema: Any,
    *,
    meta_key: str = "df-eval",
    expr_key: str = "expr",
    engine: Engine | None = None,
    chunk_size: int = 100_000,
    compression: str = "snappy",
) -> Path:
    """Apply a Pandera-driven schema to Parquet input and write Parquet output.

    The input scan is projected to only the required source columns, and the
    output columns are restricted to the Pandera schema order.

    Args:
        input_path: Source Parquet file or directory-backed dataset.
        output_path: Destination Parquet file.
        schema: Pandera SchemaModel/DataFrameModel class or DataFrameSchema.
        meta_key: Metadata section containing df-eval expressions.
        expr_key: Metadata key containing the expression text.
        engine: Optional Engine instance.
        chunk_size: Maximum rows processed per chunk.
        compression: Parquet compression codec used for output.

    Returns:
        The normalized output path.
    """
    expr_map, input_columns, output_columns = _plan_pandera_parquet_projection(
        schema,
        meta_key=meta_key,
        expr_key=expr_key,
    )
    eval_engine = engine or Engine()
    return eval_engine.apply_schema_parquet_to_parquet(
        input_path,
        output_path,
        expr_map,
        chunk_size=chunk_size,
        input_columns=input_columns,
        output_columns=output_columns,
        compression=compression,
    )
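

# Illustrative usage (the paths are hypothetical):
#
#     apply_pandera_schema_parquet_to_parquet(
#         "events.parquet",
#         "events_enriched.parquet",
#         schema,
#         chunk_size=50_000,
#     )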


__all__ = [
    "df_eval_schema_from_pandera",
    "apply_pandera_schema",
    "apply_pandera_schema_parquet_to_parquet",
    "df_eval_operations_from_pandera",
    "load_pandera_schema_yaml",
    "dump_pandera_schema_yaml",
    "load_pandera_schema_json",
    "dump_pandera_schema_json",
]