Source code for elphick.mass_composition.utils.loader

import logging
from typing import Dict, Optional, List, Union, Iterable, Tuple

import numpy as np
import pandas as pd
from joblib import delayed
from tqdm import tqdm

from elphick.mass_composition import MassComposition
from elphick.mass_composition.utils.interp import _upsample_grid_by_factor
from elphick.mass_composition.utils.parallel import TqdmParallel
from elphick.mass_composition.utils.pd_utils import column_prefix_counts, column_prefixes

logger = logging.getLogger(__name__)


def create_mass_composition(stream_data: Tuple[Union[int, str], pd.DataFrame],
                            interval_edges: Optional[Union[Iterable, int]] = None
                            ) -> Optional[Tuple[Union[int, str], MassComposition]]:
    """Create a named MassComposition object from a (name, data) pair.

    The function takes a single tuple (rather than two arguments) so it can be mapped directly
    over ``dict.items()``.

    Args:
        stream_data: A tuple of the stream name and the DataFrame holding the data for that stream.
        interval_edges: If not None, the created object is resampled by passing this value to
            ``MassComposition.resample_1d`` before it is returned.

    Returns:
        A tuple of (stream name, MassComposition object), or None if creating or resampling the
        object raised an exception.  Failures are logged, not propagated.
    """
    stream, data = stream_data
    try:
        obj = MassComposition(data=data, name=stream)
        if interval_edges is not None:
            obj = obj.resample_1d(interval_edges=interval_edges)
    except Exception as e:
        # Deliberately best-effort: one bad stream is reported (with its traceback) and signalled
        # to the caller as None rather than aborting the creation of every other stream.
        logger.exception("Error creating MassComposition object for %s: %s", stream, e)
        return None
    return stream, obj
def streams_from_dataframe(df: pd.DataFrame,
                           mc_name_col: Optional[str] = None,
                           interval_edges: Optional[Union[Iterable, int]] = None,
                           n_jobs: int = 1) -> Dict[str, MassComposition]:
    """Objects from a DataFrame

    Args:
        df: The DataFrame.  It is not modified.
        mc_name_col: The column specified contains the names of objects to create.  It may be a column
         or an index level.  If None the DataFrame is assumed to be wide and the mc objects will be
         extracted from column prefixes.
        interval_edges: The values of the new grid (interval edges).  If an int, will up-sample by that
         factor, for example the value of 10 will automatically define edges that create 10 x the
         resolution (up-sampled).  Applicable only to 1d interval indexes.
        n_jobs: The number of parallel jobs to run.  If -1, will use all available cores.

    Returns:
        A dict of MassComposition objects keyed by stream name.  A stream whose object could not be
        created (create_mass_composition logged the error and returned None) is omitted.

    Raises:
        KeyError: If mc_name_col is supplied but is neither a column nor an index level.
        NotImplementedError: If interval_edges is supplied and the index of the data (after removing
         the mc_name_col level, if it was an index level) is not a pd.IntervalIndex.
    """
    stream_data: Dict[str, pd.DataFrame] = {}
    # The frame the per-stream data is sliced from.  It is only ever rebound, never mutated, so the
    # caller's DataFrame keeps its index and columns exactly as passed in.
    source: pd.DataFrame = df

    if mc_name_col:
        logger.debug("Creating MassComposition objects by name column.")
        if mc_name_col in df.index.names:
            # reset_index returns a new frame; the name level becomes an ordinary column
            source = df.reset_index(mc_name_col)
        if mc_name_col not in source.columns:
            raise KeyError(f'{mc_name_col} is not in the columns or indexes.')
        data_cols = [col for col in source.columns if col != mc_name_col]
        names = source[mc_name_col]
        for obj_name in tqdm(names.unique(), desc='Preparing MassComposition data'):
            # A boolean mask (rather than DataFrame.query) also works when the column name is not a
            # valid Python identifier, e.g. contains spaces.
            stream_data[obj_name] = source.loc[names == obj_name, data_cols]
    else:
        logger.debug("Creating MassComposition objects by column prefixes.")
        # wide case - find prefixes where there are at least 3 columns
        prefix_counts = column_prefix_counts(df.columns)
        prefix_cols = column_prefixes(df.columns)
        for prefix, n in tqdm(prefix_counts.items(), desc='Preparing MassComposition data by column prefixes'):
            if n < 3:
                # we need at least 3 columns to create a MassComposition object
                continue
            logger.info(f"Creating object for {prefix}")
            members = prefix_cols[prefix]
            selected = [col for col in df.columns if col in members]
            lead = f'{prefix}_'
            # Strip only the leading prefix.  str.replace would also delete later occurrences of
            # the same text, e.g. prefix 's' would turn 's_mass_dry' into 'masdry'.
            stream_data[prefix] = df[selected].rename(
                columns={col: col[len(lead):] for col in selected if col.startswith(lead)})

    if interval_edges is not None:
        logger.debug("Resampling MassComposition objects to new interval edges.")
        # unify the edges - this will also interp missing grades
        # Validate the index the stream data actually carries: when mc_name_col was an index level
        # the original frame has a MultiIndex even though every stream is indexed by interval.
        if not isinstance(source.index, pd.IntervalIndex):
            raise NotImplementedError(f"The index `{source.index}` of the dataframe is not a pd.Interval. "
                                      f" Only 1D interval indexes are valid")
        if isinstance(interval_edges, int):
            # collect every distinct edge across all streams to form a common grid
            unique_edges = set()
            for strm_data in stream_data.values():
                unique_edges.update(strm_data.index.left)
                unique_edges.update(strm_data.index.right)
            all_edges = sorted(unique_edges)
            indx = pd.IntervalIndex.from_arrays(left=all_edges[:-1], right=all_edges[1:])
            interval_edges = _upsample_grid_by_factor(indx=indx, factor=interval_edges)

    with TqdmParallel(desc="Creating MassComposition objects", n_jobs=n_jobs,
                      prefer=None, total=len(stream_data)) as p:
        results = p(delayed(create_mass_composition)(item, interval_edges)
                    for item in stream_data.items())

    # create_mass_composition returns None for a stream it failed to build; dict() cannot consume
    # None entries, so drop them (the failure has already been logged by the worker).
    return dict(result for result in results if result is not None)