Source code for omf_io.utils.pandas_utils

import keyword
import re
import tokenize
from io import StringIO
from token import STRING
from typing import Literal

import numpy as np
import pandas as pd
from pandas import CategoricalDtype


def is_nullable_integer_dtype(series: pd.Series) -> bool:
    """
    Report whether a series uses a pandas nullable integer dtype.

    The test is purely textual: the string form of the dtype must begin with
    a capital "I" (as in ``Int8`` ... ``Int64``). Unsigned nullable dtypes
    (``UInt8`` ... ``UInt64``) begin with "U" and are therefore not matched.

    Args:
        series: The series to inspect.

    Returns:
        bool: True if the dtype name starts with "I", otherwise False.
    """
    dtype_name = str(series.dtype)
    return dtype_name.startswith("I")
def to_nullable_integer_dtype(series: pd.Series) -> pd.Series:
    """
    Convert a NumPy integer series to the matching pandas nullable integer dtype.

    Signed NumPy integers map to ``Int8`` ... ``Int64`` and unsigned ones to
    ``UInt8`` ... ``UInt64``, keeping the bit width. Any other series (floats,
    objects, series that are already nullable, ...) is returned unchanged.

    Args:
        series: The series to convert.

    Returns:
        pd.Series: The series with a nullable integer dtype if it held NumPy
        integers, otherwise the input series itself.
    """
    dtype = series.dtype
    # Only plain NumPy integers need converting; extension dtypes (including
    # the nullable integer dtypes themselves) are not instances of np.dtype.
    if isinstance(dtype, np.dtype) and dtype.kind in ("i", "u"):
        prefix = "UInt" if dtype.kind == "u" else "Int"
        # Build the name from the bit width rather than by string replacement,
        # which would turn "uint64" into the invalid name "uInt64".
        return series.astype(f"{prefix}{dtype.itemsize * 8}")
    return series
def to_numpy_integer_dtype(series: pd.Series) -> pd.Series:
    """
    Convert a pandas nullable integer series to the matching NumPy integer dtype.

    ``Int8`` ... ``Int64`` map to ``int8`` ... ``int64`` and ``UInt8`` ...
    ``UInt64`` map to ``uint8`` ... ``uint64``. Any other series is returned
    unchanged.

    The conversion is delegated to ``Series.astype``. A NumPy integer dtype
    cannot hold missing values, so pandas is expected to raise if the series
    still contains any.

    Args:
        series: The series to convert.

    Returns:
        pd.Series: The series with a NumPy integer dtype if it was a nullable
        integer series, otherwise the input series itself.
    """
    dtype_name = str(series.dtype)
    # Nullable integer dtype names differ from their NumPy counterparts only
    # in capitalisation ("Int64" -> "int64", "UInt8" -> "uint8").
    if dtype_name.startswith(("Int", "UInt")):
        return series.astype(dtype_name.lower())
    return series
def parse_vars_from_expr(expr: str) -> list[str]:
    """
    Parse variables from a pandas query expression string.

    Backtick-quoted names are returned exactly as written between the
    backticks (spaces, digits and punctuation included). Every other NAME
    token is reported unless it is a Python keyword (``and``, ``or``, ``not``,
    ``in``, ``is``, ``True``, ``False``, ``None``, ...). Names are returned
    once each, in order of first appearance.

    Note that no further interpretation is attempted: attribute and function
    names, and names prefixed with ``@``, are NAME tokens and are reported too.

    Args:
        expr: The expression string

    Returns:
        list[str]: The list of variables
    """
    # Pick a placeholder stem that does not occur anywhere in the expression,
    # so a substituted name can never clash with a real identifier.
    stem = "_backtick_quoted_"
    while stem in expr:
        stem += "_"

    quoted_names: dict[str, str] = {}

    def _stash(match):
        # Swap each backtick-quoted name for a plain identifier so the Python
        # tokenizer never sees the backticks or the free text between them.
        placeholder = f"{stem}{len(quoted_names)}"
        quoted_names[placeholder] = match.group(1)
        return placeholder

    sanitized = re.sub(r"`([^`]*)`", _stash, expr)

    # A dict keeps insertion order, giving de-duplication with a stable order.
    found: dict[str, None] = {}
    for token in tokenize.generate_tokens(StringIO(sanitized).readline):
        if token.type != tokenize.NAME:
            continue
        if token.string in quoted_names:
            found[quoted_names[token.string]] = None
        elif not keyword.iskeyword(token.string):
            found[token.string] = None
    return list(found)
