Source code for elphick.mass_composition.mass_composition

import logging
import os
from collections import Counter
from copy import deepcopy
from pathlib import Path
from typing import Dict, List, Optional, Union, Tuple, Iterable, Callable, Set, Literal, Any

import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import xarray as xr
from sklearn.base import BaseEstimator, RegressorMixin

from elphick.mass_composition.config import read_yaml
from elphick.mass_composition.mc_status import Status
from elphick.mass_composition.plot import parallel_plot, comparison_plot
from elphick.mass_composition.utils import solve_mass_moisture
from elphick.mass_composition.utils.amenability import amenability_index
from elphick.mass_composition.utils.interp import mass_preserving_interp
from elphick.mass_composition.utils.pd_utils import weight_average, calculate_recovery, calculate_partition
from elphick.mass_composition.utils.sampling import random_int
from elphick.mass_composition.utils.sklearn import extract_feature_names, PandasPipeline

from elphick.mass_composition.variables import Variables, VariableGroups


class MassComposition:
    def __init__(self,
                 data: Optional[pd.DataFrame] = None,
                 name: Optional[str] = None,
                 mass_wet_var: Optional[str] = None,
                 mass_dry_var: Optional[str] = None,
                 moisture_var: Optional[str] = None,
                 chem_vars: Optional[List[str]] = None,
                 mass_units: Optional[str] = None,
                 composition_units: Optional[str] = None,
                 constraints: Optional[Dict[str, List]] = None,
                 config_file: Optional[Path] = None):
        """
        Args:
            data: The data used to construct the object.
            name: The name of the object / stream.
            mass_wet_var: The name of the wet mass column.
            mass_dry_var: The name of the dry mass column.
            moisture_var: The name of the moisture column.
            chem_vars: The names of the chemistry (composition) columns.
            mass_units: The units of mass.  Defaults to the config value.
            composition_units: The units of composition.  Defaults to the config value.
            constraints: Optional dict of variable name to [lower, upper] bounds.
            config_file: Optional path to a yaml config file.
        """
        self._logger = logging.getLogger(name=self.__class__.__name__)

        if config_file is None:
            config_file = Path(__file__).parent / './config/mc_config.yml'
        self.config = read_yaml(config_file)

        # _nodes can preserve relationships from math operations, and can be used to build a network.
        self._nodes: List[Union[str, int]] = [random_int(), random_int()]

        self._name: str = name
        self._mass_units = mass_units if mass_units else self.config['units']['mass']
        self._composition_units = composition_units if composition_units else self.config['units']['composition_rel']
        self._specified_columns: Dict = {'mass_wet_var': mass_wet_var,
                                         'mass_dry_var': mass_dry_var,
                                         'moisture_var': moisture_var,
                                         'chem_vars': chem_vars}

        self._data: Optional[xr.Dataset] = None
        self.variables: Optional[Variables] = None
        self.constraints: Optional[Dict[str, List]] = None
        self.status: Optional[Status] = None

        if data is not None:
            data = deepcopy(data)  # preserve the incoming data variable.
            self.set_data(data, constraints=constraints)
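    # A minimal construction sketch (not part of the library source).  It assumes the
    # default config recognises columns named 'mass_wet' and 'mass_dry' plus analyte
    # columns such as 'Fe'; the data values are hypothetical:
    #
    #   df = pd.DataFrame({'mass_wet': [100.0, 90.0],
    #                      'mass_dry': [90.0, 80.0],
    #                      'Fe': [57.0, 59.0]})
    #   mc = MassComposition(df, name='feed')
    #   print(mc)  # moisture (H2O) is solved from the wet and dry mass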
    @staticmethod
    def _strip_common_prefix(df: pd.DataFrame) -> Tuple[pd.DataFrame, str]:
        # Extract prefixes
        common_prefix = MassComposition.get_common_prefix(df.columns.to_list())
        res = df
        # Create a copy of the dataframe and strip the most common prefix from column names
        if common_prefix:
            res = df.copy()
            res.columns = [col.replace(common_prefix + '_', '') if col.startswith(common_prefix) else col
                           for col in df.columns]
        return res, common_prefix

    @staticmethod
    def get_common_prefix(columns: List[str]) -> str:
        prefixes = [col.split('_')[0] for col in columns]
        # Count the frequency of each prefix
        prefix_counter = Counter(prefixes)
        # Check if prefix_counter is not empty
        if prefix_counter:
            # Find the most common prefix
            common_prefix, freq = prefix_counter.most_common(1)[0]
            # Only return the prefix if its frequency is 3 or more
            if freq >= 3:
                return common_prefix
        return ""

    def set_data(self, data: Union[pd.DataFrame, xr.Dataset], constraints: Optional[Dict[str, List]] = None):
        if isinstance(data, xr.Dataset):
            # we assume it is a compliant mc-xarray
            self._data = data
            self.variables = Variables(config=self.config['vars'],
                                       supplied=[str(v) for v in data.variables if v not in data.dims],
                                       specified_map=self._specified_columns)
        elif isinstance(data, pd.DataFrame):
            if sum(data.index.duplicated()) > 0:
                raise KeyError('The data has duplicate indexes.')
            if isinstance(data.index, pd.MultiIndex) and data.index.nlevels >= 3:
                self._logger.warning('The data has more than 2 levels in the index, which can consume excessive '
                                     'memory for large datasets.  Is this what you intend?  Depending on your '
                                     'requirements you may be able to process this dataset with a single index.')
            # seek a prefix to self assign the name
            data, common_prefix = self._strip_common_prefix(data)
            if common_prefix:
                self._specified_columns = {k: v.replace(f"{common_prefix}_", '') for k, v in
                                           self._specified_columns.items() if v is not None}
            self.variables = Variables(config=self.config['vars'], supplied=list(data.columns),
                                       specified_map=self._specified_columns)
            # if interval pairs are passed as indexes then create the proper interval index
            data = self._create_interval_indexes(data=data)
            # rename the columns using the Variables class
            data.rename(columns=self.variables.vars.col_to_var(), inplace=True)
            # solve or validate the moisture balance
            data = self._solve_mass_moisture(data)
            xr_ds = self._dataframe_to_mc_dataset(data)
            self._data = xr_ds
            if not self._name:
                if common_prefix:
                    self.rename(common_prefix.strip('_'))
                else:
                    self.rename('unnamed')
            else:
                self.rename(self._name)
        # explicitly define the constraints
        self.constraints: Dict = self.get_constraint_bounds(constraints=constraints)
        self.status = Status(self._check_constraints())

    def rename(self, new_name: str) -> 'MassComposition':
        self.name = new_name
        return self

    def get_constraint_bounds(self, constraints: Optional[Dict[str, List]]) -> Dict[str, List]:
        d_constraints: Dict = {}
        # populate from the defaults
        for v in self.variables.mass_moisture.get_var_names():
            if 'mass' in v:
                d_constraints[v] = self.config['constraints']['mass']
            else:
                d_constraints[v] = self.config['constraints']['composition']
        for col in self.variables.chemistry.get_var_names():
            d_constraints[col] = self.config['constraints']['composition']
        # modify the default dict based on any user passed constraints
        if constraints:
            for k, v in constraints.items():
                d_constraints[k] = v
        return d_constraints

    @classmethod
    def from_xarray(cls, ds: xr.Dataset, name: Optional[str] = 'unnamed'):
        obj = cls()
        obj._data = ds
        obj.name = name
        return obj
    @property
    def name(self) -> str:
        return self._data.mc.name

    @name.setter
    def name(self, value):
        self._data.mc.rename(value)

    @property
    def data(self) -> xr.Dataset:
        moisture: xr.DataArray = xr.DataArray((self._data['mass_wet'] - self._data['mass_dry']) /
                                              self._data['mass_wet'] * 100, name='H2O',
                                              attrs={'units': '%',
                                                     'standard_name': 'H2O',
                                                     'mc_type': 'moisture',
                                                     'mc_col_orig': 'H2O'})
        data: xr.Dataset = xr.merge([self._data[self._data.attrs['mc_vars_mass']], moisture,
                                     self._data[self._data.attrs['mc_vars_chem']],
                                     self._data[self._data.attrs['mc_vars_attrs']]])
        return data

    def update_data(self, values: Union[pd.DataFrame, xr.Dataset, xr.DataArray]):
        if isinstance(values, (xr.Dataset, xr.DataArray)):
            values = values.to_dataframe()
        for v in values.columns:
            self._data[v].values = values[v].values
        self.status = Status(self._check_constraints())

    def set_parent_node(self, parent: 'MassComposition') -> 'MassComposition':
        self._nodes = [parent._nodes[1], self._nodes[1]]
        return self

    def set_child_node(self, child: 'MassComposition') -> 'MassComposition':
        self._nodes = [self._nodes[0], child._nodes[0]]
        return self

    def set_stream_nodes(self, nodes: Tuple[int, int]) -> 'MassComposition':
        self._nodes = nodes
        return self
    def to_xarray(self) -> xr.Dataset:
        """Returns the mc compliant xr.Dataset

        Returns:
            The underlying xr.Dataset managed by the object.
        """
        return self._data
    def aggregate(self, group_var: Optional[str] = None,
                  group_bins: Optional[Union[int, Iterable]] = None,
                  as_dataframe: bool = True,
                  original_column_names: bool = False) -> Union[pd.DataFrame, xr.Dataset]:
        """Calculate the weight average.

        Args:
            group_var: Optional grouping variable
            group_bins: Optional bins to apply to the group_var
            as_dataframe: If True return a pd.DataFrame
            original_column_names: If True, and as_dataframe is True, will return with the original column names.

        Returns:
            The weight averaged result, aggregated by group_var if provided.
        """
        res: xr.Dataset = self._data.mc.aggregate(group_var=group_var,
                                                  group_bins=group_bins,
                                                  as_dataframe=as_dataframe,
                                                  original_column_names=original_column_names)
        return res
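    # Usage sketch for aggregate (hypothetical, continuing the constructor example above):
    #
    #   head = mc.aggregate()  # single-record weight average across all records
    #   by_grade = mc.aggregate(group_var='Fe', group_bins=10)  # weight average by Fe bins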
    def query(self, queries) -> 'MassComposition':
        res: MassComposition = deepcopy(self)
        res._data = res._data.query(queries=queries)
        return res
    def constrain(self, clip_mass: Optional[Union[Tuple, Dict]] = None,
                  clip_composition: Optional[Union[Tuple, Dict]] = None,
                  relative_mass: Optional[Union[Tuple, Dict]] = None,
                  relative_composition: Optional[Union[Tuple, Dict]] = None,
                  other: Optional['MassComposition'] = None) -> 'MassComposition':
        """Constrain the mass-composition

        It is possible that a MassComposition object is created from a source that has improbable results.
        In this case this method can help improve the integrity of the mass-composition.

        Args:
            clip_mass: Limit the mass values to a minimum and maximum absolute value.
            clip_composition: Limit the composition values to a minimum and maximum absolute value.
            relative_mass: Constrain the mass recovery of the object relative to the other object.
            relative_composition: Constrain the component recovery of the object relative to the other object.
            other: The other object used for recovery calculation.  Must be provided if relative_mass or
             relative_composition are provided.

        Returns:
            Returns the new object constrained per the provided arguments.
        """
        xr_ds: xr.Dataset = self.data.copy()
        if clip_mass:
            if isinstance(clip_mass, Dict):
                for k, v in clip_mass.items():
                    xr_ds = self._clip(xr_ds=xr_ds, variables=[k], limits=v)
            else:
                xr_ds = self._clip(xr_ds=xr_ds, variables=xr_ds.mc.mc_vars_mass, limits=clip_mass)
        if clip_composition:
            if isinstance(clip_composition, Dict):
                for k, v in clip_composition.items():
                    xr_ds = self._clip(xr_ds=xr_ds, variables=[k], limits=v)
            else:
                xr_ds = self._clip(xr_ds=xr_ds, variables=xr_ds.mc.mc_vars_chem, limits=clip_composition)
        if relative_mass or relative_composition:
            if not other:
                raise ValueError("The other argument must be provided to apply relative constraints.")
            if relative_mass:
                xr_relative: xr.Dataset = self.data[xr_ds.mc.mc_vars_mass] / other.data[xr_ds.mc.mc_vars_mass]
                if isinstance(relative_mass, Dict):
                    for k, v in relative_mass.items():
                        xr_relative = self._clip(xr_ds=xr_relative, variables=[k], limits=v)
                else:
                    xr_relative = self._clip(xr_ds=xr_relative, variables=xr_ds.mc.mc_vars_mass,
                                             limits=relative_mass)
                # convert the clipped relative mass back to absolute mass
                xr_ds = other.data[xr_ds.mc.mc_vars_mass] * xr_relative
                xr_ds = xr.merge([xr_ds, self.data[self.data.mc.mc_vars_chem],
                                  self.data[self.data.mc.mc_vars_attrs]])
                xr_ds = self._copy_all_attrs(xr_ds, self.data)
            if relative_composition:
                xr_relative: xr.Dataset = self.compare(other=other, comparisons='recovery',
                                                       explicit_names=False, as_dataframe=False)
                if isinstance(relative_composition, Dict):
                    for k, v in relative_composition.items():
                        xr_relative = self._clip(xr_ds=xr_relative, variables=[k], limits=v)
                else:
                    xr_relative = self._clip(xr_ds=xr_relative, variables=self.data.mc.mc_vars_chem,
                                             limits=relative_composition)
                # convert the clipped recovery back to mass / grades
                xr_ds = other.data.mc.mul(xr_relative)
                xr_ds = xr.merge([xr_ds, self.data[self.data.mc.mc_vars_attrs]])
                xr_ds = self._copy_all_attrs(xr_ds, self.data)

        res: MassComposition = MassComposition.from_xarray(xr_ds, name=self.name)
        return res
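    # Usage sketch for constrain (hypothetical; mc_feed is a hypothetical reference
    # stream).  Grades are clipped into the plausible percentage range, and mass
    # recovery relative to the feed is capped at 1.0:
    #
    #   mc_clipped = mc.constrain(clip_composition=(0.0, 100.0))
    #   mc_bounded = mc.constrain(relative_mass=(0.0, 1.0), other=mc_feed)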
    def compare(self, other: 'MassComposition', comparisons: Union[str, List[str]] = 'recovery',
                explicit_names: bool = True,
                as_dataframe: bool = True) -> Union[pd.DataFrame, xr.Dataset]:
        comparisons = [comparisons] if isinstance(comparisons, str) else comparisons
        valid_comparisons: Set = {'recovery', 'difference', 'divide', 'all'}

        def set_explicit_names(xrds, comparison) -> xr.Dataset:
            xrds = xrds.rename_vars(
                {col: f"{self.name}_{col}_{self.config['comparisons'][comparison]}_{other.name}"
                 for col in xrds.data_vars})
            return xrds

        cols = [col for col in self.data.data_vars if col not in self.data.mc.mc_vars_attrs]
        chunks: List[xr.Dataset] = []
        if 'recovery' in comparisons or comparisons == ['all']:
            ds: xr.Dataset = self.data.mc.composition_to_mass()[cols] / other.data.mc.composition_to_mass()[cols]
            ds = set_explicit_names(ds, comparison='recovery') if explicit_names else ds
            chunks.append(ds)
        if 'difference' in comparisons or comparisons == ['all']:
            ds: xr.Dataset = self.data[cols] - other.data[cols]
            ds = set_explicit_names(ds, comparison='difference') if explicit_names else ds
            chunks.append(ds)
        if 'divide' in comparisons or comparisons == ['all']:
            ds: xr.Dataset = self.data[cols] / other.data[cols]
            ds = set_explicit_names(ds, comparison='divide') if explicit_names else ds
            chunks.append(ds)
        if not chunks:
            raise ValueError(f"The comparisons argument is not valid.  Valid comparisons are: {valid_comparisons}")
        res: xr.Dataset = xr.merge(chunks)
        res: pd.DataFrame = res.to_dataframe() if as_dataframe else res
        return res
    def binned_mass_composition(self, cutoff_var: str, bin_width: float, cumulative: bool = True,
                                direction: str = 'descending',
                                as_dataframe: bool = True) -> Union[xr.Dataset, pd.DataFrame]:
        """A.K.A "The Grade-Tonnage" curve.

        Mass and grade by bins for a cut-off variable.

        Args:
            cutoff_var: The variable that defines the bins
            bin_width: The width of the bin
            cumulative: If True, the results are cumulative weight averaged.
            direction: 'ascending'|'descending', if cumulative is True, the direction of accumulation
            as_dataframe: If True return a pd.DataFrame

        Returns:
            The binned mass-composition.
        """
        if cutoff_var not in list(self._data.variables):
            raise KeyError(f'{cutoff_var} is not found in the data')

        bins = np.arange(np.floor(min(self._data[cutoff_var].values)),
                         np.ceil(max(self._data[cutoff_var].values)) + bin_width, bin_width)
        res: xr.Dataset = self.aggregate(group_var=cutoff_var, group_bins=bins, as_dataframe=False)
        if cumulative:
            res = res.mc.data().mc.cumulate(direction=direction)
        if as_dataframe:
            res = res.mc.data().to_dataframe()
        else:
            res = res.mc.data()
        return res
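    # Usage sketch for the grade-tonnage curve (hypothetical analyte name):
    #
    #   gt = mc.binned_mass_composition(cutoff_var='Fe', bin_width=1.0,
    #                                   cumulative=True, direction='descending')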
    def ideal_incremental_separation(self, discard_from: Literal["lowest", "highest"] = "lowest") -> pd.DataFrame:
        """Incrementally separate a fractionated sample.

        This method sorts by the provided direction prior to incrementally removing and discarding the first
        fraction (of the remaining fractions) and recalculating the mass-composition and recovery of the
        portion remaining.  This is equivalent to incrementally applying a perfect separation (partition)
        at every interval edge.

        This method is only applicable to a 1D object where the single dimension is a pd.Interval type.

        See also: ideal_incremental_composition, ideal_incremental_recovery.

        Args:
            discard_from: Defines the discarded direction.  discard_from = "lowest" will discard the lowest
             value first, then the next lowest, etc.

        Returns:
            A pandas DataFrame
        """
        self._check_one_dim_interval()
        sample: pd.DataFrame = self.data.to_dataframe()
        is_decreasing: bool = sample.index.is_monotonic_decreasing
        if discard_from == "lowest":
            sample.sort_index(ascending=True, inplace=True)
            new_index: pd.Index = pd.Index(sample.index.left)
        else:
            sample.sort_index(ascending=False, inplace=True)
            new_index: pd.Index = pd.Index(sample.index.right)
        new_index.name = f"{sample.index.name}_cut-point"
        aggregated_chunks: List = []
        recovery_chunks: List = []
        head: pd.DataFrame = sample.pipe(weight_average)

        for i, indx in enumerate(sample.index):
            tmp_composition: pd.DataFrame = sample.iloc[i:, :].pipe(weight_average)
            aggregated_chunks.append(tmp_composition)
            recovery_chunks.append(tmp_composition.pipe(calculate_recovery, df_ref=head))
        res_composition: pd.DataFrame = pd.concat(aggregated_chunks).assign(attribute="composition").set_index(
            new_index)
        res_recovery: pd.DataFrame = pd.concat(recovery_chunks).assign(attribute="recovery").set_index(new_index)
        if is_decreasing:
            res_composition.sort_index(ascending=False, inplace=True)
            res_recovery.sort_index(ascending=False, inplace=True)
        res: pd.DataFrame = pd.concat([res_composition, res_recovery]).reset_index().set_index(
            [new_index.name, 'attribute'])
        return res
    def ideal_incremental_composition(self, discard_from: Literal["lowest", "highest"] = "lowest") -> pd.DataFrame:
        """Incrementally separate a fractionated sample.

        This method sorts by the provided direction prior to incrementally removing and discarding the first
        fraction (of the remaining fractions) and recalculating the mass-composition of the portion remaining.
        This is equivalent to incrementally applying a perfect separation (partition) at every interval edge.

        This method is only applicable to a 1D object where the single dimension is a pd.Interval type.

        See also: ideal_incremental_separation, ideal_incremental_recovery.

        Args:
            discard_from: Defines the discarded direction.  discard_from = "lowest" will discard the lowest
             value first, then the next lowest, etc.

        Returns:
            A pandas DataFrame
        """
        df: pd.DataFrame = self.ideal_incremental_separation(discard_from=discard_from).query(
            'attribute=="composition"').droplevel('attribute')
        return df
    def ideal_incremental_recovery(self, discard_from: Literal["lowest", "highest"] = "lowest",
                                   apply_closure: bool = True) -> pd.DataFrame:
        """Incrementally separate a fractionated sample.

        This method sorts by the provided direction prior to incrementally removing and discarding the first
        fraction (of the remaining fractions) and recalculating the recovery of the portion remaining.
        This is equivalent to incrementally applying a perfect separation (partition) at every interval edge.

        This method is only applicable to a 1D object where the single dimension is a pd.Interval type.

        See also: ideal_incremental_separation, ideal_incremental_composition.

        Args:
            discard_from: Defines the discarded direction.  discard_from = "lowest" will discard the lowest
             value first, then the next lowest, etc.
            apply_closure: If True, add the missing record (zero recovery) that closes the recovery envelope.

        Returns:
            A pandas DataFrame
        """
        df: pd.DataFrame = self.ideal_incremental_separation(discard_from=discard_from).query(
            'attribute=="recovery"').droplevel('attribute').rename(columns={'mass_dry': 'mass'}).drop(
            columns=["mass_wet", 'H2O'])
        if apply_closure:
            # add a zero recovery record to close the envelope.
            indx = np.inf if df.index.min() == 0.0 else 0.0
            indx_name: str = df.index.name
            df = pd.concat([df, pd.Series(0, index=df.columns, name=indx).to_frame().T]).sort_index(ascending=True)
            df.index.name = indx_name
        return df
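    # Usage sketch for the ideal incremental methods (hypothetical; mc_sized is a
    # hypothetical 1D object indexed by pd.Interval fractions, e.g. size fractions):
    #
    #   sep = mc_sized.ideal_incremental_separation(discard_from='lowest')
    #   comp = mc_sized.ideal_incremental_composition(discard_from='lowest')
    #   rec = mc_sized.ideal_incremental_recovery(discard_from='lowest', apply_closure=True)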
    def split(self, fraction: float, name_1: Optional[str] = None,
              name_2: Optional[str] = None) -> Tuple['MassComposition', 'MassComposition']:
        """Split the object by mass

        A simple mass split maintaining the same composition

        See also: split_by_partition, split_by_function, split_by_estimator

        Args:
            fraction: A constant in the range [0.0, 1.0]
            name_1: The name of the reference stream created by the split
            name_2: The name of the complement stream created by the split

        Returns:
            A tuple of two MassComposition objects, the first with the mass fraction specified,
            the other the complement
        """
        xr_ds_1, xr_ds_2 = self._data.mc.split(fraction=fraction)
        out: MassComposition = MassComposition(name=xr_ds_1.mc.name, constraints=self.constraints)
        out.set_data(data=xr_ds_1, constraints=self.constraints)
        comp: MassComposition = MassComposition(name=xr_ds_2.mc.name, constraints=self.constraints)
        comp.set_data(data=xr_ds_2, constraints=self.constraints)
        self._post_process_split(out, comp, name_1, name_2)
        return out, comp
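    # Usage sketch for split (hypothetical stream names).  Composition is preserved and
    # only the mass is divided:
    #
    #   ref, comp = mc.split(fraction=0.1, name_1='ref', name_2='comp')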
    def split_by_partition(self, partition_definition: Callable, name_1: Optional[str] = None,
                           name_2: Optional[str] = None) -> Tuple['MassComposition', 'MassComposition']:
        """Partition the object along a given dimension.

        This method applies the defined separation resulting in two new objects.

        See also: split, split_by_function, split_by_estimator

        Args:
            partition_definition: A partition function that defines the efficiency of separation along a dimension
            name_1: The name of the reference stream created by the split
            name_2: The name of the complement stream created by the split

        Returns:
            A tuple of two MassComposition objects created by applying the partition, the first the
            reference stream, the other the complement
        """
        out = deepcopy(self)
        comp = deepcopy(self)

        xr_ds_1, xr_ds_2 = self._data.mc.split_by_partition(partition_definition=partition_definition)
        out._data = xr_ds_1
        comp._data = xr_ds_2
        self._post_process_split(out, comp, name_1, name_2)
        return out, comp
    def split_by_function(self, split_function: Callable, name_1: Optional[str] = None,
                          name_2: Optional[str] = None) -> Tuple['MassComposition', 'MassComposition']:
        """Split an object using a function.

        This method applies the function to self, resulting in two new objects.  The object returned with
        name_1 is the result of the function.  The object returned with name_2 is the complement.

        See also: split, split_by_estimator, split_by_partition

        Args:
            split_function: Any function that transforms the dataframe from a MassComposition object into a
             new dataframe with values representing a new (output) stream.  The returned dataframe structure
             must be identical to the input dataframe.
            name_1: The name of the stream created by the function
            name_2: The name of the complement stream created by the split, which is calculated automatically.

        Returns:
            A tuple of two MassComposition objects, the first the output of the function, the other
            the complement
        """
        out_data: pd.DataFrame = split_function(self.data.to_dataframe())
        out: MassComposition = MassComposition(name=name_1, constraints=self.constraints, data=out_data)
        comp: MassComposition = self.sub(other=out, name=name_2)
        self._post_process_split(out, comp, name_1, name_2)
        return out, comp
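    # Usage sketch for split_by_function (hypothetical column names).  The function must
    # return a dataframe with the same structure as its input; here half the mass reports
    # to the output stream, preserving composition and moisture:
    #
    #   def halve_mass(df: pd.DataFrame) -> pd.DataFrame:
    #       df = df.copy()
    #       df[['mass_wet', 'mass_dry']] = df[['mass_wet', 'mass_dry']] * 0.5
    #       return df
    #
    #   out, comp = mc.split_by_function(split_function=halve_mass, name_1='out', name_2='comp')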
    def split_by_estimator(self, estimator: PandasPipeline, name_1: Optional[str] = None,
                           name_2: Optional[str] = None,
                           extra_features: Optional[pd.DataFrame] = None,
                           allow_prefix_mismatch: bool = False,
                           mass_recovery_column: Optional[str] = None,
                           mass_recovery_max: float = 1.0) -> Tuple['MassComposition', 'MassComposition']:
        """Split an object using a sklearn estimator.

        This method applies the estimator to self, resulting in two new objects.  The object returned with
        name_1 is the result of the estimator.predict() method.  The object returned with name_2 is the
        complement.

        See also: split, split_by_function, split_by_partition

        Args:
            estimator: Any sklearn estimator that transforms the dataframe from a MassComposition object into
             a new dataframe with values representing a new (output) stream using the predict method.  The
             returned dataframe structure must be identical to the input dataframe.
            name_1: The name of the stream created by the estimator.
            name_2: The name of the complement stream created by the split, which is calculated automatically.
            extra_features: Optional additional features to pass to the estimator as features.
            allow_prefix_mismatch: If True, allow feature names to be different and log an info message.
             If False, raise an error when feature names are different.
            mass_recovery_column: If provided, this indicates that the model has estimated mass recovery, not
             mass explicitly.  This will execute a transformation of the predicted dry mass recovery to dry mass.
            mass_recovery_max: The maximum mass recovery value, used to scale the mass recovery to mass.
             Only applicable if mass_recovery_column is provided.  Should be either 1.0 or 100.0.

        Returns:
            A tuple of two MassComposition objects, the first the output of the estimator, the other
            the complement
        """
        # Extract feature names from the estimator, and get the actual features
        feature_names: list[str] = list(extract_feature_names(estimator))
        features: pd.DataFrame = self._get_features(feature_names, allow_prefix_mismatch=allow_prefix_mismatch,
                                                    extra_features=extra_features)

        # Apply the estimator
        estimates: pd.DataFrame = estimator.predict(X=features)
        if isinstance(estimates, np.ndarray):
            raise NotImplementedError("The estimator must return a DataFrame")

        # Detect a possible prefix from the estimate columns
        features_prefix: str = self.get_common_prefix(features.columns.to_list())
        estimates_prefix: str = self.get_common_prefix(estimates.columns.to_list())

        # If there is a prefix, check that it matches name_1, subject to allow_prefix_mismatch
        if estimates_prefix and not allow_prefix_mismatch and name_1 and not name_1 == estimates_prefix:
            raise ValueError(f"Common prefix mismatch: {estimates_prefix} and name_1: {name_1}")

        # assign the output names, based on specified names, allow for prefix mismatch
        name_1 = name_1 if name_1 else estimates_prefix

        if mass_recovery_column:
            # Transform the mass recovery to mass by applying the mass recovery to the dry mass of the
            # input stream
            if mass_recovery_max not in [1.0, 100.0]:
                raise ValueError(f"mass_recovery_max must be either 1.0 or 100.0, not {mass_recovery_max}")
            if mass_recovery_column not in estimates.columns:
                raise KeyError(f"mass_recovery_column: {mass_recovery_column} not found in the estimates.")
            dry_mass_var: str = self.data.mass_dry.name
            estimates[mass_recovery_column] = estimates[mass_recovery_column] * self.data[
                dry_mass_var].values / mass_recovery_max
            estimates.rename(columns={mass_recovery_column: dry_mass_var}, inplace=True)

        if estimates_prefix:
            col_name_map: dict[str, str] = {f: f.replace(estimates_prefix + '_', "") for f in estimates.columns}
            estimates.rename(columns=col_name_map, inplace=True)

        out: MassComposition = MassComposition(name=name_1, constraints=self.constraints, data=estimates)
        comp: MassComposition = self.sub(other=out, name=name_2)
        self._post_process_split(out, comp, name_1, name_2)
        return out, comp
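    # Usage sketch for split_by_estimator (hypothetical).  It assumes `pipeline` is a
    # fitted PandasPipeline whose predict method returns a dataframe describing the
    # output stream, with a hypothetical 'dry_mass_recovery' column in the range [0, 1]:
    #
    #   conc, tail = mc.split_by_estimator(estimator=pipeline, name_1='conc', name_2='tail',
    #                                      mass_recovery_column='dry_mass_recovery',
    #                                      mass_recovery_max=1.0)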
    def _get_features(self, feature_names: List[str], allow_prefix_mismatch: bool,
                      extra_features: Optional[pd.DataFrame] = None) -> pd.DataFrame:
        """
        This method checks if the feature names required by an estimator are present in the data.  If not,
        it tries to match the feature names by considering a common prefix.  If a match is found, the columns
        in the data are renamed accordingly.  If a match is not found and `allow_prefix_mismatch` is False,
        an error is raised.  If `allow_prefix_mismatch` is True, the method proceeds with the mismatched
        feature names.  If `extra_features` is provided, these features are added to the data.

        Args:
            feature_names (List[str]): A list of feature names required by the estimator.
            allow_prefix_mismatch (bool): If True, allows the feature names in the data and the estimator
             to be different.
            extra_features (Optional[pd.DataFrame]): Additional features to be added to the data.

        Returns:
            pd.DataFrame: The data with the correct feature names.

        Raises:
            ValueError: If `allow_prefix_mismatch` is False and the feature names in the data and the
             estimator do not match.
        """
        # Create a mapping of lower-case feature names to original feature names
        feature_name_map = {name.lower(): name for name in feature_names}

        df_features: pd.DataFrame = self.data.to_dataframe()
        if extra_features is not None:
            df_features = pd.concat([df_features, extra_features], axis=1)

        missing_features = set(f.lower() for f in feature_names) - set(c.lower() for c in df_features.columns)
        if missing_features:
            prefix: str = f"{self.name}_"
            common_prefix: str = self.get_common_prefix(feature_names)
            if common_prefix and common_prefix + '_' != prefix and allow_prefix_mismatch:
                prefix = common_prefix + '_'
            # create a map to support renaming the columns
            prefixed_feature_map: dict[str, str] = {f: feature_name_map.get(f"{prefix}{f.lower()}") for f in
                                                    df_features.columns if
                                                    feature_name_map.get(f"{prefix}{f.lower()}") is not None}
            df_features.rename(columns=prefixed_feature_map, inplace=True)
            missing_features = set(f.lower() for f in feature_names) - set(c.lower() for c in df_features.columns)
            if missing_features:
                raise ValueError(f"Missing features: {missing_features}, with mc.name: {self.name}, "
                                 f"prefix: {prefix} and allow_prefix_mismatch set to {allow_prefix_mismatch}.")

        # Return the dataframe with the selected features
        df_features: pd.DataFrame = df_features[feature_names]
        return df_features
    def calculate_partition(self, ref: 'MassComposition') -> pd.DataFrame:
        """Calculate the partition of the ref stream relative to self"""
        self._check_one_dim_interval()
        return calculate_partition(df_feed=self.data.to_dataframe(), df_ref=ref.data.to_dataframe(),
                                   col_mass_dry='mass_dry')
    # def resample(self, dim: str, num_intervals: int = 50, edge_precision: int = 8) -> 'MassComposition':
    #     res = deepcopy(self)
    #     res._data = self._data.mc.resample(dim=dim, num_intervals=num_intervals, edge_precision=edge_precision)
    #     return res
    def resample_1d(self, interval_edges: Union[Iterable, int],
                    precision: Optional[int] = None,
                    include_original_edges: bool = False) -> 'MassComposition':
        """Resample a 1D fractional dim/index

        Args:
            interval_edges: The values of the new grid (interval edges).  If an int, will up-sample by that
             factor, for example the value of 10 will automatically define edges that create 10x the
             resolution (up-sampled).
            precision: Optional integer for the number of decimal places to round the grid values to.
            include_original_edges: If True include the original edges in the grid.

        Returns:
            A new object interpolated onto the new grid
        """
        # TODO: add support for supplementary variables
        df_upsampled: pd.DataFrame = mass_preserving_interp(self.data.to_dataframe(),
                                                            interval_edges=interval_edges, precision=precision,
                                                            include_original_edges=include_original_edges)
        obj: MassComposition = MassComposition(df_upsampled, name=self.name)
        obj._nodes = self._nodes
        obj.constraints = self.constraints
        return obj
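    # Usage sketch for resample_1d (hypothetical; mc_sized is a hypothetical 1D
    # interval-indexed object).  The object is up-sampled onto a grid with 10x the
    # resolution, preserving mass:
    #
    #   mc_fine = mc_sized.resample_1d(interval_edges=10, precision=3)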
    def add(self, other: 'MassComposition', name: Optional[str] = None) -> 'MassComposition':
        """Add two objects

        Adds other to self, with optional name of the returned object

        Args:
            other: object to add to self
            name: name of the returned object

        Returns:
            The sum of self and other as a new object
        """
        res: MassComposition = self.__add__(other)
        if name is not None:
            res._data.mc.rename(name)
        return res
    def sub(self, other: 'MassComposition', name: Optional[str] = None) -> 'MassComposition':
        """Subtract two objects

        Subtracts other from self, with optional name of the returned object

        Args:
            other: object to subtract from self
            name: name of the returned object

        Returns:
            The result of subtracting other from self as a new object
        """
        res: MassComposition = self.__sub__(other)
        if name is not None:
            res._data.mc.rename(name)
        return res
    def div(self, other: 'MassComposition', name: Optional[str] = None) -> 'MassComposition':
        """Divide two objects

        Divides self by other, with optional name of the returned object

        Args:
            other: the denominator (or reference) object
            name: name of the returned object

        Returns:
            The result of dividing self by other as a new object
        """
        res: MassComposition = self.__truediv__(other)
        if name is not None:
            res._data.mc.rename(name)
        return res
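    # Usage sketch for the math operations (hypothetical stream names, continuing the
    # split examples above).  The results preserve node relationships, so a network of
    # related streams can be built from them:
    #
    #   feed = ref.add(comp, name='feed')   # recombine two streams
    #   tail = feed.sub(conc, name='tail')  # the complement of a concentrate
    #   rel = conc.div(feed, name='rel')    # conc relative to the feed reference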
    def plot_bins(self, variables: List[str],
                  cutoff_var: str,
                  bin_width: float,
                  cumulative: bool = True,
                  direction: str = 'descending') -> go.Figure:
        """Plot "The Grade-Tonnage" curve.

        Mass and grade by bins for a cut-off variable.

        Args:
            variables: List of variables to include in the plot
            cutoff_var: The variable that defines the bins
            bin_width: The width of the bin
            cumulative: If True, the results are cumulative weight averaged.
            direction: 'ascending'|'descending', if cumulative is True, the direction of accumulation
        """
        bin_data: pd.DataFrame = self.binned_mass_composition(cutoff_var=cutoff_var, bin_width=bin_width,
                                                              cumulative=cumulative, direction=direction,
                                                              as_dataframe=True)
        id_var: str = bin_data.index.name
        df: pd.DataFrame = bin_data[variables].reset_index()
        # convert the interval to the left edge.  TODO: make flexible
        df[id_var] = df[id_var].apply(lambda x: x.left)
        var_cutoff: str = id_var.replace('_bins', '_cut-off')
        df.rename(columns={id_var: var_cutoff}, inplace=True)
        df = df.melt(id_vars=[var_cutoff], var_name='component')
        fig = px.line(df, x=var_cutoff, y='value', facet_row='component')
        fig.update_yaxes(matches=None)
        fig.update_layout(title=self.name)
        return fig
    def plot_intervals(self, variables: List[str],
                       cumulative: bool = True,
                       direction: str = 'descending',
                       show_edges: bool = True,
                       min_x: Optional[float] = None) -> go.Figure:
        """Plot mass and grade by fractional interval.

        Args:
            variables: List of variables to include in the plot
            cumulative: If True, the results are cumulative weight averaged.
            direction: 'ascending'|'descending', if cumulative is True, the direction of accumulation
            show_edges: If True, show the edges on the plot.  Applicable to cumulative plots only.
            min_x: Optional minimum value for the x-axis, useful to set a reasonable visual range with a
             log scaled x-axis when plotting size data
        """
        res: xr.Dataset = self.data
        plot_kwargs: Dict = dict(line_shape='vh')
        if cumulative:
            res = res.mc.data().mc.cumulate(direction=direction)
            plot_kwargs = dict(line_shape='spline')

        interval_data: pd.DataFrame = res.mc.to_dataframe()
        df_intervals: pd.DataFrame = self._intervals_to_columns(interval_index=interval_data.index)
        df = pd.concat([df_intervals, interval_data], axis='columns')

        x_var: str = interval_data.index.name
        if not cumulative:
            # append on the largest fraction right edge for display purposes
            df_end: pd.DataFrame = df.loc[df.index.max(), list(df_intervals.columns) + variables].to_frame().T
            df_end[df_intervals.columns[0]] = df_end[df_intervals.columns[1]]
            df_end[df_intervals.columns[1]] = np.inf
            df = pd.concat([df_end, df], axis='index')
            df[interval_data.index.name] = df[df_intervals.columns[0]]
        else:
            if direction == 'ascending':
                x_var = df_intervals.columns[1]
            elif direction == 'descending':
                x_var = df_intervals.columns[0]

        if 'size' in x_var:
            if not min_x:
                min_x = interval_data.index.min().right / 2.0
            # set zero to the minimum x value (for display only) to enable the tooltips on that point.
            df.loc[df[x_var] == df[x_var].min(), x_var] = min_x
            hover_data = {'component': True,  # add other column, default formatting
                          x_var: ':.3f',  # add other column, customized formatting
                          'value': ':.2f'}
            plot_kwargs = {**plot_kwargs,
                           **dict(log_x=True, range_x=[min_x, interval_data.index.max().right],
                                  hover_data=hover_data)}

        df = df[[x_var] + variables].melt(id_vars=[x_var], var_name='component')
        if cumulative and show_edges:
            plot_kwargs['markers'] = True

        fig = px.line(df, x=x_var, y='value', facet_row='component', **plot_kwargs)
        fig.for_each_annotation(lambda a: a.update(text=a.text.replace("component=", "")))
        fig.update_yaxes(matches=None)
        fig.update_layout(title=self.name)
        return fig
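    # Usage sketch for the interval plot (hypothetical; mc_sized is a hypothetical 1D
    # fractionated object, e.g. a sized sample):
    #
    #   fig = mc_sized.plot_intervals(variables=['mass_dry', 'Fe'],
    #                                 cumulative=True, direction='descending')
    #   fig.show()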
    def plot_grade_recovery(self, target_analyte,
                            discard_from: Literal["lowest", "highest"] = "lowest",
                            title: Optional[str] = None) -> go.Figure:
        """The grade-recovery plot.

        The grade-recovery curve is generated by assuming an ideal separation (for the chosen property, or
        dimension) at each fractional interval.  It defines the theoretical maximum performance, which can
        only be improved if liberation is improved by comminution.

        This method is only applicable to a 1D object where the single dimension is a pd.Interval type.

        Args:
            target_analyte: The analyte of value.
            discard_from: Defines the discarded direction.  discard_from = "lowest" will discard the lowest
             value first, then the next lowest, etc.
            title: Optional plot title

        Returns:
            A plotly.GraphObjects figure
        """
        title = title if title is not None else 'Ideal Grade - Recovery'
        df: pd.DataFrame = self.ideal_incremental_separation(discard_from=discard_from)
        df_recovery: pd.DataFrame = df.loc[(slice(None), 'recovery'), [target_analyte, 'mass_dry']].droplevel(
            'attribute').rename(columns={'mass_dry': 'Yield', target_analyte: f"{target_analyte}_recovery"})
        df_composition: pd.DataFrame = df.loc[(slice(None), 'composition'), :].droplevel('attribute').drop(
            columns=['mass_wet', 'mass_dry', 'H2O'])
        df_plot: pd.DataFrame = pd.concat([df_recovery, df_composition], axis=1).reset_index()
        fig = px.line(df_plot, x=target_analyte, y=f"{target_analyte}_recovery", hover_data=df_plot.columns,
                      title=title)
        # fig.update_layout(xaxis_title=f"Grade of {target_analyte}",
        #                   yaxis_title=f"Recovery of {target_analyte}", title=title)
        return fig
    def plot_amenability(self, target_analyte: str,
                         discard_from: Literal["lowest", "highest"] = "lowest",
                         gangue_analytes: Optional[List[str]] = None,
                         title: Optional[str] = None) -> go.Figure:
        """The yield-recovery plot.

        The yield-recovery curve provides an understanding of the amenability of a sample.

        This method is only applicable to a 1D object where the single dimension is a pd.Interval type.

        Args:
            target_analyte: The analyte of value.
            discard_from: Defines the discarded direction.  discard_from = "lowest" will discard the lowest
             value first, then the next lowest, etc.
            gangue_analytes: The analytes to be rejected
            title: Optional plot title

        Returns:
            A plotly.GraphObjects figure
        """
        title = title if title is not None else 'Amenability Plot'
        df: pd.DataFrame = self.ideal_incremental_recovery(discard_from=discard_from)
        amenability_indices: pd.Series = amenability_index(df, col_target=target_analyte,
                                                           col_mass_recovery='mass')
        analytes = [col for col in df.columns if col != "mass"] if gangue_analytes is None else \
            [target_analyte] + list(gangue_analytes)
        mass_rec: pd.Series = df["mass"]
        df = df[analytes]

        fig = go.Figure()
        for analyte in analytes:
            fig.add_trace(
                go.Scatter(x=mass_rec, y=df[analyte], mode="lines",
                           name=f"{analyte} ({round(amenability_indices[analyte], 2)})",
                           customdata=df.index.values,
                           hovertemplate='<b>Recovery: %{y:.3f}</b><br>Cut-point: %{customdata:.3f} '))
        fig.add_trace(go.Scatter(x=[0, 1], y=[0, 1], mode="lines", name='y=x',
                                 line=dict(shape='linear', color='gray', dash='dash')))
        fig.update_layout(xaxis_title='Yield (Mass Recovery)', yaxis_title='Recovery', title=title,
                          hovermode='x')
        return fig
    def plot_parallel(self, color: Optional[str] = None,
                      vars_include: Optional[List[str]] = None,
                      vars_exclude: Optional[List[str]] = None,
                      title: Optional[str] = None,
                      include_dims: Optional[Union[bool, List[str]]] = True,
                      plot_interval_edges: bool = False) -> go.Figure:
        """Create an interactive parallel plot

        Useful to explore multidimensional data like mass-composition data

        Args:
            color: Optional color variable
            vars_include: Optional List of variables to include in the plot
            vars_exclude: Optional List of variables to exclude from the plot
            title: Optional plot title
            include_dims: Optional boolean or list of dimensions to include in the plot.  True will show
             all dims.
            plot_interval_edges: If True, interval edges will be plotted instead of interval mid-points

        Returns:
            A plotly parallel coordinates figure
        """
        df = self.data.mc.to_dataframe()
        if not title and hasattr(self, 'name'):
            title = self.name

        fig = parallel_plot(data=df, color=color, vars_include=vars_include, vars_exclude=vars_exclude,
                            title=title, include_dims=include_dims,
                            plot_interval_edges=plot_interval_edges)
        return fig
    def plot_comparison(self, other: 'MassComposition',
                        color: Optional[str] = None,
                        vars_include: Optional[List[str]] = None,
                        vars_exclude: Optional[List[str]] = None,
                        facet_col_wrap: int = 3,
                        trendline: bool = False,
                        trendline_kwargs: Optional[Dict] = None,
                        title: Optional[str] = None) -> go.Figure:
        """Create an interactive comparison plot

        Useful to compare the difference in component values between two objects.

        Args:
            other: the object to compare with self.
            color: Optional color variable
            vars_include: Optional List of variables to include in the plot
            vars_exclude: Optional List of variables to exclude from the plot
            facet_col_wrap: The number of subplot columns per row.
            trendline: If True, add trendlines to the scatter plots
            trendline_kwargs: Allows customising the trendline: ref: https://plotly.com/python/linear-fits/
            title: Optional plot title

        Returns:
            A plotly figure of faceted scatter plots, one per variable.
        """
        df_self: pd.DataFrame = self.data.to_dataframe()
        df_other: pd.DataFrame = other.data.to_dataframe()

        if vars_include is not None:
            missing_vars = set(vars_include).difference(set(df_self.columns))
            if len(missing_vars) > 0:
                raise KeyError(f'vars_include provided contains variables not found in the data: {missing_vars}')
            df_self = df_self[vars_include]
        if vars_exclude:
            df_self = df_self[[col for col in df_self.columns if col not in vars_exclude]]
        df_other = df_other[df_self.columns]

        # Supplementary variables are the same for each stream and so will be unstacked.
        supp_cols: List[str] = [col for col in df_self.columns if
                                col in self.variables.supplementary.get_col_names()]
        if supp_cols:
            df_self.set_index(supp_cols, append=True, inplace=True)
            df_other.set_index(supp_cols, append=True, inplace=True)

        index_names = list(df_self.index.names)
        cols = list(df_self.columns).copy()
        df_self = df_self[cols].assign(name=self.name).reset_index().melt(id_vars=index_names + ['name'])
        df_other = df_other[cols].assign(name=other.name).reset_index().melt(id_vars=index_names + ['name'])
        df_plot: pd.DataFrame = pd.concat([df_self, df_other])
        df_plot = df_plot.set_index(index_names + ['name', 'variable'], drop=True).unstack(['name'])
        df_plot.columns = df_plot.columns.droplevel(0)
        df_plot.reset_index(level=list(np.arange(-1, -len(index_names) - 1, -1)), inplace=True)

        # set variables back to standard order
        variable_order: Dict = {col: i for i, col in enumerate(cols)}
        df_plot = df_plot.sort_values(by=['variable'], key=lambda x: x.map(variable_order))

        fig: go.Figure = comparison_plot(data=df_plot, x=self.name, y=other.name,
                                         facet_col_wrap=facet_col_wrap, color=color,
                                         trendline=trendline, trendline_kwargs=trendline_kwargs)
        fig.update_layout(title=title)
        return fig
    def plot_ternary(self, variables: List[str], color: Optional[str] = None,
                     title: Optional[str] = None) -> go.Figure:
        """Plot a ternary diagram

        Args:
            variables: List of 3 components to plot
            color: Optional color variable
            title: Optional plot title
        """
        df = self.data.to_dataframe()
        vars_missing: List[str] = [v for v in variables if v not in df.columns]
        if vars_missing:
            raise KeyError(f'Variable/s not found in the dataset: {vars_missing}')

        cols: List[str] = list(variables)  # copy to avoid mutating the caller's list
        if color is not None:
            cols.append(color)

        if color:
            fig = px.scatter_ternary(df[cols], a=variables[0], b=variables[1], c=variables[2], color=color)
        else:
            fig = px.scatter_ternary(df[cols], a=variables[0], b=variables[1], c=variables[2])

        if not title and hasattr(self, 'name'):
            title = self.name
        fig.update_layout(title=title)
        return fig
    def __str__(self) -> str:
        res: str = f'\n{self.name}\n'
        res += str(self.data)
        return res
    def __add__(self, other: 'MassComposition') -> 'MassComposition':
        """Add two objects

        Perform the addition with the mass-composition variables only and then append any attribute variables.
        Presently ignores any attribute vars in other.

        Args:
            other: object to add to self

        Returns:
            The sum of self and other as a new object
        """
        xr_sum: xr.Dataset = self._data.mc.add(other._data)
        res: MassComposition = MassComposition(name=xr_sum.mc.name, constraints=self.constraints)
        res.set_data(data=xr_sum, constraints=self.constraints)
        other._nodes = [other._nodes[0], self._nodes[1]]
        res._nodes = [self._nodes[1], random_int()]
        return res
    def __sub__(self, other: 'MassComposition') -> 'MassComposition':
        """Subtract the supplied object from self

        Perform the subtraction with the mass-composition variables only and then append any attribute variables.

        Args:
            other: object to subtract from self

        Returns:
            The difference of self and other as a new object
        """
        xr_sub: xr.Dataset = self._data.mc.sub(other._data)
        res: MassComposition = MassComposition(name=xr_sub.mc.name, constraints=self.constraints)
        res.set_data(data=xr_sub, constraints=self.constraints)
        res._nodes = [self._nodes[1], random_int()]
        return res

    def __truediv__(self, other: 'MassComposition') -> 'MassComposition':
        """Divide self by the supplied object

        Perform the division with the mass-composition variables only and then append any attribute variables.

        Args:
            other: denominator object, self will be divided by this object

        Returns:
            The quotient of self and other as a new object
        """
        xr_div: xr.Dataset = self._data.mc.div(other._data)
        res: MassComposition = MassComposition(name=xr_div.mc.name, constraints=self.constraints)
        res.set_data(data=xr_div, constraints=self.constraints)
        return res

    def __eq__(self, other):
        if isinstance(other, MassComposition):
            return self.__dict__ == other.__dict__
        return False

    @staticmethod
    def _check_cols_in_data_cols(cols: List[str], cols_data: List[str]):
        for col in cols:
            if (col is not None) and (col not in cols_data):
                msg: str = f"{col} not in the data columns: {cols_data}"
                logging.getLogger(__name__).error(msg)
                raise IndexError(msg)

    @staticmethod
    def _copy_all_attrs(xr_to: xr.Dataset, xr_from: xr.Dataset) -> xr.Dataset:
        xr_to.attrs.update(xr_from.attrs)
        da: xr.DataArray
        for new_da, da in zip(xr_to.values(), xr_from.values()):
            new_da.attrs.update(da.attrs)
        return xr_to

    @staticmethod
    def _clip(xr_ds: xr.Dataset, variables: List[str], limits: Tuple) -> xr.Dataset:
        if len(variables) == 1:
            variables = variables[0]
        xr_ds[variables] = xr_ds[variables].where(xr_ds[variables] > limits[0], limits[0])
        xr_ds[variables] = xr_ds[variables].where(xr_ds[variables] < limits[1], limits[1])
        return xr_ds

    def _post_process_split(self, obj_1, obj_2, name_1, name_2):
        if name_1:
            obj_1._data.mc.rename(name_1)
        if name_2:
            obj_2._data.mc.rename(name_2)
        obj_1._nodes = [self._nodes[1], random_int()]
        obj_2._nodes = [self._nodes[1], random_int()]
        obj_1._name = name_1
        obj_2._name = name_2
        return obj_1, obj_2

    def _intervals_to_columns(self, interval_index: pd.IntervalIndex) -> pd.DataFrame:
        """Reconstruct columns from an interval index

        Uses the left and right names stored in the xr.Dataset attrs

        Args:
            interval_index: The IntervalIndex to convert to named columns of edges

        Returns:
            A DataFrame of the left and right edges, indexed by the intervals
        """
        base_name: str = str(interval_index.name)
        if base_name in self._data.attrs['mc_interval_edges'].keys():
            d_edge_names = self._data.attrs['mc_interval_edges'][base_name]
        else:
            d_edge_names = {'left': 'left', 'right': 'right'}
        df_intervals: pd.DataFrame = pd.DataFrame(index=interval_index).reset_index()
        df_intervals[f'{base_name}_{d_edge_names["left"]}'] = df_intervals[base_name].apply(lambda x: x.left)
        df_intervals[f'{base_name}_{d_edge_names["right"]}'] = df_intervals[base_name].apply(lambda x: x.right)
        df_intervals.set_index(base_name, inplace=True)
        return df_intervals

    def _create_interval_indexes(self, data: pd.DataFrame) -> pd.DataFrame:
        if (data.index.names is not None) and (data.index.names[0] is not None):
            for pair in self.config['intervals']['suffixes']:
                suffix_candidates: Dict = {n: n.split('_')[-1].lower() for n in data.index.names}
                suffixes: Dict = {k: v for k, v in suffix_candidates.items() if v in pair}
                if suffixes:
                    indexes_orig: List = data.index.names
                    data = data.reset_index()
                    num_intervals: int = int(len(suffixes.keys()) / 2)
                    for i in range(0, num_intervals):
                        keys = list(suffixes.keys())[i: i + 2]
                        base_name: str = '_'.join(keys[0].split('_')[:-1])
                        data[base_name] = pd.arrays.IntervalArray.from_arrays(left=data[keys[0]],
                                                                              right=data[keys[1]],
                                                                              closed=self.config['intervals'][
                                                                                  'closed'])
                        # verbose but need to preserve index order...
                        new_indexes: List = []
                        index_edge_names: Dict = {base_name: {'left': keys[0].split('_')[-1],
                                                              'right': keys[1].split('_')[-1]}}
                        for index in indexes_orig:
                            if index not in keys:
                                new_indexes.append(index)
                            if (index in keys) and (base_name not in new_indexes):
                                new_indexes.append(base_name)
                        # push the left and right names (suffixes) to the dataset attrs
                        # (series attrs are lost when set to an index)
                        data.attrs = index_edge_names
                        data.set_index(new_indexes, inplace=True)
                        data.drop(columns=keys, inplace=True)
        return data

    def _solve_mass_moisture(self, data) -> pd.DataFrame:
        d_var_map: Dict = self.variables.mass_moisture.property_to_var()
        d_var_exists: Dict = {k: v in data.columns for k, v in d_var_map.items()}
        d_mass_var_exists: Dict = {k: v in data.columns for k, v in self.variables.mass.property_to_var().items()}
        if sum(list(d_var_exists.values())) == 0:
            raise KeyError(f"Insufficient data supplied to solve mass-moisture: {d_var_exists}")
        if sum(list(d_mass_var_exists.values())) == 0:
            raise KeyError(f"At least one mass variable must be supplied to solve mass-moisture: "
                           f"{d_mass_var_exists}")
        if sum(list(d_var_exists.values())) == 3:
            # TODO: add mass-moisture balance integrity check.
            self._logger.info(
                'The mass-moisture variables are over-specified and not (yet) checked for balance. '
                'Moisture is ignored and the mass variables assumed to be correct.')
        # assume zero moisture
        if sum(list(d_var_exists.values())) == 1:
            data[d_var_map['moisture']] = 0.0
            self._logger.info('Zero moisture has been assumed.')
        if not d_var_exists['mass_wet']:
            data[d_var_map['mass_wet']] = solve_mass_moisture(mass_dry=data[d_var_map['mass_dry']],
                                                              moisture=data[d_var_map['moisture']])
        if not d_var_exists['mass_dry']:
            data[d_var_map['mass_dry']] = solve_mass_moisture(mass_wet=data[d_var_map['mass_wet']],
                                                              moisture=data[d_var_map['moisture']])
        # drop the moisture column since it is now redundant; moisture is a dependent property of the
        # mass variables
        if d_var_exists['moisture']:
            data.drop(columns=d_var_map['moisture'], inplace=True)
        return data

    def _dataframe_to_mc_dataset(self, data):
        # create the xr.Dataset, dims from the index.
        xr_ds: xr.Dataset = data.to_xarray()
        # move the attrs to become coords - HOLD - this creates merging problems in the data property, reconsider.
        # xr_ds = xr_ds.set_coords(cols_attrs)

        # add the dataset attributes
        ds_attrs: Dict = {'mc_name': self._name,
                          'mc_vars_mass': self.variables.mass.get_var_names(),
                          'mc_vars_chem': self.variables.chemistry.get_var_names(),
                          'mc_vars_attrs': self.variables.supplementary.get_var_names(),
                          'mc_interval_edges': data.attrs}
        xr_ds.attrs = ds_attrs
        # add the variable attributes
        for v in self.variables.xr.variables:
            xr_ds[v.name].attrs = {
                'units': self._mass_units if v.group == VariableGroups.MASS else self._composition_units,
                'standard_name': ' '.join(
                    v.name.split('_')[::-1]).title() if v.group == VariableGroups.MASS else v.name,
                'mc_type': (VariableGroups.MASS if v.group == VariableGroups.MASS
                            else VariableGroups.CHEMISTRY).value,
                'mc_col_orig': v.column_name}
        return xr_ds

    def _check_constraints(self) -> pd.DataFrame:
        """Determine if all records are within the constraints"""
        # execute column-wise to manage memory
        df: pd.DataFrame = self.data[self.constraints.keys()].to_dataframe()
        chunks = []
        for variable, bounds in self.constraints.items():
            chunks.append(df.loc[(df[variable] < bounds[0]) | (df[variable] > bounds[1]), variable])
        oor: pd.DataFrame = pd.concat(chunks, axis='columns')
        return oor

    def _check_one_dim_interval(self):
        if len(self.data.dims) > 1:
            raise NotImplementedError(f"This object is {len(self.data.dims)} dimensional. "
                                      f"Only 1D interval objects are valid")
        index_var: str = str(list(self.data.dims.keys())[0])
        if not isinstance(self.data[index_var].data[0], pd.Interval):
            raise NotImplementedError(f"The dim {index_var} of this object is not a pd.Interval. "
                                      f"Only 1D interval objects are valid")