Lookups#

This guide covers df-eval’s lookup functionality for integrating external data sources.

Overview#

Lookups allow you to resolve values from external data sources during expression evaluation. This is useful for:

  • Mapping codes to descriptions

  • Looking up prices or rates

  • Resolving configuration values

  • Fetching data from databases or APIs

  • Implementing business rules from external sources

Basic Lookup Usage#

The lookup function resolves values using a resolver:

import pandas as pd
from df_eval import lookup, DictResolver

# Create a mapping
mapping = {
    "apple": 1.50,
    "banana": 0.75,
    "orange": 1.25
}

# Create a resolver
resolver = DictResolver(mapping, default=0.0)

# Lookup values
products = pd.Series(["apple", "banana", "cherry"])
prices = lookup(products, resolver, on_missing="null")
print(prices)  # [1.50, 0.75, None]

For a complete, runnable walkthrough that combines lookups with expression evaluation, see the example Engine Lookups with Resolvers.

Resolver Types#

DictResolver#

The simplest resolver for in-memory mappings:

from df_eval import DictResolver

# Basic dictionary resolver
mapping = {
    "USD": 1.0,
    "EUR": 0.85,
    "GBP": 0.73,
    "JPY": 110.0
}

resolver = DictResolver(mapping, default=1.0)

currencies = pd.Series(["USD", "EUR", "GBP", "CAD"])
rates = lookup(currencies, resolver, on_missing="default")
print(rates)  # [1.0, 0.85, 0.73, 1.0]

FileResolver#

Load mappings from CSV, JSON, or Excel files:

from df_eval import FileResolver

# Lookup from CSV file
# prices.csv:
#   product,price
#   apple,1.50
#   banana,0.75
#   orange,1.25

resolver = FileResolver(
    "prices.csv",
    key_column="product",
    value_column="price"
)

products = pd.Series(["apple", "banana", "cherry"])
prices = lookup(products, resolver, on_missing="null")

Database Resolvers#

Create custom resolvers for database lookups:

from df_eval import BaseResolver

class DatabaseResolver(BaseResolver):
    """Resolve keys to values with a single batched database query.

    Values are always bound through query parameters. Identifier names
    (table and column names) cannot be parameterized in SQL, so they are
    validated up front to prevent SQL injection through configuration.
    """

    def __init__(self, connection_string, table, key_col, value_col):
        # Identifiers are interpolated into the query text below, so
        # reject anything that is not a plain Python-style identifier.
        for name in (table, key_col, value_col):
            if not isinstance(name, str) or not name.isidentifier():
                raise ValueError(f"Invalid SQL identifier: {name!r}")
        self.conn = create_connection(connection_string)
        self.table = table
        self.key_col = key_col
        self.value_col = value_col

    def resolve(self, keys):
        """Resolve *keys* (a pandas Series) via one parameterized query.

        Returns a Series aligned with *keys*; keys absent from the table
        map to NaN.
        """
        # "IN ()" is invalid SQL — short-circuit the empty case.
        if len(keys) == 0:
            return keys.map({})

        # Query each distinct key once; duplicates are re-expanded by map().
        unique_keys = keys.unique()

        # Values are parameterized; identifiers were validated in __init__.
        placeholders = ','.join(['?'] * len(unique_keys))
        query = f"""
            SELECT {self.key_col}, {self.value_col}
            FROM {self.table}
            WHERE {self.key_col} IN ({placeholders})
        """

        # Execute query
        results = pd.read_sql(query, self.conn, params=list(unique_keys))

        # Map results back onto the original key order/index.
        mapping = dict(zip(results[self.key_col], results[self.value_col]))
        return keys.map(mapping)

# Use the custom resolver
resolver = DatabaseResolver(
    "postgresql://localhost/mydb",
    table="products",
    key_col="product_id",
    value_col="product_name"
)

ids = pd.Series([101, 102, 103])
names = lookup(ids, resolver)

HTTP API Resolvers#

