from typing import Optional, Callable, Generator, Tuple, Union
import pandas as pd
from elphick.mass_composition import MassComposition
[docs]class Stream(MassComposition):
[docs] def __init__(self, mc: MassComposition, **kwargs):
"""
Args:
mc: MassComposition object to be used as a stream.
**kwargs: The key word arguments for the Mass Composition object
"""
# Filter out all private properties
mc_dict = {k: v for k, v in mc.__dict__.items() if
not (k.startswith('_') or k in ['config', 'variables', 'status'])}
super().__init__(**mc_dict, **kwargs)
self.set_data(data=mc._data, constraints=mc.constraints)
self._nodes = mc._nodes
self.variables = mc.variables
@property
def source_node(self):
return self._nodes[0]
@property
def destination_node(self):
return self._nodes[1]
@classmethod
def from_mass_composition(cls, mc: MassComposition, **kwargs):
return cls(mc=mc, **kwargs)
[docs] def split(self, fraction: float,
name_1: Optional[str] = None, name_2: Optional[str] = None) -> tuple['Stream', 'Stream']:
"""
Splits the stream into two streams.
Args:
fraction: The fraction of the stream to be assigned to the first stream.
name_1: The name of the first stream.
name_2: The name of the second stream.
Returns:
A tuple of two Stream objects.
"""
mc1, mc2 = super().split(fraction, name_1, name_2)
return Stream.from_mass_composition(mc1), Stream.from_mass_composition(mc2)
[docs] def split_by_partition(self, partition_definition: Callable,
name_1: Optional[str] = None, name_2: Optional[str] = None) -> tuple['Stream', 'Stream']:
"""
Partition the object along a given dimension.
This method applies the defined separation resulting in two new objects.
See also: split, split_by_function
Args:
partition_definition: A partition function that defines the efficiency of separation along a dimension
name_1: The name of the reference stream created by the split
name_2: The name of the complement stream created by the split
Returns:
tuple of two datasets, the first with the mass fraction specified, the other the complement
"""
mcs = super().split_by_partition(partition_definition, name_1, name_2)
return Stream.from_mass_composition(mcs[0]), Stream.from_mass_composition(mcs[1])
[docs] def split_by_function(self, split_function: Callable,
name_1: Optional[str] = None,
name_2: Optional[str] = None) -> tuple['Stream', 'Stream']:
"""Split an object using a function.
This method applies the function to self, resulting in two new objects. The object returned with name_1
is the result of the function. The object returned with name_2 is the complement.
See also: split, split_by_estimator, split_by_partition
Args:
split_function: Any function that transforms the dataframe from a MassComposition object into a new
dataframe with values representing a new (output) stream. The returned dataframe structure must be
identical to the input dataframe.
name_1: The name of the stream created by the function
name_2: The name of the complement stream created by the split, which is calculated automatically.
Returns:
A generator of two Streams,
"""
mcs = super().split_by_function(split_function, name_1, name_2)
return Stream.from_mass_composition(mcs[0]), Stream.from_mass_composition(mcs[1])
[docs] def split_by_estimator(self, estimator: 'sklearn.base.BaseEstimator',
name_1: Optional[str] = None,
name_2: Optional[str] = None,
extra_features: Optional[pd.DataFrame] = None,
allow_prefix_mismatch: bool = False,
mass_recovery_column: Optional[str] = None,
mass_recovery_max: float = 1.0) -> tuple['Stream', 'Stream']:
"""Split an object using a sklearn estimator.
This method applies the function to self, resulting in two new objects. The object returned with name_1
is the result of the estimator.predict() method. The object returned with name_2 is the complement.
See also: split, split_by_function, split_by_partition
Args:
estimator: Any sklearn estimator that transforms the dataframe from a MassComposition object into a new
dataframe with values representing a new (output) stream using the predict method. The returned
dataframe structure must be identical to the input dataframe.
name_1: The name of the stream created by the estimator.
name_2: The name of the complement stream created by the split, which is calculated automatically.
extra_features: Optional additional features to pass to the estimator as features.
allow_prefix_mismatch: If True, allow feature names to be different and log an info message. If False,
raise an error when feature names are different.
mass_recovery_column: If provided, this indicates that the model has estimated mass recovery, not mass
explicitly. This will execute a transformation of the predicted `dry` mass recovery to dry mass.
mass_recovery_max: The maximum mass recovery value, used to scale the mass recovery to mass. Only
applicable if mass_recovery_column is provided. Should be either 1.0 or 100.0.
Returns:
tuple of two Stream objects, the first the output of the estimator, the other the complement
"""
mcs = super().split_by_estimator(estimator, name_1, name_2, extra_features, allow_prefix_mismatch,
mass_recovery_column, mass_recovery_max)
return Stream.from_mass_composition(mcs[0]), Stream.from_mass_composition(mcs[1])
[docs] def add(self, other: Union['Stream', Tuple['Stream', ...]], name: Optional[str] = None) -> 'Stream':
"""
Adds the stream to another stream or a tuple of streams.
Args:
other: The other stream, or a tuple of other streams
name: The name of the new stream.
Returns:
A Stream object.
"""
# If other is not a tuple, make it a tuple
if not isinstance(other, tuple):
other = (other,)
# Initialize the result as the current stream
strm: Stream = self
# Iterate over each stream in other and add it to the result
for o in other:
if not isinstance(o, Stream):
raise ValueError(f'Expected Stream object, got {type(o)}')
if isinstance(strm, MassComposition):
strm = Stream.from_mass_composition(strm)
strm = super(Stream, strm).add(o, name=name)
return strm
[docs] def sub(self, other: 'Stream', name: Optional[str] = None) -> 'Stream':
"""Subtract two streams
Subtracts other from self, with optional name of the returned object
Args:
other: stream to subtract from self
name: name of the returned stream
Returns:
"""
res: MassComposition = self.__sub__(other)
if name is not None:
res._data.mc.rename(name)
return Stream.from_mass_composition(res)