Lookups#
This guide covers df-eval’s lookup functionality for integrating external data sources.
Overview#
Lookups allow you to resolve values from external data sources during expression evaluation. This is useful for:
Mapping codes to descriptions
Looking up prices or rates
Resolving configuration values
Fetching data from databases or APIs
Implementing business rules from external sources
Basic Lookup Usage#
The lookup function resolves values using a resolver:
import pandas as pd
from df_eval import lookup, DictResolver
# Create a mapping
mapping = {
"apple": 1.50,
"banana": 0.75,
"orange": 1.25
}
# Create a resolver
resolver = DictResolver(mapping, default=0.0)
# Lookup values
products = pd.Series(["apple", "banana", "cherry"])
prices = lookup(products, resolver, on_missing="null")
print(prices) # [1.50, 0.75, None]
For a complete, runnable walkthrough that combines lookups with expression evaluation, see the example Engine Lookups with Resolvers.
Resolver Types#
DictResolver#
The simplest resolver for in-memory mappings:
from df_eval import DictResolver
# Basic dictionary resolver
mapping = {
"USD": 1.0,
"EUR": 0.85,
"GBP": 0.73,
"JPY": 110.0
}
resolver = DictResolver(mapping, default=1.0)
currencies = pd.Series(["USD", "EUR", "GBP", "CAD"])
rates = lookup(currencies, resolver, on_missing="default")
print(rates) # [1.0, 0.85, 0.73, 1.0]
FileResolver#
Load mappings from CSV, JSON, or Excel files:
from df_eval import FileResolver
# Lookup from CSV file
# prices.csv:
# product,price
# apple,1.50
# banana,0.75
# orange,1.25
resolver = FileResolver(
"prices.csv",
key_column="product",
value_column="price"
)
products = pd.Series(["apple", "banana", "cherry"])
prices = lookup(products, resolver, on_missing="null")
Database Resolvers#
Create custom resolvers for database lookups:
from df_eval import BaseResolver
class DatabaseResolver(BaseResolver):
def __init__(self, connection_string, table, key_col, value_col):
self.conn = create_connection(connection_string)  # placeholder: substitute your DB driver's connect(), e.g. sqlite3.connect
self.table = table
self.key_col = key_col
self.value_col = value_col
def resolve(self, keys):
"""Resolve keys to values using database query."""
# Values are passed as query parameters (?); table/column names are
# interpolated directly into the SQL, so they must come from trusted
# configuration — never from user input.
placeholders = ','.join(['?'] * len(keys))
query = f"""
SELECT {self.key_col}, {self.value_col}
FROM {self.table}
WHERE {self.key_col} IN ({placeholders})
"""
# Execute query
results = pd.read_sql(query, self.conn, params=keys.tolist())
# Map results back to keys
mapping = dict(zip(results[self.key_col], results[self.value_col]))
return keys.map(mapping)
# Use the custom resolver
resolver = DatabaseResolver(
"postgresql://localhost/mydb",
table="products",
key_col="product_id",
value_col="product_name"
)
ids = pd.Series([101, 102, 103])
names = lookup(ids, resolver)
HTTP API Resolvers#
Fetch data from web APIs:
import requests
class APIResolver(BaseResolver):
def __init__(self, api_url, api_key=None):
self.api_url = api_url
self.api_key = api_key
def resolve(self, keys):
"""Resolve keys using HTTP API."""
headers = {}
if self.api_key:
headers['Authorization'] = f'Bearer {self.api_key}'
# Batch request
response = requests.post(
self.api_url,
json={'keys': keys.tolist()},
headers=headers
)
response.raise_for_status()
# Parse response
data = response.json()
mapping = {item['key']: item['value'] for item in data['results']}
return keys.map(mapping)
# Use the API resolver
resolver = APIResolver(
"https://api.example.com/lookup",
api_key="your-api-key"
)
codes = pd.Series(["ABC", "DEF", "GHI"])
values = lookup(codes, resolver)
Caching#
Improve performance by caching lookup results:
from df_eval import CachedResolver
# Wrap any resolver with caching
base_resolver = FileResolver("prices.csv", "product", "price")
cached_resolver = CachedResolver(base_resolver, ttl_seconds=300)
# First lookup hits the file
prices1 = lookup(products, cached_resolver)
# Second lookup uses cache (faster)
prices2 = lookup(products, cached_resolver)
# Cache expires after 5 minutes (300 seconds)
Cache Configuration#
# Configure cache behavior
cached_resolver = CachedResolver(
base_resolver,
ttl_seconds=600, # Cache for 10 minutes
max_size=10000, # Store up to 10,000 entries
refresh_on_hit=True # Reset TTL when cache is hit
)
Manual Cache Management#
# Clear the cache
cached_resolver.clear_cache()
# Check cache statistics
stats = cached_resolver.get_stats()
print(f"Cache hits: {stats['hits']}")
print(f"Cache misses: {stats['misses']}")
print(f"Cache size: {stats['size']}")
Missing Value Handling#
Control how missing values are handled:
# Return None for missing values
result = lookup(keys, resolver, on_missing="null")
# Use default value from resolver
result = lookup(keys, resolver, on_missing="default")
# Raise an error on missing values (see the Error Handling section below,
# which catches df_eval.LookupError for the same call)
try:
result = lookup(keys, resolver, on_missing="raise")
except KeyError as e:
print(f"Missing keys: {e}")
# Return the original key for missing values
result = lookup(keys, resolver, on_missing="key")
Using Lookups in Pipelines#
Integrate lookups into df-eval pipelines by combining resolvers with ordinary
df-eval expressions. Lookups themselves are performed in Python using the
helper function df_eval.lookup.lookup() or via Pandera metadata.
Basic engine integration:
import pandas as pd
from df_eval import Engine, DictResolver, lookup
# Setup resolver
price_resolver = DictResolver({
"apple": 1.50,
"banana": 0.75,
"orange": 1.25,
})
engine = Engine()
engine.register_resolver("prices", price_resolver)
df = pd.DataFrame({
"product": ["apple", "banana", "orange"],
"quantity": [10, 20, 15],
})
# Perform lookups explicitly in Python
prices = lookup(df["product"], price_resolver, on_missing="null")
df = df.assign(price=prices)
# Then use df-eval expressions for arithmetic/boolean logic
schema = {
"total": "price * quantity",
}
result = engine.apply_schema(df, schema)
print(result)
# product quantity price total
# 0 apple 10 1.50 15.00
# 1 banana 20 0.75 15.00
# 2 orange 15 1.25 18.75
See also the gallery example Engine Lookups with Resolvers for a slightly more complete pipeline using in-memory resolvers, and Lookup and Function Pipeline via Pandera for an end-to-end pipeline that stores lookup configuration in a Pandera schema.
Chaining Lookups#
Perform multiple lookups in sequence:
# Setup multiple resolvers
category_resolver = DictResolver({
"apple": "fruit",
"banana": "fruit",
"carrot": "vegetable"
})
tax_rate_resolver = DictResolver({
"fruit": 0.05,
"vegetable": 0.03,
"other": 0.08
}, default=0.08)  # default= is required for on_missing="default" below
engine.register_resolver("categories", category_resolver)
engine.register_resolver("tax_rates", tax_rate_resolver)
# First lookup: product -> category
categories = lookup(df["product"], category_resolver, on_missing="key")
df = df.assign(category=categories)
# Second lookup: category -> tax rate
tax_rates = lookup(df["category"], tax_rate_resolver, on_missing="default")
df = df.assign(tax_rate=tax_rates)
# Now use df-eval expressions to compute derived values
schema = {
"price_with_tax": "price * (1 + tax_rate)",
}
result = engine.apply_schema(df, schema)
For metadata-driven pipelines that keep lookup configuration inside a Pandera
schema, see df_eval.pandera and the
df_eval.docs.examples.lookup_pandera_pipeline example.
Batch Lookups#
Optimize lookups for large datasets:
# Resolvers should implement batch resolution
class BatchResolver(BaseResolver):
def resolve(self, keys):
"""Resolve all keys in a single batch operation."""
# Get unique keys to minimize lookups
unique_keys = keys.unique()
# Fetch all values at once (e.g., single database query)
mapping = self._batch_fetch(unique_keys)
# Map back to original keys
return keys.map(mapping)
def _batch_fetch(self, keys):
# Implement efficient batch fetching
pass
Error Handling#
Handle lookup errors gracefully:
from df_eval import lookup, LookupError
try:
result = lookup(keys, resolver, on_missing="raise")
except LookupError as e:
print(f"Lookup failed: {e}")
print(f"Missing keys: {e.missing_keys}")
# Fallback strategy
result = lookup(keys, resolver, on_missing="default")
Best Practices#
1. Use Caching for Expensive Lookups#
# Always wrap expensive resolvers (DB, API) with caching
expensive_resolver = DatabaseResolver(...)
cached = CachedResolver(expensive_resolver, ttl_seconds=300)
2. Batch Operations#
# Implement batch resolution for better performance
def resolve(self, keys):
# Fetch all keys at once, not one-by-one
unique_keys = keys.unique()
mapping = self._fetch_batch(unique_keys)
return keys.map(mapping)
3. Handle Missing Values#
# Always specify on_missing behavior
result = lookup(keys, resolver, on_missing="default")
4. Monitor Cache Performance#
# Check cache hit rate periodically
stats = cached_resolver.get_stats()
total = stats['hits'] + stats['misses']
hit_rate = stats['hits'] / total if total else 0.0
if hit_rate < 0.8:
print("Consider increasing cache size or TTL")
Next Steps#
Check the API Reference for complete documentation
Review Basic Usage for core concepts
Explore Advanced Usage for more features