Source code for elphick.sklearn_viz.features.outlier_detection

import logging
from typing import Dict, Optional, Union

import numpy as np
import pandas as pd
from scipy.stats import chi2
import plotly.graph_objects as go

from elphick.sklearn_viz.features import PrincipalComponents
from elphick.sklearn_viz.features.principal_components import PCResults
from elphick.sklearn_viz.features.scatter_matrix import plot_scatter_matrix
from elphick.sklearn_viz.utils import log_timer


def mahalanobis(x: pd.DataFrame, data: Optional[pd.DataFrame] = None, cov=None) -> pd.DataFrame:
    """Calculate the squared Mahalanobis distance of every row of x, with an associated p-value.

    Args:
        x: The observations to score, one row per observation and one column per variable.
        data: The reference data that supplies the column means (and the covariance when ``cov``
            is not given).  Defaults to ``x`` itself.
        cov: Optional pre-computed covariance matrix (any array-like accepted by
            ``numpy.linalg.inv``).  When None it is estimated from ``data`` with ``numpy.cov``.

    Returns:
        A DataFrame indexed like ``x`` with two columns: ``mahal_dist`` (the squared distance)
        and ``p_val`` (the chi-square survival probability of that distance).
    """
    if data is None:
        data = x

    # Column-wise means.  np.mean(DataFrame) reduces over both axes (returns a scalar) on
    # pandas >= 2.0, which would centre every column on the same grand mean.
    x_mu = x - data.mean(axis=0)

    # Explicit None test: the truth value of an array / DataFrame is ambiguous and raises.
    if cov is None:
        cov = np.cov(data.values.T)
    inv_covmat = np.linalg.inv(cov)

    centred = x_mu.to_numpy(dtype=float)
    left = centred @ inv_covmat
    # Row-wise quadratic form; equals diag(left @ centred.T) without building the n x n matrix.
    mahal = (left * centred).sum(axis=1)

    # NOTE(review): degrees of freedom are kept at (number of columns - 1) to preserve existing
    # results; confirm whether the number of columns was intended.
    # sf() is the survival function (1 - cdf) evaluated without cancellation in the far tail.
    pvals = chi2.sf(mahal, len(x.columns) - 1)

    return pd.DataFrame({'mahal_dist': mahal, 'p_val': pvals}, index=x.index)
def plot_outlier_matrix(x: pd.DataFrame, pca_spec: Union[float, int] = 0, p_val: float = 0.001,
                        principal_components: bool = False, standardise: bool = False) -> go.Figure:
    """Detect and plot outliers

    Convenience wrapper that builds an OutlierDetection object and returns its outlier matrix plot.

    Args:
        x: X values for outlier detection.
        pca_spec: If zero, pca is not used.  For integers (n) > 0 outlier detection is performed on
            the top n principal components.  For values (f) < 1, outlier detection is performed on
            the number of principal components needed to explain the fraction f of the variance.
        p_val: the p-value threshold for outlier detection.
        principal_components: If True (and pca_spec is not 0) the principal components will be
            plotted.  Otherwise, will plot in the original feature space.
        standardise: Passed through to OutlierDetection; if True the standardised PCA results are
            used.  Defaults to False, which matches the behaviour before this argument existed.

    Returns:
        The plotly Figure produced by OutlierDetection.plot_outlier_matrix.
    """
    detector = OutlierDetection(x=x, pca_spec=pca_spec, standardise=standardise, p_val=p_val)
    return detector.plot_outlier_matrix(principal_components=principal_components)
class OutlierDetection:
    """Flag outliers in a DataFrame using the Mahalanobis distance, optionally in PCA space."""

    def __init__(self, x: pd.DataFrame, pca_spec: Union[float, int] = 0, standardise: bool = False,
                 p_val: float = 0.001):
        """

        Args:
            x: X values for outlier detection.
            pca_spec: If zero, pca is not used.  For integers (n) > 0 outlier detection is performed
                on the top n principal components.  For values (f) < 1, outlier detection is
                performed on the number of principal components needed to explain the fraction f
                of the variance.  Negative values are rejected when the results are computed.
            standardise: If True, standardise the data prior to PCA, where vectors are transformed
                to zero mean and unit variance.
            p_val: the p-value threshold for outlier detection.
        """
        self._logger = logging.getLogger(name=__class__.__name__)
        self.x: pd.DataFrame = x
        self.pca_spec: Union[float, int] = pca_spec
        self.standardise: bool = standardise
        self.p_val: float = p_val
        # Cache for the results computed by the `data` property.
        self._data: Optional[Dict] = None

    @property
    @log_timer
    def data(self) -> Optional[Dict]:
        """The outlier detection results, computed on first access and cached thereafter.

        Returns:
            A dict with keys ``mahal`` (the DataFrame returned by mahalanobis) and ``outlier``
            (a boolean Series named 'outlier', True where p_val is below the threshold).  When
            pca_spec is non-zero the dict also holds ``pca``, the PrincipalComponents object.

        Raises:
            ValueError: If pca_spec is negative.
        """
        if self._data is not None:
            return self._data

        # Validate first: previously this check sat in an unreachable else-branch, so negative
        # values were silently treated as a variance fraction.
        if self.pca_spec < 0:
            raise ValueError("pca_spec cannot be negative")

        res: Dict = {}
        if self.pca_spec != 0:
            label: str = 'std' if self.standardise else 'raw'
            res['pca'] = PrincipalComponents(self.x)
            pca_data: PCResults = res['pca'].data[label]
            if self.pca_spec >= 1:
                num_required = self.pca_spec
            else:
                # NOTE(review): the division by 100 implies explained_variance is in percent -
                # confirm against PCResults.
                reached = np.asarray(pca_data.explained_variance.cumsum() / 100 >= self.pca_spec,
                                     dtype=bool)
                hits = np.flatnonzero(reached)
                # Truthiness-based lookup (an `is True` identity test fails for numpy booleans).
                # If rounding keeps the cumulative total below the threshold, use every component.
                num_required = int(hits[0]) + 1 if hits.size else int(reached.size)
            mahal = mahalanobis(x=pca_data.data.iloc[:, 0:num_required])
        else:
            mahal = mahalanobis(x=self.x)

        res['mahal'] = mahal
        res['outlier'] = pd.Series(res['mahal']['p_val'] < self.p_val, name='outlier')
        self._data = res
        return res

    def plot_outlier_matrix(self, principal_components: bool = False) -> go.Figure:
        """Plot a scatter matrix with the detected outliers highlighted.

        Args:
            principal_components: If True, plot via the PrincipalComponents object (requires a
                non-zero pca_spec).  Otherwise plot the original features coloured by outlier flag.

        Returns:
            The plotly Figure.

        Raises:
            ValueError: If principal_components is True but PCA was not used for detection.
        """
        results = self.data
        if principal_components:
            if 'pca' not in results:
                raise ValueError("Outliers not defined using PCA. Try changing pca_spec.")
            # NOTE(review): original_features=True is passed although this is the
            # principal-components branch - confirm the meaning of that flag.
            fig = results['pca'].plot_scatter_matrix(original_features=True, y=results['outlier'])
        else:
            fig = plot_scatter_matrix(x=pd.concat([self.x, results['outlier']], axis=1),
                                      color='outlier', title="Outlier Scatter Matrix")
        return fig