Fetch data from web APIs:

import requests

class APIResolver(BaseResolver):
    """Resolve keys by posting them in one batch to an HTTP API.

    Parameters
    ----------
    api_url : str
        Endpoint that accepts ``{"keys": [...]}`` and returns
        ``{"results": [{"key": ..., "value": ...}, ...]}``.
    api_key : str, optional
        Bearer token sent in the ``Authorization`` header when given.
    timeout : float, optional
        Seconds to wait for the API before failing (default 10.0).
        Without a timeout, a hung endpoint would block evaluation forever.
    """

    def __init__(self, api_url, api_key=None, timeout=10.0):
        self.api_url = api_url
        self.api_key = api_key
        self.timeout = timeout

    def resolve(self, keys):
        """Resolve *keys* (a pandas Series) with a single batched request.

        Raises ``requests.HTTPError`` on a non-2xx response and
        ``requests.Timeout`` if the API does not answer in time.
        Keys missing from the response map to NaN.
        """
        headers = {}
        if self.api_key:
            headers['Authorization'] = f'Bearer {self.api_key}'

        # One request for the whole batch, bounded by an explicit timeout.
        response = requests.post(
            self.api_url,
            json={'keys': keys.tolist()},
            headers=headers,
            timeout=self.timeout
        )
        response.raise_for_status()

        # Parse response and align values back onto the original Series.
        data = response.json()
        mapping = {item['key']: item['value'] for item in data['results']}
        return keys.map(mapping)

# Use the API resolver
resolver = APIResolver(
    "https://api.example.com/lookup",
    api_key="your-api-key"
)

codes = pd.Series(["ABC", "DEF", "GHI"])
values = lookup(codes, resolver)

Caching#

Improve performance by caching lookup results:

from df_eval import CachedResolver

# Wrap any resolver with caching
base_resolver = FileResolver("prices.csv", "product", "price")
cached_resolver = CachedResolver(base_resolver, ttl_seconds=300)

# First lookup hits the file
prices1 = lookup(products, cached_resolver)

# Second lookup uses cache (faster)
prices2 = lookup(products, cached_resolver)

# Cache expires after 5 minutes (300 seconds)

Cache Configuration#

# Configure cache behavior
cached_resolver = CachedResolver(
    base_resolver,
    ttl_seconds=600,        # Cache for 10 minutes
    max_size=10000,         # Store up to 10,000 entries
    refresh_on_hit=True     # Reset TTL when cache is hit
)

Manual Cache Management#

# Clear the cache
cached_resolver.clear_cache()

# Check cache statistics
stats = cached_resolver.get_stats()
print(f"Cache hits: {stats['hits']}")
print(f"Cache misses: {stats['misses']}")
print(f"Cache size: {stats['size']}")

Missing Value Handling#

Control how missing values are handled:

# Return None for missing values
result = lookup(keys, resolver, on_missing="null")

# Use default value from resolver
result = lookup(keys, resolver, on_missing="default")

# Raise error on missing values
try:
    result = lookup(keys, resolver, on_missing="raise")
except KeyError as e:
    print(f"Missing keys: {e}")

# Return the original key for missing values
result = lookup(keys, resolver, on_missing="key")

Using Lookups in Pipelines#

Integrate lookups into df-eval pipelines by combining resolvers with ordinary df-eval expressions (not regular expressions). Lookups themselves are performed in Python using the helper function df_eval.lookup.lookup() or via Pandera metadata.

Basic engine integration:

import pandas as pd
from df_eval import Engine, DictResolver, lookup

# Setup resolver
price_resolver = DictResolver({
    "apple": 1.50,
    "banana": 0.75,
    "orange": 1.25,
})

engine = Engine()
engine.register_resolver("prices", price_resolver)

df = pd.DataFrame({
    "product": ["apple", "banana", "orange"],
    "quantity": [10, 20, 15],
})

# Perform lookups explicitly in Python
prices = lookup(df["product"], price_resolver, on_missing="null")
df = df.assign(price=prices)

