Source code for parq_blockmodel.utils.pandas.aggregate

from typing import Literal

import numpy as np
import pandas as pd
from pandas import CategoricalDtype


def aggregate(df: pd.DataFrame,
              agg_dict: dict,
              cat_treatment: Literal['majority', 'proportions'] = 'majority',
              proportions_as_columns: bool = False) -> pd.DataFrame:
    """Collapse a DataFrame into a single aggregated row.

    Columns named as keys of ``agg_dict`` are reduced to a weighted average,
    using the column named by the corresponding value as the weights.
    Every other column is reduced as follows:

    * categorical columns follow ``cat_treatment``;
    * all remaining columns (including the weight columns themselves, unless
      they are also keys of ``agg_dict``) are summed.

    Args:
        df: The DataFrame to aggregate.
        agg_dict: Maps each column to be weight-averaged to the name of its
            weight column.
        cat_treatment: How to reduce categorical columns. 'majority' keeps
            the first value returned by ``Series.mode()``; 'proportions'
            keeps the normalised value counts of the column.
        proportions_as_columns: Only relevant with
            ``cat_treatment='proportions'``. If True, each category
            proportion is written to its own ``"<column>_<category>"``
            column; if False, the proportions are stored as a single dict in
            the original column.

    Returns:
        pd.DataFrame: A one-row DataFrame whose columns follow the order of
        ``df.columns``. When proportions are expanded, the per-category
        columns take the place of their source column.

    Raises:
        ValueError: If a categorical column has to be reduced and
            ``cat_treatment`` is not 'majority' or 'proportions'.

    Note:
        The weighted average divides by the plain sum of the weights; a
        zero total weight is not guarded against.
    """
    result = {}

    # Group the value columns by their weight column in a single pass so each
    # weight vector is only extracted and summed once.
    cols_by_weight = {}
    for col, weight_col in agg_dict.items():
        cols_by_weight.setdefault(weight_col, []).append(col)

    for weight_col, cols in cols_by_weight.items():
        weights = df[weight_col].values
        # Broadcast the weights down every value column sharing them.
        weighted_values = df[cols].values * weights[:, np.newaxis]
        aggregated_values = np.sum(weighted_values, axis=0) / np.sum(weights)
        result.update(zip(cols, aggregated_values))

    # Reduce the columns that are not weight-averaged.
    for col in df.columns:
        if col in agg_dict:
            continue
        series = df[col]
        if not isinstance(series.dtype, CategoricalDtype):
            result[col] = series.sum()
        elif cat_treatment == 'majority':
            result[col] = series.mode()[0]  # Get the majority category
        elif cat_treatment == 'proportions':
            proportions = series.value_counts(normalize=True).to_dict()
            if proportions_as_columns:
                for cat, prop in proportions.items():
                    result[f"{col}_{cat}"] = prop
            else:
                result[col] = proportions
        else:
            # Without this the column would silently vanish from the result
            # and only surface later as an opaque KeyError (or be dropped).
            raise ValueError(
                f"Unsupported cat_treatment {cat_treatment!r} for categorical "
                f"column {col!r}; expected 'majority' or 'proportions'."
            )

    # Create a DataFrame from the result dictionary
    aggregated_df = pd.DataFrame([result])

    # Manage the final column order
    if proportions_as_columns:
        # Walk the incoming columns, substituting the expanded per-category
        # columns for any categorical column that was split up.
        final_columns = []
        for col in df.columns:
            if col in result:
                final_columns.append(col)
            elif isinstance(df[col].dtype, CategoricalDtype) and cat_treatment == 'proportions':
                final_columns.extend(f"{col}_{cat}" for cat in df[col].cat.categories)
        aggregated_df = aggregated_df[final_columns]
    else:
        # Ensure the columns are in the same order as the incoming DataFrame
        aggregated_df = aggregated_df[df.columns]

    return aggregated_df