def create_test_blockmodel(shape: tuple[int, int, int],
                           block_size: tuple[float, float, float],
                           corner: tuple[float, float, float],
                           is_tensor=False) -> pd.DataFrame:
    """
    Build a small synthetic block model for testing.

    The frame is indexed by block centroid (x, y, z) and carries three
    columns: ``c_style_xyz`` (row number when sorted by x, y, z),
    ``f_style_zyx`` (row number when sorted by z, y, x) and ``depth``
    (distance of the centroid below the top face of the model).

    Args:
        shape: Shape of the block model (x, y, z).
        block_size: Size of each block (x, y, z).
        corner: The lower left (minimum) corner of the block model.
        is_tensor: If True, create a tensor block model. Default is False,
            which creates a regular block model. The MultiIndex levels for a
            regular model are x, y, z. For a tensor model they are
            x, y, z, dx, dy, dz. The tensor model created is a special case
            where dx, dy, dz are the same for all blocks.

    Returns:
        pd.DataFrame: DataFrame containing the block model data.
    """
    n_blocks = np.prod(shape)

    # Centroids along each axis: start half a block in from the corner and
    # step one block at a time up to the far face.
    centroids = [
        np.arange(corner[axis] + block_size[axis] / 2,
                  corner[axis] + shape[axis] * block_size[axis],
                  block_size[axis])
        for axis in range(3)
    ]

    # Expand to a full grid and flatten in C order (z varies fastest).
    grid_x, grid_y, grid_z = np.meshgrid(*centroids, indexing='ij')
    flat_x = grid_x.ravel(order='C')
    flat_y = grid_y.ravel(order='C')
    flat_z = grid_z.ravel(order='C')

    block_ids = np.arange(n_blocks)

    # assume the surface of the highest block is the topo surface
    top_rl = corner[2] + shape[2] * block_size[2]

    frame = pd.DataFrame({'x': flat_x,
                          'y': flat_y,
                          'z': flat_z,
                          'c_style_xyz': block_ids})
    frame = frame.set_index(keys=['x', 'y', 'z'])
    frame = frame.sort_index(level=['x', 'y', 'z'])

    # Number the blocks while sorted z, y, x, then go back to x, y, z order
    # so the positional depth assignment lines up with the flattened z values.
    frame = frame.sort_index(level=['z', 'y', 'x'])
    frame['f_style_zyx'] = block_ids
    frame = frame.sort_index(level=['x', 'y', 'z'])
    frame['depth'] = top_rl - flat_z

    # Check the ordering - confirm that the c_style_xyz and f_style_zyx
    # columns are in the correct order
    xyz_sorted = frame.sort_index(level=['x', 'y', 'z'])
    assert np.array_equal(xyz_sorted['c_style_xyz'].values, np.arange(n_blocks))
    zyx_sorted = frame.sort_index(level=['z', 'y', 'x'])
    assert np.array_equal(zyx_sorted['f_style_zyx'].values, np.arange(n_blocks))

    # Check the depth using a pandas groupby: one depth per z level, top down.
    depth_by_z = (
        frame.groupby('z')['depth']
        .unique()
        .apply(lambda depths: depths[0])
        .sort_index(ascending=False)
    )
    # diff() keeps the index unchanged; only the z index is used here.
    assert np.all(top_rl - depth_by_z.diff().index == depth_by_z.values)

    if not is_tensor:
        return frame

    # Tensor model: add constant block dimensions as extra index levels.
    for axis, level in enumerate(('dx', 'dy', 'dz')):
        frame[level] = block_size[axis]
    frame = frame.set_index(keys=['dx', 'dy', 'dz'], append=True)
    return frame
def aggregate(df: pd.DataFrame,
              agg_dict: dict,
              cat_treatment: Literal['majority', 'proportions'] = 'majority',
              proportions_as_columns: bool = False) -> pd.DataFrame:
    """
    Collapse a DataFrame to a single row.

    Columns named in ``agg_dict`` are reduced to a weighted average using
    their weight column. Every other column is summed, except categorical
    columns, which are reduced according to ``cat_treatment``.

    Args:
        df: The DataFrame to aggregate.
        agg_dict: A dictionary where keys are the columns to be aggregated and
            values are the weight columns.
        cat_treatment: How to treat categorical columns. 'majority' returns
            the majority category, 'proportions' returns the proportions of
            each category.
        proportions_as_columns: Whether to return category proportions as
            separate ``<column>_<category>`` columns instead of a single
            dictionary value.

    Returns:
        pd.DataFrame: The aggregated one-row DataFrame with columns in the
        same order as the incoming DataFrame.
    """
    summary = {}

    # Group the value columns by the weight column they share.
    columns_by_weight = {}
    for value_col, weight_col in agg_dict.items():
        columns_by_weight.setdefault(weight_col, []).append(value_col)

    # Weighted average, computed for all columns of a group at once.
    for weight_col, value_cols in columns_by_weight.items():
        weights = df[weight_col].values
        weighted = df[value_cols].values * weights[:, np.newaxis]
        averages = np.sum(weighted, axis=0) / np.sum(weights)
        for value_col, average in zip(value_cols, averages):
            summary[value_col] = average

    # Columns without a weight: sum them, or summarise categoricals.
    for col in df.columns:
        if col in agg_dict:
            continue
        column = df[col]
        if not isinstance(column.dtype, CategoricalDtype):
            summary[col] = column.sum()
            continue
        if cat_treatment == 'majority':
            summary[col] = column.mode()[0]  # the majority category
        elif cat_treatment == 'proportions':
            shares = column.value_counts(normalize=True).to_dict()
            if proportions_as_columns:
                for category, share in shares.items():
                    summary[f"{col}_{category}"] = share
            else:
                summary[col] = shares

    row = pd.DataFrame([summary])

    if not proportions_as_columns:
        # Same column order as the incoming DataFrame.
        return row[df.columns]

    # Follow the incoming column order, expanding each categorical column
    # into one column per category where proportions were requested.
    ordered = []
    for col in df.columns:
        if col in summary:
            ordered.append(col)
        elif isinstance(df[col].dtype, CategoricalDtype) and cat_treatment == 'proportions':
            ordered.extend(f"{col}_{category}" for category in df[col].cat.categories)
    return row[ordered]