# Source code for elphick.geomet.base

import copy
import inspect
import logging
import re
from abc import ABC
from pathlib import Path
from typing import Optional, Union, Literal, TypeVar, TYPE_CHECKING

import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go

from elphick.geomet.config import read_yaml
from elphick.geomet.utils.components import get_components, is_compositional
from elphick.geomet.utils.moisture import solve_mass_moisture
from elphick.geomet.utils.pandas import mass_to_composition, composition_to_mass, composition_factors
from elphick.geomet.utils.sampling import random_int
from elphick.geomet.utils.timer import log_timer
from .config.config_read import get_column_config
from .plot import parallel_plot, comparison_plot

if TYPE_CHECKING:
    from elphick.geomet.flowsheet.stream import Stream

# Generic type variable bound to MassComposition. Used in type hints so that a method
# annotated with MC is understood to accept/return MassComposition or any subclass of it.
MC = TypeVar('MC', bound='MassComposition')


def filter_kwargs(cls, **kwargs):
    """Return only the keyword arguments that ``cls.__init__`` declares.

    Args:
        cls: The class whose ``__init__`` signature defines the accepted names.
        **kwargs: Candidate keyword arguments.

    Returns:
        A dict holding the subset of ``kwargs`` whose keys are parameter names of
        ``cls.__init__``.
    """
    accepted = inspect.signature(cls.__init__).parameters
    return {key: kwargs[key] for key in kwargs if key in accepted}
