Source code for elphick.geomet.base

import copy
import inspect
import logging
import re
from abc import ABC
from pathlib import Path
from typing import Optional, Union, Literal, TypeVar, TYPE_CHECKING, Any

import numpy as np
import pandas as pd

from elphick.geomet.config import read_yaml
from elphick.geomet.utils.components import get_components, is_compositional
from elphick.geomet.utils.moisture import solve_mass_moisture
from elphick.geomet.utils.pandas import mass_to_composition, composition_to_mass, composition_factors
from elphick.geomet.utils.sampling import random_int
from elphick.geomet.utils.timer import log_timer
from .config.config_read import get_column_config
from .plot import parallel_plot, comparison_plot
import plotly.express as px
import plotly.graph_objects as go

if TYPE_CHECKING:
    from elphick.geomet.flowsheet.stream import Stream

# generic type variable, used for type hinting, to indicate that the type is a subclass of MassComposition
MC = TypeVar('MC', bound='MassComposition')


[docs] def filter_kwargs(cls, **kwargs): valid_params = inspect.signature(cls.__init__).parameters res = {k: v for k, v in kwargs.items() if k in valid_params} return res
[docs] class MassComposition(ABC):
[docs] def __init__(self, data: Optional[pd.DataFrame] = None, name: Optional[str] = None, moisture_in_scope: bool = True, mass_wet_var: Optional[str] = None, mass_dry_var: Optional[str] = None, moisture_var: Optional[str] = None, component_vars: Optional[list[str]] = None, composition_units: Literal['%', 'ppm', 'ppb'] = '%', components_as_symbols: bool = True, ranges: Optional[dict[str, list]] = None, config_file: Optional[Path] = None): """ Args: data: The input data name: The name of the sample moisture_in_scope: Whether the moisture is in scope. If False, only dry mass is processed. mass_wet_var: The name of the wet mass column mass_dry_var: The name of the dry mass column moisture_var: The name of the moisture column component_vars: The names of the chemical columns components_as_symbols: If True, convert the composition variables to symbols, e.g. Fe ranges: The range of valid data for each column in the data config_file: The configuration file """ self._logger = logging.getLogger(name=self.__class__.__name__) if config_file is None: config_file = Path(__file__).parent / './config/mc_config.yml' self.config = read_yaml(config_file) self.name: str = name self.moisture_in_scope: bool = moisture_in_scope self.mass_wet_var: Optional[str] = mass_wet_var self.mass_dry_var: str = mass_dry_var self.moisture_var: Optional[str] = moisture_var self.component_vars: Optional[list[str]] = component_vars # TODO: check if this is redundant and remove. self.composition_units: Literal['%', 'ppm', 'ppb'] = composition_units self.composition_factor: int = composition_factors[composition_units] self.components_as_symbols: bool = components_as_symbols self._mass_data: Optional[pd.DataFrame] = None self._supplementary_data = None self._aggregate = None # set the data self.data = data # add the OOR status object self.status = OutOfRangeStatus(self, ranges)
@property @log_timer def data(self) -> Optional[pd.DataFrame]: if self._mass_data is not None: # convert chem mass to composition mass_comp_data = mass_to_composition(self._mass_data, mass_wet=self.mass_wet_var, mass_dry=self.mass_dry_var, moisture_column_name='H2O' if self.components_as_symbols else ( self.moisture_var if self.moisture_var is not None else 'h2o'), component_columns=self.composition_columns, composition_units=self.composition_units) # append the supplementary vars return pd.concat([mass_comp_data, self._supplementary_data], axis=1) return None @data.setter @log_timer def data(self, value): if value is not None: # Convert column names to symbols if components_as_symbols is True if self.components_as_symbols: symbol_dict = is_compositional(value.columns, strict=False) value.columns = [symbol_dict.get(col, col) for col in value.columns] # the config provides regex search keys to detect mass and moisture columns if they are not specified. mass_totals = self._solve_mass(value) composition, supplementary_data = self._get_non_mass_data(value) self._supplementary_data = supplementary_data self.mass_data = composition_to_mass(pd.concat([mass_totals, composition], axis=1), mass_wet=self.mass_wet_var, mass_dry=self.mass_dry_var, moisture_column_name=self.moisture_column, component_columns=composition.columns, composition_units=self.composition_units) self._logger.debug(f"Data has been set.") else: self._mass_data = None @property def mass_data(self): return self._mass_data @mass_data.setter def mass_data(self, value): self._mass_data = value # Recalculate the aggregate whenever the data changes self.aggregate = self.weight_average() @property def aggregate(self) -> pd.DataFrame: if self._aggregate is None and self._mass_data is not None: self._aggregate = self.weight_average() return self._aggregate @aggregate.setter def aggregate(self, value): self._aggregate = value @property def variable_map(self) -> Optional[dict[str, str]]: """A map from lower case standard names to the actual column names""" if self._mass_data is not None: existing_columns = list(self._mass_data.columns) res = {} if self.moisture_in_scope and self.mass_wet_var in existing_columns: res['mass_wet'] = self.mass_wet_var if self.mass_dry_var in existing_columns: res['mass_dry'] = self.mass_dry_var if self.moisture_in_scope: res['moisture'] = self.moisture_var if self.components_as_symbols: res['moisture'] = is_compositional([self.moisture_var], strict=False).get(self.moisture_var, self.moisture_var) if self.composition_columns: for col in self.composition_columns: res[col.lower()] = col return res return None @property def mass_columns(self) -> Optional[list[str]]: if self._mass_data is not None: existing_columns = list(self._mass_data.columns) res = [] if self.moisture_in_scope and self.mass_wet_var in existing_columns: res.append(self.mass_wet_var) if self.mass_dry_var in existing_columns: res.append(self.mass_dry_var) return res return None @property def moisture_column(self) -> Optional[str]: res = 'h2o' if self.moisture_in_scope: res = self.moisture_var if self.components_as_symbols: res = is_compositional([res], strict=False).get(res, res) return res @property def composition_columns(self) -> Optional[list[str]]: res = None if self._mass_data is not None: if self.moisture_in_scope: res = list(self._mass_data.columns)[2:] else: res = list(self._mass_data.columns)[1:] return res @property def supplementary_columns(self) -> Optional[list[str]]: res = None if self._supplementary_data is not None: res = list(self._supplementary_data.columns) return res @property def data_columns(self) -> list[str]: return [col for col in (self.mass_columns + [self.moisture_column] + self.composition_columns + self.supplementary_columns) if col is not None]
[docs] def plot_parallel(self, color: Optional[str] = None, vars_include: Optional[list[str]] = None, vars_exclude: Optional[list[str]] = None, title: Optional[str] = None, include_dims: Optional[Union[bool, list[str]]] = True, plot_interval_edges: bool = False) -> go.Figure: """Create an interactive parallel plot Useful to explore multidimensional data like mass-composition data Args: color: Optional color variable vars_include: Optional list of variables to include in the plot vars_exclude: Optional list of variables to exclude in the plot title: Optional plot title include_dims: Optional boolean or list of dimension to include in the plot. True will show all dims. plot_interval_edges: If True, interval edges will be plotted instead of interval mid Returns: """ if not title and hasattr(self, 'name'): title = self.name fig = parallel_plot(data=self.data, color=color, vars_include=vars_include, vars_exclude=vars_exclude, title=title, include_dims=include_dims, plot_interval_edges=plot_interval_edges) return fig
[docs] def plot_comparison(self, other: MC, color: Optional[str] = None, vars_include: Optional[list[str]] = None, vars_exclude: Optional[list[str]] = None, facet_col_wrap: int = 3, trendline: bool = False, trendline_kwargs: Optional[dict] = None, title: Optional[str] = None) -> go.Figure: """Create an interactive parallel plot Useful to compare the difference in component values between two objects. Args: other: the object to compare with self. color: Optional color variable vars_include: Optional List of variables to include in the plot vars_exclude: Optional List of variables to exclude in the plot trendline: If True and trendlines trendline_kwargs: Allows customising the trendline: ref: https://plotly.com/python/linear-fits/ title: Optional plot title facet_col_wrap: The number of subplot columns per row. Returns: """ df_self: pd.DataFrame = self.data.to_dataframe() df_other: pd.DataFrame = other.data.to_dataframe() if vars_include is not None: missing_vars = set(vars_include).difference(set(df_self.columns)) if len(missing_vars) > 0: raise KeyError(f'var_subset provided contains variable not found in the data: {missing_vars}') df_self = df_self[vars_include] if vars_exclude: df_self = df_self[[col for col in df_self.columns if col not in vars_exclude]] df_other = df_other[df_self.columns] # Supplementary variables are the same for each stream and so will be unstacked. supp_cols: list[str] = self.supplementary_columns if supp_cols: df_self.set_index(supp_cols, append=True, inplace=True) df_other.set_index(supp_cols, append=True, inplace=True) index_names = list(df_self.index.names) cols = list(df_self.columns).copy() df_self = df_self[cols].assign(name=self.name).reset_index().melt(id_vars=index_names + ['name']) df_other = df_other[cols].assign(name=other.name).reset_index().melt(id_vars=index_names + ['name']) df_plot: pd.DataFrame = pd.concat([df_self, df_other]) df_plot = df_plot.set_index(index_names + ['name', 'variable'], drop=True).unstack(['name']) df_plot.columns = df_plot.columns.droplevel(0) df_plot.reset_index(level=list(np.arange(-1, -len(index_names) - 1, -1)), inplace=True) # set variables back to standard order variable_order: dict = {col: i for i, col in enumerate(cols)} df_plot = df_plot.sort_values(by=['variable'], key=lambda x: x.map(variable_order)) fig: go.Figure = comparison_plot(data=df_plot, x=self.name, y=other.name, facet_col_wrap=facet_col_wrap, color=color, trendline=trendline, trendline_kwargs=trendline_kwargs) fig.update_layout(title=title) return fig
[docs] def plot_ternary(self, variables: list[str], color: Optional[str] = None, title: Optional[str] = None) -> go.Figure: """Plot a ternary diagram variables: List of 3 components to plot color: Optional color variable title: Optional plot title """ df = self.data vars_missing: list[str] = [v for v in variables if v not in df.columns] if vars_missing: raise KeyError(f'Variable/s not found in the dataset: {vars_missing}') cols: list[str] = variables if color is not None: cols.append(color) if color: fig = px.scatter_ternary(df[cols], a=variables[0], b=variables[1], c=variables[2], color=color) else: fig = px.scatter_ternary(df[cols], a=variables[0], b=variables[1], c=variables[2]) if not title and hasattr(self, 'name'): title = self.name fig.update_layout(title=title) return fig
def weight_average(self, group_by: Optional[str] = None) -> pd.DataFrame: if group_by is None: composition: pd.DataFrame = pd.DataFrame( self._mass_data[self.composition_columns].sum(axis=0) / self._mass_data[ self.mass_dry_var].sum() * self.composition_factor).T mass_sum = pd.DataFrame(self._mass_data[self.mass_columns].sum(axis=0)).T # Recalculate the moisture if self.moisture_in_scope: mass_sum[self.moisture_column] = solve_mass_moisture(mass_wet=mass_sum[self.mass_columns[0]], mass_dry=mass_sum[self.mass_columns[1]]) # Create a DataFrame from the weighted averages weighted_averages_df = pd.concat([mass_sum, composition], axis=1) else: group_var: pd.Series = self._supplementary_data[group_by] weighted_averages_df = self._mass_data.groupby(group_var).apply( lambda x: pd.DataFrame( x[self.composition_columns].sum(axis=0) / x[self.mass_dry_var].sum() * self.composition_factor).T) weighted_averages_df.index = weighted_averages_df.index.droplevel(-1) mass_sum = self._mass_data[self.mass_columns].groupby(group_var).sum() weighted_averages_df = pd.concat([mass_sum, weighted_averages_df], axis=1) if self.moisture_in_scope: weighted_averages_df.insert(loc=2, column=self.moisture_column, value=solve_mass_moisture( mass_wet=mass_sum[self.mass_columns[0]], mass_dry=mass_sum[self.mass_columns[1]])) return weighted_averages_df def _solve_mass(self, value) -> pd.DataFrame: """Solve mass_wet and mass_dry from the provided columns. Args: value: The input data with the column-names provided by the user\ Returns: The mass data, with the columns mass_wet and mass_dry. Only mass_dry if moisture_in_scope is False. """ # Auto-detect columns if they are not provided mass_dry, mass_wet, moisture = self._extract_mass_moisture_columns(value) if mass_dry is None: if mass_wet is not None and moisture is not None: value[self.mass_dry_var] = solve_mass_moisture(mass_wet=mass_wet, moisture=moisture) else: msg = (f"mass_dry_var is not provided and cannot be calculated from mass_wet_var and moisture_var " f"for {self.name}") self._logger.error(msg) raise ValueError(msg) if self.moisture_in_scope: if mass_wet is None: if mass_dry is not None and moisture is not None: value[self.mass_wet_var] = solve_mass_moisture(mass_dry=mass_dry, moisture=moisture) else: msg = ( f"mass_wet_var is not provided and cannot be calculated from mass_dry_var and moisture_var. " f"Consider specifying the mass_wet_var, mass_dry_var and moisture_var, or alternatively set " f"moisture_in_scope to False for {self.name}") self._logger.error(msg) raise ValueError(msg) if moisture is None: if mass_wet is not None and mass_dry is not None: value[self.moisture_var] = solve_mass_moisture(mass_wet=mass_wet, mass_dry=mass_dry) else: msg = f"moisture_var is not provided and cannot be calculated from mass_wet_var and mass_dry_var." self._logger.error(msg) raise ValueError(msg) mass_totals: pd.DataFrame = value[[self.mass_wet_var, self.mass_dry_var]] else: mass_totals: pd.DataFrame = value[[self.mass_dry_var]] return mass_totals # Helper method to extract column def _extract_column(self, value, var_type): var = getattr(self, f"{var_type}_var") if var is None: var = next((col for col in value.columns if re.search(self.config['vars'][var_type]['search_regex'], col, re.IGNORECASE)), self.config['vars'][var_type]['default_name']) return var def _extract_mass_moisture_columns(self, value): if self.mass_wet_var is None: self.mass_wet_var = self._extract_column(value, 'mass_wet') if self.mass_dry_var is None: self.mass_dry_var = self._extract_column(value, 'mass_dry') if self.moisture_var is None: self.moisture_var = self._extract_column(value, 'moisture') mass_wet = value.get(self.mass_wet_var) mass_dry = value.get(self.mass_dry_var) moisture = value.get(self.moisture_var) return mass_dry, mass_wet, moisture def _get_non_mass_data(self, value: Optional[pd.DataFrame]) -> (Optional[pd.DataFrame], Optional[pd.DataFrame]): """ Get the composition data and supplementary data. Extract only the composition columns specified, otherwise detect the compositional columns """ composition = None supplementary = None if value is not None: if self.component_vars is None: non_mass_cols: list[str] = [col for col in value.columns if col not in [self.mass_wet_var, self.mass_dry_var, self.moisture_var, 'h2o', 'H2O', 'H2O']] component_cols: list[str] = get_components(value[non_mass_cols], strict=False) else: component_cols: list[str] = self.component_vars composition = value[component_cols] supplementary_cols: list[str] = [col for col in value.columns if col not in component_cols + [self.mass_wet_var, self.mass_dry_var, self.moisture_var, 'h2o', 'H2O', 'H2O']] supplementary = value[supplementary_cols] return composition, supplementary def __deepcopy__(self, memo): # Create a new instance of our class new_obj = self.__class__() memo[id(self)] = new_obj # Copy each attribute for attr, value in self.__dict__.items(): setattr(new_obj, attr, copy.deepcopy(value, memo)) return new_obj def update_mass_data(self, value: pd.DataFrame): if self._mass_data is not None: self._mass_data = value if self._supplementary_data is not None: if self._supplementary_data.index.names != self._mass_data.index.names: # if indexes have been dropped self._supplementary_data.index = self._mass_data.index self._supplementary_data = self._supplementary_data.loc[value.index] self.aggregate = self.weight_average()
[docs] def filter_by_index(self, index: pd.Index): """Update the data by index""" if self._mass_data is not None: self._mass_data = self._mass_data.loc[index] if self._supplementary_data is not None: self._supplementary_data = self._supplementary_data.loc[index] self.aggregate = self.weight_average()
[docs] def split(self, fraction: float, name_1: Optional[str] = None, name_2: Optional[str] = None, include_supplementary_data: bool = False) -> tuple['Stream', 'Stream']: """Split the object by mass A simple mass split maintaining the same composition Args: fraction: A constant in the range [0.0, 1.0] name_1: The name of the reference object created by the split name_2: The name of the complement object created by the split include_supplementary_data: Whether to inherit the supplementary variables Returns: tuple of two objects, the first with the mass fraction specified, the other the complement """ # create_congruent_objects to preserve properties like constraints name_1 = name_1 if name_1 is not None else f"{self.name}_1" name_2 = name_2 if name_2 is not None else f"{self.name}_2" ref: MassComposition = self.create_congruent_object(name=name_1, include_mc_data=True, include_supp_data=include_supplementary_data) ref.update_mass_data(self._mass_data * fraction) comp: MassComposition = self.create_congruent_object(name=name_2, include_mc_data=True, include_supp_data=include_supplementary_data) comp.update_mass_data(self._mass_data * (1 - fraction)) # Ensure self and other are Stream objects self._convert_to_stream(self) self._convert_to_stream(ref) self._convert_to_stream(comp) self: 'Stream' ref: 'Stream' comp: 'Stream' # create the relationships ref.nodes = [self.nodes[1], random_int()] comp.nodes = [self.nodes[1], random_int()] return ref, comp
[docs] def add(self, other: MC, name: Optional[str] = None, include_supplementary_data: bool = False) -> 'Stream': """Add two objects together Args: other: The other object name: The name of the new object include_supplementary_data: Whether to include the supplementary data Returns: The new object """ res: MC = self.create_congruent_object(name=name, include_mc_data=True, include_supp_data=include_supplementary_data) res.update_mass_data(self._mass_data + other._mass_data) # Ensure self and other are Stream objects self: 'Stream' = self.to_stream() other: 'Stream' = self._convert_to_stream(other) res: 'Stream' = self._convert_to_stream(res) # create the relationships other.nodes = [other.nodes[0], self.nodes[1]] res.nodes = [self.nodes[1], random_int()] return res
[docs] def sub(self, other: MC, name: Optional[str] = None, include_supplementary_data: bool = False) -> 'Stream': """Subtract other from self Args: other: The other object name: The name of the new object include_supplementary_data: Whether to include the supplementary data Returns: The new object """ res = self.create_congruent_object(name=name, include_mc_data=True, include_supp_data=include_supplementary_data) res.update_mass_data(self._mass_data - other._mass_data) # Ensure self and other are Stream objects self._convert_to_stream(self) self._convert_to_stream(other) self: 'Stream' other: 'Stream' # create the relationships res.nodes = [self.nodes[1], random_int()] return res
[docs] def div(self, other: MC, name: Optional[str] = None, include_supplementary_data: bool = False) -> MC: """Divide two objects Divides self by other, with optional name of the returned object Args: other: the denominator (or reference) object name: name of the returned object include_supplementary_data: Whether to include the supplementary data Returns: """ new_obj = self.create_congruent_object(name=name, include_mc_data=True, include_supp_data=include_supplementary_data) new_obj.update_mass_data(self._mass_data / other._mass_data) return new_obj
def __str__(self): return f"{self.__class__.__name__}: {self.name}\n{self.aggregate.to_dict()}"
[docs] def create_congruent_object(self, name: str, include_mc_data: bool = False, include_supp_data: bool = False) -> MC: """Create an object with the same attributes""" # Create a new instance of our class new_obj = self.__class__() # Copy each attribute for attr, value in self.__dict__.items(): if attr == '_mass_data' and not include_mc_data: continue if attr == '_supplementary_data' and not include_supp_data: continue setattr(new_obj, attr, copy.deepcopy(value)) new_obj.name = name return new_obj
[docs] def __add__(self, other: MC) -> 'Stream': """Add two objects Perform the addition with the mass-composition variables only and then append any attribute variables. Presently ignores any attribute vars in other Args: other: object to add to self Returns: """ return self.add(other, include_supplementary_data=True)
def __sub__(self, other: MC) -> 'Stream': """Subtract the supplied object from self Perform the subtraction with the mass-composition variables only and then append any attribute variables. Args: other: object to subtract from self Returns: """ return self.sub(other, include_supplementary_data=True) def to_stream(self) -> 'Stream': from elphick.geomet.flowsheet.stream import Stream # Local import to avoid circular dependency if not isinstance(self, Stream): self.__class__ = type(self.__class__.__name__, (self.__class__, Stream), {}) filtered_kwargs = filter_kwargs(self.__class__, **self.__dict__) filtered_kwargs['data'] = self.data Stream.__init__(self, **filtered_kwargs) # Initialize Stream properties return self @staticmethod def _convert_to_stream(obj) -> 'Stream': from elphick.geomet.flowsheet.stream import Stream # Local import to avoid circular dependency if not isinstance(obj, Stream): obj.__class__ = type(obj.__class__.__name__, (obj.__class__, Stream), {}) filtered_kwargs = filter_kwargs(obj.__class__, **obj.__dict__) filtered_kwargs['data'] = obj.data Stream.__init__(obj, **filtered_kwargs) # Initialize Stream properties return obj def __truediv__(self, other: MC) -> MC: """Divide self by the supplied object Perform the division with the mass-composition variables only and then append any attribute variables. Args: other: denominator object, self will be divided by this object Returns: """ return self.div(other, include_supplementary_data=True) def __eq__(self, other): if isinstance(other, MassComposition): return self.__dict__ == other.__dict__ return False
[docs] @classmethod def from_mass_dataframe(cls, mass_df: pd.DataFrame, mass_wet: Optional[str] = 'mass_wet', mass_dry: str = 'mass_dry', moisture_column_name: Optional[str] = None, component_columns: Optional[list[str]] = None, composition_units: Literal['%', 'ppm', 'ppb'] = '%', **kwargs): """ Class method to create a MassComposition object from a mass dataframe. Args: mass_df: DataFrame with mass data. **kwargs: Additional arguments to pass to the MassComposition constructor. Returns: A new MassComposition object. """ # Convert mass to composition using the function from the pandas module composition_df = mass_to_composition(mass_df, mass_wet=mass_wet, mass_dry=mass_dry, moisture_column_name=moisture_column_name, component_columns=component_columns, composition_units=composition_units) # Create a new instance of the MassComposition class return cls(data=composition_df, **kwargs)
[docs] def query(self, expr: str, name: Optional[str] = None) -> MC: """Reduce the data by a query expression Args: expr: A pandas query expression name: name of the new object Returns: A new object with the reduced data """ name = name if name is not None else self.name res = self.create_congruent_object(name=f"{name} ({expr})", include_mc_data=True, include_supp_data=True) filtered_index = self.data.query(expr).index res.update_mass_data(self._mass_data.loc[filtered_index]) if res.supplementary_columns is not None: res._supplementary_data = self._supplementary_data.loc[filtered_index] return res
def reset_index(self, index_name: str) -> MC: res = self.create_congruent_object(name=f"{self.name} (reset_index)", include_mc_data=True, include_supp_data=True) res.update_mass_data(self._mass_data.reset_index(level=index_name, drop=True)) if res.supplementary_columns is not None: res._supplementary_data = self._supplementary_data.reset_index(level=index_name, drop=False) else: res._supplementary_data = pd.DataFrame(index=self._mass_data.index, columns=[index_name], data=self._mass_data.index.get_level_values(index_name)) return res
[docs] class OutOfRangeStatus: """A class to check and report out-of-range records in an MC object."""
[docs] def __init__(self, mc: 'MC', ranges: dict[str, list]): """Initialize with an MC object.""" self._logger = logging.getLogger(__name__) self.mc: 'MC' = mc self.ranges: Optional[dict[str, list]] = None self.oor: Optional[pd.DataFrame] = None self.num_oor: Optional[int] = None self.failing_components: Optional[list[str]] = None if mc.mass_data is not None: self.ranges = self.get_ranges(ranges) self.oor: pd.DataFrame = self._check_range() self.num_oor: int = len(self.oor) self.failing_components: Optional[list[str]] = list( self.oor.dropna(axis=1).columns) if self.num_oor > 0 else None
def get_ranges(self, ranges: dict[str, list]) -> dict[str, list]: d_ranges: dict = get_column_config(config_dict=self.mc.config, var_map=self.mc.variable_map, config_key='range') # modify the default dict based on any user passed constraints if ranges: for k, v in ranges.items(): d_ranges[k] = v return d_ranges def _check_range(self) -> pd.DataFrame: """Check if all records are within the constraints.""" if self.mc._mass_data is not None: df: pd.DataFrame = self.mc.data[self.ranges.keys()] chunks = [] for variable, bounds in self.ranges.items(): chunks.append(df.loc[(df[variable] < bounds[0]) | (df[variable] > bounds[1]), variable]) oor: pd.DataFrame = pd.concat(chunks, axis='columns') else: # An empty object will have ok status oor: pd.DataFrame = pd.DataFrame(columns=list(self.ranges.keys())) return oor @property def ok(self) -> bool: """Return True if all records are within range, False otherwise.""" if self.num_oor > 0: self._logger.warning(f'{self.num_oor} out of range records exist.') return True if self.num_oor == 0 else False def __str__(self) -> str: """Return a string representation of the status.""" res: str = f'status.ok: {self.ok}\n' res += f'num_oor: {self.num_oor}' return res def __eq__(self, other: object) -> bool: """Return True if other Status has the same out-of-range records.""" if isinstance(other, OutOfRangeStatus): return self.oor.equals(other.oor) return False