import logging
from typing import Dict, Optional, List, Union, Iterable, Tuple
import numpy as np
import pandas as pd
from joblib import delayed
from tqdm import tqdm
from elphick.mass_composition import MassComposition
from elphick.mass_composition.utils.interp import _upsample_grid_by_factor
from elphick.mass_composition.utils.parallel import TqdmParallel
from elphick.mass_composition.utils.pd_utils import column_prefix_counts, column_prefixes
logger = logging.getLogger(__name__)
def create_mass_composition(stream_data: Tuple[Union[int, str], pd.DataFrame],
                            interval_edges: Optional[Union[Iterable, int]] = None) -> Tuple[
    Union[int, str], MassComposition]:
    """Build a named MassComposition object from a (name, data) pair.

    Takes a single tuple argument so that it can be mapped directly over
    ``dict.items()`` by a parallel executor.

    Args:
        stream_data: A tuple of the stream name and the DataFrame holding that stream's data.
        interval_edges: If not None, the freshly created object is passed through
            ``resample_1d(interval_edges=interval_edges)`` before being returned.

    Returns:
        A tuple of (stream name, MassComposition).  If creating or resampling the object raises,
        the error is logged and None is returned instead of a tuple.
    """
    stream, frame = stream_data
    try:
        mc_obj = MassComposition(data=frame, name=stream)
        if interval_edges is not None:
            mc_obj = mc_obj.resample_1d(interval_edges=interval_edges)
    except Exception as e:
        # best-effort: report the failure for this stream and let the caller carry on
        logger.error(f"Error creating MassComposition object for {stream}: {e}")
        return None
    return stream, mc_obj
def streams_from_dataframe(df: pd.DataFrame,
                           mc_name_col: Optional[str] = None,
                           interval_edges: Optional[Union[Iterable, int]] = None,
                           n_jobs=1) -> Dict[str, MassComposition]:
    """Create MassComposition objects from a DataFrame.

    The supplied DataFrame is not modified.

    Args:
        df: The DataFrame
        mc_name_col: The column specified contains the names of objects to create.  It may be a column or
            an index level.  If None the DataFrame is assumed to be wide and the mc objects will be
            extracted from column prefixes.
        interval_edges: The values of the new grid (interval edges).  If an int, will up-sample by that factor, for
            example the value of 10 will automatically define edges that create 10 x the resolution (up-sampled).
            Applicable only to 1d interval indexes.
        n_jobs: The number of parallel jobs to run.  If -1, will use all available cores.

    Returns:
        A dict of MassComposition objects keyed by stream name.  Streams for which an object could not be
        created (the failure is logged by the worker) are left out of the dict.

    Raises:
        KeyError: If mc_name_col is given but is neither a column nor an index level.
        NotImplementedError: If interval_edges is given and the data is not indexed by a pd.IntervalIndex.
    """
    stream_data: Dict[str, pd.DataFrame] = {}
    # The frame the per-stream data is sliced from.  Only ever rebound to a new frame, so the
    # caller's DataFrame (index and columns) is left exactly as it was passed in.
    source: pd.DataFrame = df

    if mc_name_col:
        logger.debug("Creating MassComposition objects by name column.")
        if mc_name_col in df.index.names:
            # non-inplace reset returns a new frame with the name level moved into the columns
            source = df.reset_index(mc_name_col)
        if mc_name_col not in source.columns:
            raise KeyError(f'{mc_name_col} is not in the columns or indexes.')
        value_cols = [col for col in source.columns if col != mc_name_col]
        names = source[mc_name_col].unique()
        for obj_name in tqdm(names, desc='Preparing MassComposition data'):
            # a boolean mask (rather than DataFrame.query) also works for column names that
            # are not valid python identifiers
            stream_data[obj_name] = source.loc[source[mc_name_col] == obj_name, value_cols]
    else:
        logger.debug("Creating MassComposition objects by column prefixes.")
        # wide case - find prefixes where there are at least 3 columns
        prefix_counts = column_prefix_counts(df.columns)
        prefix_cols = column_prefixes(df.columns)
        for prefix, n in tqdm(prefix_counts.items(), desc='Preparing MassComposition data by column prefixes'):
            if n < 3:  # we need at least 3 columns to create a MassComposition object
                continue
            logger.info(f"Creating object for {prefix}")
            cols = prefix_cols[prefix]
            selected = [col for col in df.columns if col in cols]
            token = f'{prefix}_'
            # strip the prefix token once only, so a later repeat of the same text inside the
            # column name survives
            stream_data[prefix] = df[selected].rename(
                columns={col: col.replace(token, '', 1) for col in selected})

    if interval_edges is not None:
        logger.debug("Resampling MassComposition objects to new interval edges.")
        # unify the edges - this will also interp missing grades
        # Validate the frame the streams were sliced from: when the name column was an index level
        # the original df carries a MultiIndex even though each stream is interval indexed.
        if not isinstance(source.index, pd.IntervalIndex):
            raise NotImplementedError(f"The index `{source.index}` of the dataframe is not a pd.Interval. "
                                      f" Only 1D interval indexes are valid")
        if isinstance(interval_edges, int):
            # collect every distinct edge across all streams to form one common grid
            edges = set()
            for frame in stream_data.values():
                edges.update(frame.index.left)
                edges.update(frame.index.right)
            all_edges = sorted(edges)
            indx = pd.IntervalIndex.from_arrays(left=all_edges[0:-1], right=all_edges[1:])
            interval_edges = _upsample_grid_by_factor(indx=indx, factor=interval_edges)

    with TqdmParallel(desc="Creating MassComposition objects", n_jobs=n_jobs,
                      prefer=None, total=len(stream_data)) as p:
        results = list(p(delayed(create_mass_composition)(item, interval_edges)
                         for item in stream_data.items()))

    # create_mass_composition returns None for a stream it could not build; passing those to
    # dict() would raise an opaque TypeError, so drop them and report how many were lost.
    res = dict(result for result in results if result is not None)
    n_failed = len(results) - len(res)
    if n_failed:
        logger.warning(f"{n_failed} of {len(results)} MassComposition objects could not be created.")
    return res