class MassComposition(ABC):
    """Base class holding mass and composition records in pandas DataFrames.

    Component values are stored internally as masses (``_mass_data``) and converted
    back to composition whenever :attr:`data` is read. Columns that are neither
    mass, moisture nor components are kept separately as supplementary data.
    """

    def __init__(self,
                 data: Optional[pd.DataFrame] = None,
                 name: Optional[str] = None,
                 moisture_in_scope: bool = True,
                 mass_wet_var: Optional[str] = None,
                 mass_dry_var: Optional[str] = None,
                 moisture_var: Optional[str] = None,
                 component_vars: Optional[list[str]] = None,
                 composition_units: Literal['%', 'ppm', 'ppb'] = '%',
                 components_as_symbols: bool = True,
                 ranges: Optional[dict[str, list]] = None,
                 config_file: Optional[Path] = None):
        """

        Args:
            data: The input data
            name: The name of the sample
            moisture_in_scope: Whether the moisture is in scope.  If False, only dry mass is processed.
            mass_wet_var: The name of the wet mass column
            mass_dry_var: The name of the dry mass column
            moisture_var: The name of the moisture column
            component_vars: The names of the chemical columns
            composition_units: The units of the composition columns, one of '%', 'ppm', 'ppb'
            components_as_symbols: If True, convert the composition variables to symbols, e.g. Fe
            ranges: The range of valid data for each column in the data
            config_file: The configuration file
        """
        self._logger = logging.getLogger(name=self.__class__.__name__)

        if config_file is None:
            config_file = Path(__file__).parent / './config/mc_config.yml'
        self.config = read_yaml(config_file)

        self.name: str = name
        self.moisture_in_scope: bool = moisture_in_scope
        self.mass_wet_var: Optional[str] = mass_wet_var
        self.mass_dry_var: str = mass_dry_var
        self.moisture_var: Optional[str] = moisture_var
        self.component_vars: Optional[list[str]] = component_vars  # TODO: check if this is redundant and remove.
        self.composition_units: Literal['%', 'ppm', 'ppb'] = composition_units
        self.composition_factor: int = composition_factors[composition_units]
        self.components_as_symbols: bool = components_as_symbols

        self._mass_data: Optional[pd.DataFrame] = None
        self._supplementary_data = None
        self._aggregate = None

        # set the data (goes through the property setter)
        self.data = data

        # add the OOR status object
        self.status = OutOfRangeStatus(self, ranges)

    @property
    @log_timer
    def data(self) -> Optional[pd.DataFrame]:
        """The mass, moisture and composition columns followed by the supplementary columns.

        Returns None when no data has been set.
        """
        if self._mass_data is None:
            return None

        # convert chem mass to composition
        mass_comp_data = mass_to_composition(
            self._mass_data,
            mass_wet=self.mass_wet_var,
            mass_dry=self.mass_dry_var,
            moisture_column_name='H2O' if self.components_as_symbols else (
                self.moisture_var if self.moisture_var is not None else 'h2o'),
            component_columns=self.composition_columns,
            composition_units=self.composition_units)

        # append the supplementary vars
        return pd.concat([mass_comp_data, self._supplementary_data], axis=1)

    @data.setter
    @log_timer
    def data(self, value):
        if value is None:
            # Clear everything derived from the data so nothing stale is reported.
            self._mass_data = None
            self._supplementary_data = None
            self._aggregate = None
            return

        # Work on a shallow copy: the column renaming below and the columns added by
        # _solve_mass must not leak into the DataFrame owned by the caller.
        value = value.copy(deep=False)

        # Convert column names to symbols if components_as_symbols is True
        if self.components_as_symbols:
            symbol_dict = is_compositional(value.columns, strict=False)
            value.columns = [symbol_dict.get(col, col) for col in value.columns]

        # the config provides regex search keys to detect mass and moisture columns if they are not specified.
        mass_totals = self._solve_mass(value)
        composition, supplementary_data = self._get_non_mass_data(value)

        self._supplementary_data = supplementary_data

        self.mass_data = composition_to_mass(pd.concat([mass_totals, composition], axis=1),
                                             mass_wet=self.mass_wet_var, mass_dry=self.mass_dry_var,
                                             moisture_column_name=self.moisture_column,
                                             component_columns=composition.columns,
                                             composition_units=self.composition_units)
        self._logger.debug("Data has been set.")

    @property
    def mass_data(self):
        """The internal mass DataFrame (total masses plus component masses)."""
        return self._mass_data

    @mass_data.setter
    def mass_data(self, value):
        self._mass_data = value
        # Recalculate the aggregate whenever the data changes
        self.aggregate = self.weight_average()

    def get_mass_data(self, include_moisture: bool = True) -> pd.DataFrame:
        """Get the mass data

        Args:
            include_moisture: If True (and moisture is in scope), include the moisture mass column

        Returns:
            A copy with the moisture mass inserted as the third column when moisture is
            included, otherwise the internal mass DataFrame itself (not a copy).
        """
        if include_moisture and self.moisture_in_scope:
            moisture_mass = self._mass_data[self.mass_wet_var] - self._mass_data[self.mass_dry_var]
            mass_data: pd.DataFrame = self._mass_data.copy()
            mass_data.insert(loc=2, column=self.moisture_column, value=moisture_mass)
            return mass_data
        return self._mass_data

    @property
    def aggregate(self) -> pd.DataFrame:
        """The weight average of the data, calculated on first access if not cached."""
        if self._aggregate is None and self._mass_data is not None:
            self._aggregate = self.weight_average()
        return self._aggregate

    @aggregate.setter
    def aggregate(self, value):
        self._aggregate = value

    @property
    def variable_map(self) -> Optional[dict[str, str]]:
        """A map from lower case standard names to the actual column names"""
        if self._mass_data is None:
            return None

        existing_columns = list(self._mass_data.columns)
        res = {}
        if self.moisture_in_scope and self.mass_wet_var in existing_columns:
            res['mass_wet'] = self.mass_wet_var
        if self.mass_dry_var in existing_columns:
            res['mass_dry'] = self.mass_dry_var
        if self.moisture_in_scope:
            res['moisture'] = self.moisture_var
            if self.components_as_symbols:
                res['moisture'] = is_compositional([self.moisture_var], strict=False).get(self.moisture_var,
                                                                                          self.moisture_var)
        if self.composition_columns:
            for col in self.composition_columns:
                res[col.lower()] = col
        return res

    @property
    def mass_columns(self) -> Optional[list[str]]:
        """The total mass column names present in the mass data (wet only if moisture is in scope)."""
        if self._mass_data is None:
            return None

        existing_columns = list(self._mass_data.columns)
        res = []
        if self.moisture_in_scope and self.mass_wet_var in existing_columns:
            res.append(self.mass_wet_var)
        if self.mass_dry_var in existing_columns:
            res.append(self.mass_dry_var)
        return res

    @property
    def moisture_column(self) -> Optional[str]:
        """The moisture column name, converted to a symbol when components_as_symbols is True."""
        res = 'h2o'
        if self.moisture_in_scope:
            res = self.moisture_var
        if self.components_as_symbols:
            res = is_compositional([res], strict=False).get(res, res)
        return res

    @property
    def composition_columns(self) -> Optional[list[str]]:
        """The component column names: every mass-data column after the leading total mass column(s)."""
        res = None
        if self._mass_data is not None:
            # two leading mass columns (wet, dry) when moisture is in scope, otherwise one (dry)
            n_mass_cols = 2 if self.moisture_in_scope else 1
            res = list(self._mass_data.columns)[n_mass_cols:]
        return res

    @property
    def supplementary_columns(self) -> Optional[list[str]]:
        """The supplementary column names, or None if there is no supplementary data."""
        res = None
        if self._supplementary_data is not None:
            res = list(self._supplementary_data.columns)
        return res

    @property
    def data_columns(self) -> list[str]:
        """All column names: mass, moisture, composition then supplementary."""
        # Each group can be None (no data / no supplementary data); treat that as empty
        # rather than failing on list concatenation.
        columns = ((self.mass_columns or []) + [self.moisture_column] +
                   (self.composition_columns or []) + (self.supplementary_columns or []))
        return [col for col in columns if col is not None]

    def balance_composition(self) -> MC:
        """Balance the composition data

        For records where the component mass exceeds the dry mass, the component masses are reduced
        proportionally to equal the dry mass.  Records where the component mass is less than the
        dry mass are left unchanged.

        Returns:
            self, modified in place.
        """
        if self._mass_data is not None:
            # calculate the ratio of the sum of the components to the dry mass
            ratio = self._mass_data[self.composition_columns].sum(axis=1) / self._mass_data[self.mass_dry_var]
            if ratio.max() <= 1.0:
                return self

            epsilon = 1e-6
            # add a small value to the ratio to avoid component sums marginally over 100.0
            ratio[ratio > 1.0] = ratio[ratio > 1.0] + epsilon
            # to avoid reducing compliant records, clip the ratio at the lower side to 1.0
            ratio = ratio.clip(lower=1.0)
            # apply the ratio to the components
            self._mass_data[self.composition_columns] = self._mass_data[self.composition_columns].div(ratio, axis=0)
            # the component masses changed in place, so the cached aggregate is stale
            self.aggregate = self.weight_average()
        return self

    def clip_recovery(self, other: MC, recovery_bounds: tuple[float, float] = (0.01, 0.99),
                      allow_moisture_coercion: bool = True) -> MC:
        """Clip the recovery to the specified bounds and recalculate the estimate.

        Args:
            other: The other MassComposition object, from which the recovery of self is calculated.
            recovery_bounds: The bounds for the recovery between 0.0 and 1.0
            allow_moisture_coercion: if True, allow the wet mass to be modified to maintain the moisture
             (in the case that dry mass is clipped to manage recovery)

        Returns:
            The MassComposition object with the recovery clipped to the bounds.
        """
        other_mass: pd.DataFrame = other.get_mass_data(include_moisture=False)
        recovery: pd.DataFrame = self.get_mass_data(include_moisture=False) / other_mass

        # Limit the recovery to the bounds
        before_clip = recovery.copy()
        recovery = recovery.clip(lower=recovery_bounds[0], upper=recovery_bounds[1]).fillna(0.0)

        # Check if any records were affected
        affected_indexes = set(recovery.index[np.any(before_clip != recovery, axis=1)])

        if affected_indexes:
            # Recalculate the estimate from the bound recovery
            new_mass: pd.DataFrame = recovery * other_mass[recovery.columns]

            if self.moisture_in_scope and allow_moisture_coercion:
                # Calculate the moisture from the new mass
                new_mass[self.mass_wet_var] = solve_mass_moisture(mass_dry=new_mass[self.mass_dry_var],
                                                                  moisture=self.data[self.moisture_column])

            # Log the top 50 records affected by the recovery coercion
            affected_indexes_list = sorted(affected_indexes)[:50]
            self._logger.info(f"Recovery coercion affected {len(affected_indexes)} records. "
                              f"Affected indexes (first 50): {affected_indexes_list}")

            # Update the mass data of self
            self.update_mass_data(new_mass)
        else:
            self._logger.info("Recovery coercion did not affect any records.")
        return self

    def set_moisture(self, moisture: Union[pd.Series, float, int],
                     mass_to_adjust: Literal['wet', 'dry'] = 'wet') -> MC:
        """Set the moisture to the specified value

        A convenience method for an mc object that modifies the concrete mass to deliver the
        specified moisture.

        Args:
            moisture: The moisture value to set.  Can be a constant or series.
            mass_to_adjust: The mass to adjust, either 'wet' or 'dry'.

        Returns:
            self, modified in place.

        Raises:
            AssertionError: if moisture is not in scope for this object.
            TypeError: if moisture is neither a number nor a pd.Series.
            ValueError: if mass_to_adjust is neither 'wet' nor 'dry'.
        """
        if not self.moisture_in_scope:
            raise AssertionError("This method is not applicable unless moisture_in_scope property is True.")

        if isinstance(moisture, (float, int)):
            # create a series with the same index as the mass data
            moisture = pd.Series(float(moisture), index=self._mass_data.index)
        elif not isinstance(moisture, pd.Series):
            raise TypeError(f"moisture must be a float or a pd.Series, not {type(moisture)}")

        if mass_to_adjust == 'wet':
            self._mass_data[self.mass_wet_var] = solve_mass_moisture(mass_dry=self._mass_data[self.mass_dry_var],
                                                                     moisture=moisture)
        elif mass_to_adjust == 'dry':
            self._mass_data[self.mass_dry_var] = solve_mass_moisture(mass_wet=self._mass_data[self.mass_wet_var],
                                                                     moisture=moisture)
        else:
            raise ValueError(f"mass_to_adjust must be 'wet' or 'dry', not {mass_to_adjust}")

        # a total mass column changed in place, so the cached aggregate is stale
        self.aggregate = self.weight_average()
        return self

    def clip_composition(self, ranges: Optional[dict[str, list[float]]] = None) -> MC:
        """Clip the components

        Clip to the components to within the range provided or the default range for each component.
        This method does not clip moisture - see set_moisture and solve_moisture for that.

        Args:
            ranges: An optional dict defining a list of [lo, hi] floats for each component.  If not
             provided, the default range from the config file will be used.

        Returns:
            The object with clipped composition.
        """
        # load the default ranges from the config file
        component_ranges: dict = self._get_component_ranges(ranges)

        # define a small value to ensure the clipped values lie marginally inside the specified range.
        epsilon: float = 0.0  # 1.0e-05

        # clip the components
        affected_indexes = set()
        for component, component_range in component_ranges.items():
            before_clip = self._mass_data[component].copy()
            # define the component mass that aligns with the lower and upper bounds
            component_mass_limits = self._mass_data[self.mass_dry_var].values[:, np.newaxis] * np.array(
                component_range) / self.composition_factor
            # apply the clip to the mass data
            self._mass_data[component] = self._mass_data[component].clip(lower=component_mass_limits[:, 0] + epsilon,
                                                                         upper=component_mass_limits[:, 1] - epsilon)
            affected_indexes.update(self._mass_data.index[before_clip != self._mass_data[component]])

        # log the action, including the first 50 indexes affected
        affected_indexes_list = sorted(affected_indexes)[:50]
        self._logger.info(
            f"{len(affected_indexes)} records where composition has been clipped to the range: {component_ranges}."
            f" Affected indexes (first 50): {affected_indexes_list}")

        # component masses changed in place, so the cached aggregate is stale
        self.aggregate = self.weight_average()
        return self

    def plot_parallel(self, color: Optional[str] = None,
                      vars_include: Optional[list[str]] = None,
                      vars_exclude: Optional[list[str]] = None,
                      title: Optional[str] = None,
                      include_dims: Optional[Union[bool, list[str]]] = True,
                      plot_interval_edges: bool = False) -> go.Figure:
        """Create an interactive parallel plot

        Useful to explore multidimensional data like mass-composition data

        Args:
            color: Optional color variable
            vars_include: Optional list of variables to include in the plot
            vars_exclude: Optional list of variables to exclude in the plot
            title: Optional plot title
            include_dims: Optional boolean or list of dimension to include in the plot.  True will show all dims.
            plot_interval_edges: If True, interval edges will be plotted instead of interval mid

        Returns:
            The plotly figure.
        """
        if not title and hasattr(self, 'name'):
            title = self.name

        fig = parallel_plot(data=self.data, color=color, vars_include=vars_include, vars_exclude=vars_exclude,
                            title=title, include_dims=include_dims, plot_interval_edges=plot_interval_edges)
        return fig

    def plot_comparison(self, other: MC,
                        color: Optional[str] = None,
                        vars_include: Optional[list[str]] = None,
                        vars_exclude: Optional[list[str]] = None,
                        facet_col_wrap: int = 3,
                        trendline: bool = False,
                        trendline_kwargs: Optional[dict] = None,
                        title: Optional[str] = None) -> go.Figure:
        """Create an interactive comparison plot

        Useful to compare the difference in component values between two objects.

        Args:
            other: the object to compare with self.
            color: Optional color variable
            vars_include: Optional List of variables to include in the plot
            vars_exclude: Optional List of variables to exclude in the plot
            facet_col_wrap: The number of subplot columns per row.
            trendline: If True, trendlines are requested from the comparison plot
            trendline_kwargs: Allows customising the trendline: ref: https://plotly.com/python/linear-fits/
            title: Optional plot title

        Returns:
            The plotly figure.

        Raises:
            KeyError: if vars_include names a variable that is not in the data.
        """
        # The data property already returns a fresh pandas DataFrame; there is no
        # to_dataframe() method on a DataFrame, so it is used directly.
        df_self: pd.DataFrame = self.data
        df_other: pd.DataFrame = other.data

        if vars_include is not None:
            missing_vars = set(vars_include).difference(set(df_self.columns))
            if len(missing_vars) > 0:
                raise KeyError(f'var_subset provided contains variable not found in the data: {missing_vars}')
            df_self = df_self[vars_include]
        if vars_exclude:
            df_self = df_self[[col for col in df_self.columns if col not in vars_exclude]]
        df_other = df_other[df_self.columns]

        # Supplementary variables are the same for each stream and so will be unstacked.
        supp_cols: list[str] = self.supplementary_columns
        if supp_cols:
            df_self = df_self.set_index(supp_cols, append=True)
            df_other = df_other.set_index(supp_cols, append=True)

        index_names = list(df_self.index.names)
        cols = list(df_self.columns)

        df_self = df_self[cols].assign(name=self.name).reset_index().melt(id_vars=index_names + ['name'])
        df_other = df_other[cols].assign(name=other.name).reset_index().melt(id_vars=index_names + ['name'])

        df_plot: pd.DataFrame = pd.concat([df_self, df_other])
        df_plot = df_plot.set_index(index_names + ['name', 'variable'], drop=True).unstack(['name'])
        df_plot.columns = df_plot.columns.droplevel(0)
        df_plot.reset_index(level=list(np.arange(-1, -len(index_names) - 1, -1)), inplace=True)

        # set variables back to standard order
        variable_order: dict = {col: i for i, col in enumerate(cols)}
        df_plot = df_plot.sort_values(by=['variable'], key=lambda x: x.map(variable_order))

        fig: go.Figure = comparison_plot(data=df_plot, x=self.name, y=other.name, facet_col_wrap=facet_col_wrap,
                                         color=color, trendline=trendline, trendline_kwargs=trendline_kwargs)
        fig.update_layout(title=title)
        return fig

    def plot_ternary(self, variables: list[str], color: Optional[str] = None,
                     title: Optional[str] = None) -> go.Figure:
        """Plot a ternary diagram

        Args:
            variables: List of 3 components to plot
            color: Optional color variable
            title: Optional plot title

        Returns:
            The plotly figure.

        Raises:
            KeyError: if any of the variables is not a column of the data.
        """
        df = self.data
        vars_missing: list[str] = [v for v in variables if v not in df.columns]
        if vars_missing:
            raise KeyError(f'Variable/s not found in the dataset: {vars_missing}')

        # copy, so appending the color column does not modify the caller's list
        cols: list[str] = list(variables)
        if color is not None:
            cols.append(color)

        if color:
            fig = px.scatter_ternary(df[cols], a=variables[0], b=variables[1], c=variables[2], color=color)
        else:
            fig = px.scatter_ternary(df[cols], a=variables[0], b=variables[1], c=variables[2])

        if not title and hasattr(self, 'name'):
            title = self.name
        fig.update_layout(title=title)
        return fig

    def weight_average(self, group_by: Optional[str] = None) -> pd.DataFrame:
        """Calculate the mass-weighted average composition and total mass.

        Args:
            group_by: Optional supplementary column to group by.  If None, a single record is returned.

        Returns:
            A DataFrame of summed mass, moisture (if in scope) and weight-averaged composition.
        """
        if group_by is None:
            composition: pd.DataFrame = pd.DataFrame(
                self._mass_data[self.composition_columns].sum(axis=0) / self._mass_data[
                    self.mass_dry_var].sum() * self.composition_factor).T

            mass_sum = pd.DataFrame(self._mass_data[self.mass_columns].sum(axis=0)).T

            # Recalculate the moisture
            if self.moisture_in_scope:
                mass_sum[self.moisture_column] = solve_mass_moisture(mass_wet=mass_sum[self.mass_columns[0]],
                                                                     mass_dry=mass_sum[self.mass_columns[1]])

            # Create a DataFrame from the weighted averages
            weighted_averages_df = pd.concat([mass_sum, composition], axis=1)
        else:
            group_var: pd.Series = self._supplementary_data[group_by]
            weighted_averages_df = self._mass_data.groupby(group_var).apply(
                lambda x: pd.DataFrame(
                    x[self.composition_columns].sum(axis=0) / x[self.mass_dry_var].sum() * self.composition_factor).T)
            weighted_averages_df.index = weighted_averages_df.index.droplevel(-1)

            mass_sum = self._mass_data[self.mass_columns].groupby(group_var).sum()
            weighted_averages_df = pd.concat([mass_sum, weighted_averages_df], axis=1)

            if self.moisture_in_scope:
                weighted_averages_df.insert(loc=2, column=self.moisture_column, value=solve_mass_moisture(
                    mass_wet=mass_sum[self.mass_columns[0]], mass_dry=mass_sum[self.mass_columns[1]]))

        return weighted_averages_df

    def _solve_mass(self, value) -> pd.DataFrame:
        """Solve mass_wet and mass_dry from the provided columns.

        Missing mass or moisture columns are calculated from the other two and added to ``value``.

        Args:
            value: The input data with the column-names provided by the user

        Returns:
            The mass data, with the columns mass_wet and mass_dry.  Only mass_dry if moisture_in_scope is False.

        Raises:
            ValueError: if a required column is missing and cannot be calculated from the others.
        """
        # Auto-detect columns if they are not provided
        mass_dry, mass_wet, moisture = self._extract_mass_moisture_columns(value)

        if mass_dry is None:
            if mass_wet is not None and moisture is not None:
                value[self.mass_dry_var] = solve_mass_moisture(mass_wet=mass_wet, moisture=moisture)
            else:
                msg = (f"mass_dry_var is not provided and cannot be calculated from mass_wet_var and moisture_var "
                       f"for {self.name}")
                self._logger.error(msg)
                raise ValueError(msg)

        if self.moisture_in_scope:
            if mass_wet is None:
                if mass_dry is not None and moisture is not None:
                    value[self.mass_wet_var] = solve_mass_moisture(mass_dry=mass_dry, moisture=moisture)
                else:
                    msg = (
                        f"mass_wet_var is not provided and cannot be calculated from mass_dry_var and moisture_var.  "
                        f"Consider specifying the mass_wet_var, mass_dry_var and moisture_var, or alternatively set "
                        f"moisture_in_scope to False for {self.name}")
                    self._logger.error(msg)
                    raise ValueError(msg)

            if moisture is None:
                if mass_wet is not None and mass_dry is not None:
                    value[self.moisture_var] = solve_mass_moisture(mass_wet=mass_wet, mass_dry=mass_dry)
                else:
                    msg = f"moisture_var is not provided and cannot be calculated from mass_wet_var and mass_dry_var."
                    self._logger.error(msg)
                    raise ValueError(msg)

            mass_totals: pd.DataFrame = value[[self.mass_wet_var, self.mass_dry_var]]
        else:
            mass_totals: pd.DataFrame = value[[self.mass_dry_var]]

        return mass_totals

    # Helper method to extract column
    def _extract_column(self, value, var_type):
        """Return the column name for var_type ('mass_wet', 'mass_dry' or 'moisture').

        The explicitly configured name is used when set; otherwise the first column matching
        the config search regex (case-insensitive), falling back to the config default name.
        """
        var = getattr(self, f"{var_type}_var")
        if var is None:
            var_config = self.config['vars'][var_type]
            var = next((col for col in value.columns if
                        re.search(var_config['search_regex'], col, re.IGNORECASE)),
                       var_config['default_name'])
        return var

    def _extract_mass_moisture_columns(self, value):
        """Resolve the mass/moisture column names and return the (mass_dry, mass_wet, moisture) columns.

        Each returned item is None when the resolved column is not present in ``value``.
        """
        if self.mass_wet_var is None:
            self.mass_wet_var = self._extract_column(value, 'mass_wet')
        if self.mass_dry_var is None:
            self.mass_dry_var = self._extract_column(value, 'mass_dry')
        if self.moisture_var is None:
            self.moisture_var = self._extract_column(value, 'moisture')
        mass_wet = value.get(self.mass_wet_var)
        mass_dry = value.get(self.mass_dry_var)
        moisture = value.get(self.moisture_var)
        return mass_dry, mass_wet, moisture

    def _get_non_mass_data(self, value: Optional[pd.DataFrame]
                           ) -> tuple[Optional[pd.DataFrame], Optional[pd.DataFrame]]:
        """Get the composition data and supplementary data.

        Extract only the composition columns specified, otherwise detect the compositional columns

        Returns:
            A tuple of (composition, supplementary) DataFrames, both None if value is None.
        """
        composition = None
        supplementary = None
        if value is not None:
            # mass and moisture columns are never treated as components or supplementary data
            mass_moisture_cols = {self.mass_wet_var, self.mass_dry_var, self.moisture_var, 'h2o', 'H2O'}

            if self.component_vars is None:
                non_mass_cols: list[str] = [col for col in value.columns if col not in mass_moisture_cols]
                component_cols: list[str] = get_components(value[non_mass_cols], strict=False)
            else:
                component_cols: list[str] = self.component_vars
            composition = value[component_cols]

            excluded = mass_moisture_cols.union(component_cols)
            supplementary_cols: list[str] = [col for col in value.columns if col not in excluded]
            supplementary = value[supplementary_cols]

        return composition, supplementary

    def __deepcopy__(self, memo):
        # Create a new instance of our class
        new_obj = self.__class__()
        memo[id(self)] = new_obj

        # Copy each attribute
        for attr, value in self.__dict__.items():
            setattr(new_obj, attr, copy.deepcopy(value, memo))

        return new_obj

    def update_mass_data(self, value: pd.DataFrame):
        """Replace the mass data, align the supplementary data to its index and refresh the aggregate.

        Has no effect when the object currently holds no mass data.
        """
        if self._mass_data is not None:
            self._mass_data = value
            if self._supplementary_data is not None:
                if self._supplementary_data.index.names != self._mass_data.index.names:
                    # if indexes have been dropped
                    self._supplementary_data.index = self._mass_data.index
                self._supplementary_data = self._supplementary_data.loc[value.index]
            self.aggregate = self.weight_average()

    def filter_by_index(self, index: pd.Index):
        """Update the data by index"""
        if self._mass_data is not None:
            self._mass_data = self._mass_data.loc[index]
            if self._supplementary_data is not None:
                self._supplementary_data = self._supplementary_data.loc[index]
            self.aggregate = self.weight_average()

    def split(self,
              fraction: float,
              name_1: Optional[str] = None,
              name_2: Optional[str] = None,
              include_supplementary_data: bool = False) -> tuple['Stream', 'Stream']:
        """Split the object by mass

        A simple mass split maintaining the same composition

        Args:
            fraction: A constant in the range [0.0, 1.0]
            name_1: The name of the reference object created by the split
            name_2: The name of the complement object created by the split
            include_supplementary_data: Whether to inherit the supplementary variables

        Returns:
            tuple of two objects, the first with the mass fraction specified, the other the complement
        """
        # create_congruent_objects to preserve properties like constraints
        name_1 = name_1 if name_1 is not None else f"{self.name}_1"
        name_2 = name_2 if name_2 is not None else f"{self.name}_2"

        ref: MassComposition = self.create_congruent_object(name=name_1, include_mc_data=True,
                                                            include_supp_data=include_supplementary_data)
        ref.update_mass_data(self._mass_data * fraction)

        comp: MassComposition = self.create_congruent_object(name=name_2, include_mc_data=True,
                                                             include_supp_data=include_supplementary_data)
        comp.update_mass_data(self._mass_data * (1 - fraction))

        # Ensure self, ref and comp are Stream objects (converted in place)
        self._convert_to_stream(self)
        self._convert_to_stream(ref)
        self._convert_to_stream(comp)

        # create the relationships
        ref.nodes = [self.nodes[1], random_int()]
        comp.nodes = [self.nodes[1], random_int()]

        return ref, comp

    def add(self, other: MC, name: Optional[str] = None,
            include_supplementary_data: bool = False) -> 'Stream':
        """Add two objects together

        Args:
            other: The other object
            name: The name of the new object
            include_supplementary_data: Whether to include the supplementary data

        Returns:
            The new object
        """
        res: MC = self.create_congruent_object(name=name, include_mc_data=True,
                                               include_supp_data=include_supplementary_data)
        res.update_mass_data(self._mass_data + other._mass_data)

        # Ensure self, other and res are Stream objects (converted in place)
        self.to_stream()
        self._convert_to_stream(other)
        self._convert_to_stream(res)

        # create the relationships
        other.nodes = [other.nodes[0], self.nodes[1]]
        res.nodes = [self.nodes[1], random_int()]

        return res

    def sub(self, other: MC, name: Optional[str] = None,
            include_supplementary_data: bool = False) -> 'Stream':
        """Subtract other from self

        Args:
            other: The other object
            name: The name of the new object
            include_supplementary_data: Whether to include the supplementary data

        Returns:
            The new object
        """
        res = self.create_congruent_object(name=name, include_mc_data=True,
                                           include_supp_data=include_supplementary_data)
        res.update_mass_data(self._mass_data - other._mass_data)

        # Ensure self and other are Stream objects (converted in place)
        # NOTE(review): unlike add() and split(), res is not converted here, so it is only a
        # Stream if self already was one - confirm whether that is intended.
        self._convert_to_stream(self)
        self._convert_to_stream(other)

        # create the relationships
        res.nodes = [self.nodes[1], random_int()]
        return res

    def div(self, other: MC, name: Optional[str] = None,
            include_supplementary_data: bool = False) -> MC:
        """Divide two objects

        Divides self by other, with optional name of the returned object

        Args:
            other: the denominator (or reference) object
            name: name of the returned object
            include_supplementary_data: Whether to include the supplementary data

        Returns:
            The new object, holding the element-wise ratio of the mass data.
        """
        new_obj = self.create_congruent_object(name=name, include_mc_data=True,
                                               include_supp_data=include_supplementary_data)
        new_obj.update_mass_data(self._mass_data / other._mass_data)
        return new_obj

    def __str__(self):
        # An object without data has no aggregate; report None instead of failing.
        aggregate = self.aggregate
        summary = aggregate.to_dict() if aggregate is not None else None
        return f"{self.__class__.__name__}: {self.name}\n{summary}"

    def create_congruent_object(self, name: str,
                                include_mc_data: bool = False,
                                include_supp_data: bool = False) -> MC:
        """Create an object with the same attributes

        Args:
            name: The name of the new object
            include_mc_data: Whether to copy the mass data (and the aggregate derived from it)
            include_supp_data: Whether to copy the supplementary data

        Returns:
            A new object of the same class with deep-copied attributes.
        """
        # Create a new instance of our class
        new_obj = self.__class__()

        # Seed the memo so that attributes referring back to self (e.g. status.mc) resolve to
        # the new object rather than triggering a second full deep copy of self.
        memo = {id(self): new_obj}

        skipped = set()
        if not include_mc_data:
            # the aggregate is derived from the mass data, so it is excluded with it
            skipped.update({'_mass_data', '_aggregate'})
        if not include_supp_data:
            skipped.add('_supplementary_data')

        # Copy each attribute
        for attr, value in self.__dict__.items():
            if attr in skipped:
                continue
            setattr(new_obj, attr, copy.deepcopy(value, memo))
        new_obj.name = name
        return new_obj

    def __add__(self, other: MC) -> 'Stream':
        """Add two objects

        Perform the addition with the mass-composition variables only and then append any attribute variables.
        Presently ignores any attribute vars in other

        Args:
            other: object to add to self

        Returns:
            The new object
        """
        return self.add(other, include_supplementary_data=True)

    def __sub__(self, other: MC) -> 'Stream':
        """Subtract the supplied object from self

        Perform the subtraction with the mass-composition variables only and then append any attribute variables.

        Args:
            other: object to subtract from self

        Returns:
            The new object
        """
        return self.sub(other, include_supplementary_data=True)

    def to_stream(self) -> 'Stream':
        """Convert self, in place, to a Stream and return it."""
        return self._convert_to_stream(self)

    @staticmethod
    def _convert_to_stream(obj) -> 'Stream':
        """Convert obj, in place, to a Stream (no-op if it already is one) and return it."""
        from elphick.geomet.flowsheet.stream import Stream  # Local import to avoid circular dependency
        if not isinstance(obj, Stream):
            # swap in a dynamic subclass that mixes Stream into the object's own class
            obj.__class__ = type(obj.__class__.__name__, (obj.__class__, Stream), {})
            filtered_kwargs = filter_kwargs(obj.__class__, **obj.__dict__)
            filtered_kwargs['data'] = obj.data
            Stream.__init__(obj, **filtered_kwargs)  # Initialize Stream properties
        return obj

    def __truediv__(self, other: MC) -> MC:
        """Divide self by the supplied object

        Perform the division with the mass-composition variables only and then append any attribute variables.

        Args:
            other: denominator object, self will be divided by this object

        Returns:
            The new object
        """
        return self.div(other, include_supplementary_data=True)

    def __eq__(self, other):
        """Return True if other is a MassComposition with equal attributes.

        pandas objects are compared with ``.equals``: comparing the instance dicts directly
        raises on DataFrame values because their truth value is ambiguous.
        """
        if not isinstance(other, MassComposition):
            return False
        if self.__dict__.keys() != other.__dict__.keys():
            return False

        pandas_types = (pd.DataFrame, pd.Series)
        for attr, mine in self.__dict__.items():
            theirs = other.__dict__[attr]
            if mine is theirs:
                continue
            if isinstance(mine, pandas_types) or isinstance(theirs, pandas_types):
                if type(mine) is not type(theirs) or not mine.equals(theirs):
                    return False
            elif mine != theirs:
                return False
        return True

    @classmethod
    def from_mass_dataframe(cls, mass_df: pd.DataFrame,
                            mass_wet: Optional[str] = 'mass_wet',
                            mass_dry: str = 'mass_dry',
                            moisture_column_name: Optional[str] = None,
                            component_columns: Optional[list[str]] = None,
                            composition_units: Literal['%', 'ppm', 'ppb'] = '%',
                            **kwargs) -> MC:
        """
        Class method to create a MassComposition object from a mass dataframe.

        Args:
            mass_df: DataFrame with mass data.
            mass_wet: The name of the wet mass column in mass_df.
            mass_dry: The name of the dry mass column in mass_df.
            moisture_column_name: The name to use for the moisture column.
            component_columns: The names of the component columns in mass_df.
            composition_units: The units of the resulting composition, one of '%', 'ppm', 'ppb'.
            **kwargs: Additional arguments to pass to the MassComposition constructor.

        Returns:
            A new MassComposition object.
        """
        # Convert mass to composition using the function from the pandas module
        composition_df = mass_to_composition(mass_df, mass_wet=mass_wet, mass_dry=mass_dry,
                                             moisture_column_name=moisture_column_name,
                                             component_columns=component_columns,
                                             composition_units=composition_units)

        # Create a new instance of the MassComposition class.  The units must be forwarded,
        # otherwise the constructor interprets the composition with its default of '%'.
        return cls(data=composition_df, composition_units=composition_units, **kwargs)

    def query(self, expr: str, name: Optional[str] = None) -> MC:
        """Reduce the data by a query expression

        Args:
            expr: A pandas query expression
            name: name of the new object

        Returns:
            A new object with the reduced data
        """
        name = name if name is not None else self.name
        res = self.create_congruent_object(name=f"{name} ({expr})", include_mc_data=True, include_supp_data=True)
        filtered_index = self.data.query(expr).index
        res.update_mass_data(self._mass_data.loc[filtered_index])
        if res.supplementary_columns is not None:
            res._supplementary_data = self._supplementary_data.loc[filtered_index]
        return res

    def reset_index(self, index_name: str) -> MC:
        """Return a new object with the named index level dropped from the mass data.

        The dropped level is kept as a column of the supplementary data.
        """
        res = self.create_congruent_object(name=f"{self.name} (reset_index)", include_mc_data=True,
                                           include_supp_data=True)
        res.update_mass_data(self._mass_data.reset_index(level=index_name, drop=True))
        if res.supplementary_columns is not None:
            res._supplementary_data = self._supplementary_data.reset_index(level=index_name, drop=False)
        else:
            res._supplementary_data = pd.DataFrame(index=self._mass_data.index, columns=[index_name],
                                                   data=self._mass_data.index.get_level_values(index_name))
        return res

    def _get_component_ranges(self, ranges: Optional[dict[str, list]]) -> dict[str, list]:
        """Return the configured range of each composition column, overridden by any user ranges."""
        d_ranges: dict = get_column_config(config_dict=self.config, var_map=self.variable_map,
                                           config_key='range')
        # filter to include only components
        d_ranges = {k: v for k, v in d_ranges.items() if k in self.composition_columns}

        # modify the default dict based on any user passed constraints
        if ranges:
            d_ranges.update(ranges)

        return d_ranges
