Source code for elphick.mass_composition.utils.interp

from typing import List, Dict, Optional, Iterable, Union

import numpy as np
import pandas as pd
from scipy.interpolate import pchip_interpolate

import xarray as xr

from elphick.mass_composition.utils.pd_utils import composition_to_mass, mass_to_composition


[docs]def interp_monotonic(ds: xr.Dataset, coords: Dict, include_original_coords: bool = True) -> xr.Dataset: """Interpolate with zero mass loss using pchip The pchip interpolation cannot be used via the xr.Dataset.interp method directly due to an error. This interpolates data_vars independently for a single dimension (coord) at a time. The function will: - convert from relative composition (%) to absolute (mass) - convert the index from interval to a float representing the right edge of the interval - cumsum to provide monotonic increasing data - interpolate with a pchip spline to preserve mass - diff to recover the original fractional data - reconstruct the interval index from the right edges - convert from absolute to relative composition Args: ds: The xarray Dataset with relative composition context include_original_coords: If True include the original coordinates in the result coords: A dictionary of coordinates mapped to the interpolated values. Returns: """ if len(coords) > 1: raise NotImplementedError("Not yet tested for more than one dimension") ds_res: xr.Dataset = ds for coord, x in coords.items(): ds_mass: xr.Dataset = ds.mc.composition_to_mass().sortby(variables=coord, ascending=True) # preserve the minimum interval index for later original_index = pd.arrays.IntervalArray(ds_mass[coord].data) mass: xr.Dataset = ds_mass.cumsum(keep_attrs=True) # put the coords back mass = mass.assign_coords(**ds_mass.coords) # # we'll work in cumulative mass space, using the right edge of the fraction (passing in the size context) mass['size'] = pd.arrays.IntervalArray(mass['size'].data).right # check the input is monotonic mass_check: pd.Series = mass.to_dataframe().apply(lambda col: col.is_monotonic_increasing, axis='index') if not np.all(mass_check): raise ValueError("The input data is not monotonic - have you not passed a cumulative mass dataset?") chunks: List[np.ndarray] = [] for v in list(mass.data_vars): chunks.append(pchip_interpolate(mass[coord], mass[v], x)) df = pd.DataFrame(data=chunks, index=list(mass.data_vars), columns=x).T df.index.name = coord mass_check: pd.Series = df.apply(lambda col: col.is_monotonic_increasing, axis='index') if not np.all(mass_check): raise ValueError("The interpolation is not monotonic - mass has not been preserved.") if include_original_coords: ds_res: xr.Dataset = xr.concat([mass, xr.Dataset.from_dataframe(df)], dim=coord, combine_attrs='override') ds_res = ds_res.drop_duplicates(dim=coord).sortby(variables=coord, ascending=True) else: ds_res: xr.Dataset = xr.Dataset.from_dataframe(df) ds_res.attrs.update(ds_res.attrs) da: xr.DataArray for new_da, da in zip(ds_res.values(), ds_res.values()): new_da.attrs.update(da.attrs) # back to fractions using diff, concat to inject in the correct first record ds_res = xr.concat([mass.isel({coord: 0}).expand_dims(coord), ds_res.diff(dim=coord)], dim=coord) # create a new interval index interval_index: pd.Series = pd.Series(pd.IntervalIndex.from_arrays( left=ds_res[coord].shift({coord: 1}).fillna(original_index.min().left).values, right=ds_res[coord].values, closed='left'), name=coord) ds_res[coord] = interval_index.values ds_res = ds_res.sortby(variables=coord, ascending=False) ds_res = ds_res.mc.mass_to_composition() return ds_res
[docs]def mass_preserving_interp(df_intervals: pd.DataFrame, interval_edges: Union[Iterable, int], include_original_edges: bool = True, precision: Optional[int] = None, mass_wet: str = 'mass_wet', mass_dry: str = 'mass_dry') -> pd.DataFrame: """Interpolate with zero mass loss using pchip The pchip interpolation cannot be used via the xr.Dataset.interp method directly due to an error. This interpolates data_vars independently for a single dimension (coord) at a time. The function will: - convert from relative composition (%) to absolute (mass) - convert the index from interval to a float representing the right edge of the interval - cumsum to provide monotonic increasing data - interpolate with a pchip spline to preserve mass - diff to recover the original fractional data - reconstruct the interval index from the right edges - convert from absolute to relative composition Args: df_intervals: A pd.DataFrame with a single interval index, with mass, composition context. interval_edges: The values of the new grid (interval edges). If an int, will up-sample by that factor, for example the value of 10 will automatically define edges that create 10 x the resolution (up-sampled). include_original_edges: If True include the original index edges in the result precision: Number of decimal places to round the index (edge) values. mass_wet: The wet mass column, not optional. Consider solve_mass_moisture prior to this call if needed. mass_dry: The dry mass column, not optional. Consider solve_mass_moisture prior to this call if needed. Returns: """ if not isinstance(df_intervals.index, pd.IntervalIndex): raise NotImplementedError(f"The index `{df_intervals.index}` of the dataframe is not a pd.Interval. " f" Only 1D interval indexes are valid") composition_in: pd.DataFrame = df_intervals.copy() if isinstance(interval_edges, int): grid_vals = _upsample_grid_by_factor(indx=composition_in.sort_index().index, factor=interval_edges) else: grid_vals = interval_edges if precision is not None: composition_in.index = pd.IntervalIndex.from_arrays(np.round(df_intervals.index.left, precision), np.round(df_intervals.index.right, precision), closed=df_intervals.index.closed, name=df_intervals.index.name) grid_vals = np.round(grid_vals, precision) if include_original_edges: original_edges = np.hstack([df_intervals.index.left, df_intervals.index.right]) grid_vals = np.sort(np.unique(np.hstack([grid_vals, original_edges]))) if not isinstance(grid_vals, np.ndarray): grid_vals = np.array(grid_vals) # convert from relative composition (%) to absolute (mass) mass_in: pd.DataFrame = composition_to_mass(composition_in, mass_wet=mass_wet, mass_dry=mass_dry) # convert the index from interval to a float representing the right edge of the interval mass_in.index = mass_in.index.right # add a row of zeros mass_in = pd.concat([mass_in, pd.Series(0, index=mass_in.columns).to_frame().T], axis=0).sort_index(ascending=True) # cumsum to provide monotonic increasing data mass_cum: pd.DataFrame = mass_in.cumsum() # if the new grid extrapolates (on the coarse side, mass will be lost, so we assume that when extrapolating. # the mass in the extrapolated fractions is zero. By inserting these records the spline will conform. x_extra = grid_vals[grid_vals > mass_cum.index.max()] cum_max: pd.Series = mass_cum.iloc[-1, :] mass_cum = mass_cum.reindex(index=mass_cum.index.append(pd.Index(x_extra))) # reindex to enable insert mass_cum.loc[x_extra, :] = cum_max.values # interpolate with a pchip spline to preserve mass chunks = [] for col in mass_cum: tmp = mass_cum[col].dropna() # drop any missing values new_vals = pchip_interpolate(tmp.index.values, tmp.values, grid_vals) chunks.append(new_vals) mass_cum_upsampled: pd.DataFrame = pd.DataFrame(chunks, index=mass_in.columns, columns=grid_vals).T # diff to recover the original fractional data mass_fractions_upsampled: pd.DataFrame = mass_cum_upsampled.diff().dropna(axis=0) # reconstruct the interval index from the right edges mass_fractions_upsampled.index = pd.IntervalIndex.from_arrays(left=[0] + list(mass_fractions_upsampled.index)[:-1], right=mass_fractions_upsampled.index, closed=df_intervals.index.closed, name=df_intervals.index.name) # convert from absolute to relative composition res = mass_to_composition(mass_fractions_upsampled, mass_wet=mass_wet, mass_dry=mass_dry).sort_index( ascending=False) return res
def _upsample_grid_by_factor(indx: pd.IntervalIndex, factor): # TODO: must be a better way than this - vectorised? grid_vals: List = [indx.left.min()] for interval in indx: increment = (interval.right - interval.left) / factor for i in range(0, factor): grid_vals.append(interval.left + (i + 1) * increment) grid_vals.sort() return grid_vals