Lookup and Function Pipeline via Pandera
This example shows how to drive a small pipeline entirely from Pandera metadata, using df-eval for both lookup-style operations and generic pipeline functions.
Pipeline:
Define a Pandera schema with df-eval metadata
Register a resolver for prices and a pipeline function for discounts
Use apply_pandera_schema to validate, derive, and re-validate
import pandas as pd
import pandera.pandas as pa
from df_eval import Engine, DictResolver
from df_eval.pandera import apply_pandera_schema
Define Pandera Schema with df-eval Metadata
price is driven by a lookup spec that references a named resolver.
line_total is computed from the expression price * quantity.
discounted_total is driven by a generic function step that calls a
registered pipeline function.
schema = pa.DataFrameSchema(
    {
        "product": pa.Column(str),
        "quantity": pa.Column(int),
        "price": pa.Column(
            float,
            metadata={
                "df-eval": {
                    "lookup": {
                        "resolver": "prices",  # name of registered resolver
                        "key": "product",
                        "on_missing": "null",
                    }
                }
            },
        ),
        "line_total": pa.Column(
            float,
            metadata={"df-eval": {"expr": "price * quantity"}},
        ),
        "discounted_total": pa.Column(
            float,
            metadata={
                "df-eval": {
                    "function": {
                        "name": "apply_discount",  # registered pipeline function
                        "inputs": ["line_total"],
                        "outputs": ["discounted_total"],
                        "params": {"rate": 0.1},  # 10% discount
                    }
                }
            },
        ),
    }
)
schema
<Schema DataFrameSchema(columns={'product': <Schema Column(name=product, type=DataType(str))>, 'quantity': <Schema Column(name=quantity, type=DataType(int64))>, 'price': <Schema Column(name=price, type=DataType(float64))>, 'line_total': <Schema Column(name=line_total, type=DataType(float64))>, 'discounted_total': <Schema Column(name=discounted_total, type=DataType(float64))>}, checks=[], parsers=[], index=None, dtype=None, coerce=False, strict=False, name=None, ordered=False, unique=None, report_duplicates=all, unique_column_names=False, add_missing_columns=False, title=None, description=None, metadata=None, drop_invalid_rows=False)>
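To make the lookup and expression metadata more concrete, here is a minimal sketch of roughly equivalent operations in plain pandas. It does not use df-eval at all. The price table and the "kiwi" row are hypothetical, and treating "on_missing": "null" as "leave a missing value" is an assumption made for illustration, not a statement of how df-eval implements it.

```python
import pandas as pd

# Hypothetical price table; "kiwi" is deliberately absent.
prices = {"apple": 1.50, "banana": 0.75}

df = pd.DataFrame(
    {
        "product": ["apple", "kiwi", "banana"],
        "quantity": [10, 5, 20],
    }
)

# Look up each product in the table. Keys without a match become NaN,
# which is the behavior assumed here for on_missing="null".
df["price"] = df["product"].map(prices)

# An expression such as "price * quantity" then propagates the missing value.
df["line_total"] = df["price"] * df["quantity"]

print(df)
```

In this sketch the unmatched row keeps a missing price and a missing line_total, so downstream validation can decide whether nulls are acceptable.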
Build Input Data
df = pd.DataFrame(
    {
        "product": ["apple", "banana", "orange"],
        "quantity": [10, 20, 15],
    }
)
df
Register Resolver and Pipeline Function
The resolver provides prices by product, while the pipeline function
applies a simple percentage discount to line_total.
price_resolver = DictResolver(
    {
        "apple": 1.50,
        "banana": 0.75,
        "orange": 1.25,
    }
)
def apply_discount(df_slice: pd.DataFrame, *, rate: float) -> pd.Series:
    """Simple example pipeline function applying a discount to a column.

    Expects a single input column ``line_total`` and returns the
    discounted total as a Series.
    """
    return df_slice["line_total"] * (1 - rate)
engine = Engine()
engine.register_resolver("prices", price_resolver)
engine.register_pipeline_function("apply_discount", apply_discount)
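Because the pipeline function is ordinary Python, it can be exercised on its own before it is registered. The sketch below calls it directly on a small hypothetical frame. Passing the input columns as a DataFrame slice and the params as keyword arguments mirrors the function's signature; whether df-eval calls it in exactly this way is an assumption.

```python
import pandas as pd


def apply_discount(df_slice: pd.DataFrame, *, rate: float) -> pd.Series:
    """Return ``line_total`` reduced by the fractional ``rate``."""
    return df_slice["line_total"] * (1 - rate)


# Hypothetical sample containing only the declared input column.
sample = pd.DataFrame({"line_total": [100.0, 40.0]})

# rate must be passed by keyword because of the bare * in the signature.
discounted = apply_discount(sample, rate=0.25)

print(discounted.tolist())  # [75.0, 30.0]
```

A quick check like this catches column-name or parameter mistakes early, before they surface inside the schema-driven pipeline.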
Apply Pandera Schema
apply_pandera_schema will:
Pre-validate input columns (excluding derived ones)
Apply df-eval-derived columns (including lookup + function steps)
Optionally re-validate the full schema
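The three steps above can be pictured with a rough plain-pandas sketch. This is not df-eval's implementation and does not call apply_pandera_schema. The helpers require_columns and run_pipeline are hypothetical names introduced here, and the column-presence check is only a stand-in for real schema validation.

```python
import pandas as pd

PRICES = {"apple": 1.50, "banana": 0.75, "orange": 1.25}
INPUT_COLUMNS = ["product", "quantity"]
DERIVED_COLUMNS = ["price", "line_total", "discounted_total"]


def require_columns(df: pd.DataFrame, columns: list[str]) -> None:
    """Raise if any expected column is absent (stand-in for validation)."""
    missing = [c for c in columns if c not in df.columns]
    if missing:
        raise ValueError(f"missing columns: {missing}")


def run_pipeline(df: pd.DataFrame, rate: float = 0.1) -> pd.DataFrame:
    # 1. Pre-validate only the input columns; derived ones do not exist yet.
    require_columns(df, INPUT_COLUMNS)

    # 2. Derive columns in dependency order: lookup, expression, function.
    out = df.copy()
    out["price"] = out["product"].map(PRICES)
    out["line_total"] = out["price"] * out["quantity"]
    out["discounted_total"] = out["line_total"] * (1 - rate)

    # 3. Re-validate the full set of columns after derivation.
    require_columns(out, INPUT_COLUMNS + DERIVED_COLUMNS)
    return out


df = pd.DataFrame(
    {
        "product": ["apple", "banana", "orange"],
        "quantity": [10, 20, 15],
    }
)
result = run_pipeline(df)
print(result)
```

Validating inputs first means a missing or misnamed source column fails early with a clear message, rather than as an error partway through derivation.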