class OutOfRangeStatus:
    """A class to check and report out-of-range records in an MC object."""

    def __init__(self, mc: 'MC', ranges: Optional[dict[str, list]]):
        """Initialize with an MC object.

        Args:
            mc: The object to check.
            ranges: Optional user ranges ([lo, hi] per variable) overriding the configured ones.
        """
        self._logger = logging.getLogger(__name__)
        self.mc: 'MC' = mc
        self.ranges: Optional[dict[str, list]] = None
        self.oor: Optional[pd.DataFrame] = None
        self.num_oor: Optional[int] = None
        self.failing_components: Optional[list[str]] = None

        # the attributes above stay None for an object without data
        if mc.mass_data is not None:
            self.ranges = self.get_ranges(ranges)
            self.oor = self._check_range()
            self.num_oor = len(self.oor)
            self.failing_components = self._get_failing_components()

    def get_ranges(self, ranges: Optional[dict[str, list]]) -> dict[str, list]:
        """Return the configured range of each variable, overridden by any user ranges."""
        d_ranges: dict = get_column_config(config_dict=self.mc.config, var_map=self.mc.variable_map,
                                           config_key='range')

        # modify the default dict based on any user passed constraints
        if ranges:
            d_ranges.update(ranges)

        return d_ranges

    def _check_range(self) -> pd.DataFrame:
        """Check if all records are within the constraints.

        Returns:
            A DataFrame with one column per checked variable, holding the out-of-range values
            (NaN where that variable is within range), limited to records with any failure.
        """
        variables = list(self.ranges)
        if self.mc._mass_data is None or not variables:
            # An empty object (or nothing to check) will have ok status
            return pd.DataFrame(columns=variables)

        df: pd.DataFrame = self.mc.data[variables]
        chunks = [df.loc[(df[variable] < bounds[0]) | (df[variable] > bounds[1]), variable]
                  for variable, bounds in self.ranges.items()]
        return pd.concat(chunks, axis='columns')

    def _get_failing_components(self) -> Optional[list[str]]:
        """Return the variables with at least one out-of-range record, or None if there are none."""
        if not self.num_oor:
            return None
        # how='all' keeps every column holding at least one out-of-range value; the default
        # (how='any') would drop variables that are within range in some failing records.
        return list(self.oor.dropna(axis=1, how='all').columns)

    @property
    def ok(self) -> bool:
        """Return True if all records are within range, False otherwise."""
        # num_oor is None for an object without data, which counts as ok
        if not self.num_oor:
            return True
        self._logger.warning(f'{self.num_oor} out of range records exist.')
        return False

    def __str__(self) -> str:
        """Return a string representation of the status."""
        res: str = f'status.ok: {self.ok}\n'
        res += f'num_oor: {self.num_oor}'
        return res

    def __eq__(self, other: object) -> bool:
        """Return True if other Status has the same out-of-range records."""
        if not isinstance(other, OutOfRangeStatus):
            return False
        if self.oor is None or other.oor is None:
            # statuses of objects without data are only equal to each other
            return self.oor is None and other.oor is None
        return self.oor.equals(other.oor)