# Then use df-eval expressions for arithmetic/boolean logic
schema = {
    "total": "price * quantity",
}

result = engine.apply_schema(df, schema)
print(result)
#   product  quantity  price  total
# 0   apple        10   1.50  15.00
# 1  banana        20   0.75  15.00
# 2  orange        15   1.25  18.75

See also the gallery example Engine Lookups with Resolvers for a slightly more complete pipeline using in-memory resolvers, and Lookup and Function Pipeline via Pandera for an end-to-end pipeline that stores lookup configuration in a Pandera schema.

Chaining Lookups#

Perform multiple lookups in sequence:

# Setup multiple resolvers
category_resolver = DictResolver({
    "apple": "fruit",
    "banana": "fruit",
    "carrot": "vegetable"
})

tax_rate_resolver = DictResolver({
    "fruit": 0.05,
    "vegetable": 0.03,
    "other": 0.08
})

engine.register_resolver("categories", category_resolver)
engine.register_resolver("tax_rates", tax_rate_resolver)

# First lookup: product -> category
categories = lookup(df["product"], category_resolver, on_missing="key")
df = df.assign(category=categories)

# Second lookup: category -> tax rate
tax_rates = lookup(df["category"], tax_rate_resolver, on_missing="default")
df = df.assign(tax_rate=tax_rates)

# Now use df-eval expressions to compute derived values
schema = {
    "price_with_tax": "price * (1 + tax_rate)",
}

result = engine.apply_schema(df, schema)

For metadata-driven pipelines that keep lookup configuration inside a Pandera schema, see df_eval.pandera and the df_eval.docs.examples.lookup_pandera_pipeline example.

Batch Lookups#

Optimize lookups for large datasets:

# Resolvers should implement batch resolution
class BatchResolver(BaseResolver):
    """Skeleton resolver demonstrating batched (one round-trip) resolution."""

    def resolve(self, keys):
        """Resolve every key with a single backend operation."""
        # Ask the backend about each distinct key exactly once.
        distinct = keys.unique()

        # One bulk fetch (e.g. a single database query) for the whole batch.
        lookup_table = self._batch_fetch(distinct)

        # Re-expand the deduplicated results onto the original Series.
        return keys.map(lookup_table)

    def _batch_fetch(self, keys):
        # Subclasses implement the efficient bulk fetch here.
        pass

Error Handling#

Handle lookup errors gracefully:

from df_eval import lookup, LookupError

try:
    result = lookup(keys, resolver, on_missing="raise")
except LookupError as e:
    print(f"Lookup failed: {e}")
    print(f"Missing keys: {e.missing_keys}")

    # Fallback strategy
    result = lookup(keys, resolver, on_missing="default")

Best Practices#

1. Use Caching for Expensive Lookups#

# Always wrap expensive resolvers (DB, API) with caching
expensive_resolver = DatabaseResolver(...)
cached = CachedResolver(expensive_resolver, ttl_seconds=300)

2. Batch Operations#

# Implement batch resolution for better performance
def resolve(self, keys):
    """Resolve *keys* in one bulk operation instead of per-key calls.

    Deduplicates first so the backend sees each key once, then maps the
    fetched values back onto the original Series (unmatched keys -> NaN).
    """
    # Fetch all keys at once, not one-by-one
    unique_keys = keys.unique()
    # NOTE(review): `_fetch_batch` is assumed to return a key->value
    # mapping usable by Series.map — implement it in your resolver.
    mapping = self._fetch_batch(unique_keys)
    return keys.map(mapping)

3. Handle Missing Values#

# Always specify on_missing behavior
result = lookup(keys, resolver, on_missing="default")

4. Monitor Cache Performance#

# Check cache hit rate periodically
stats = cached_resolver.get_stats()
hit_rate = stats['hits'] / (stats['hits'] + stats['misses'])
if hit_rate < 0.8:
    print("Consider increasing cache size or TTL")

Next Steps#