Source code for elphick.sklearn_viz.model_selection.cross_validation

import logging
import multiprocessing
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple, Union

import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, is_classifier, is_regressor
from sklearn.model_selection import KFold, cross_validate

from elphick.sklearn_viz.model_selection.metrics import classification_metrics, regression_metrics
from elphick.sklearn_viz.model_selection.scorers import classification_scorers, regression_scorers


@dataclass
class CrossValidationResult:
    test_scores: List[float]
    train_scores: List[float]
    fit_times: List[float]
    score_times: List[float]
    estimator: List[Any]
    metrics: Dict[str, List[float]]
    metrics_group: Dict[str, Dict[str, List[float]]]
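
# A minimal sketch of how a populated CrossValidationResult might be consumed.
# The `cv_runner` name below is hypothetical and assumes a concrete
# CrossValidatorBase subclass (see the usage sketch at the end of this module):
#
#   res: CrossValidationResult = cv_runner.results['dataset']['estimator']
#   print(np.mean(res.test_scores), np.std(res.test_scores))  # score spread across folds
#   print(pd.DataFrame(res.metrics).describe())               # per-fold metric summary
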
class CrossValidatorBase(ABC):
    def __init__(self,
                 estimators: Union[BaseEstimator, Dict[str, BaseEstimator]],
                 datasets: Union[pd.DataFrame, Dict[str, pd.DataFrame]],
                 target: str,
                 pre_processor: Optional[Any],
                 cv: Union[int, Any],
                 scorer: Any,
                 metrics: Optional[Dict[str, Any]],
                 group: Any,
                 random_state: int,
                 n_jobs: int = 1,
                 verbosity: int = 1):
        self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)

        # If estimators is not a dictionary, convert it into a dictionary with a default key
        if not isinstance(estimators, dict):
            estimators = {'estimator': estimators}
        # If datasets is not a dictionary, convert it into a dictionary with a default key
        if not isinstance(datasets, dict):
            datasets = {'dataset': datasets}

        # Check that the estimators are all classifiers, or all regressors
        self.is_classifier = all(is_classifier(estimator) for estimator in estimators.values())
        self.is_regressor = all(is_regressor(estimator) for estimator in estimators.values())
        if not self.is_classifier and not self.is_regressor:
            raise ValueError("All estimators must be either classifiers or regressors.")

        if scorer is None:
            scorer = 'accuracy' if self.is_classifier else 'r2'
        if metrics is None:
            default_metrics = {'classification': classification_metrics,
                               'regression': regression_metrics}
            metrics_type = 'classification' if self.is_classifier else 'regression'
            metrics = default_metrics.get(metrics_type)

        self.estimators: Dict[str, BaseEstimator] = estimators
        self.datasets: Dict[str, pd.DataFrame] = datasets
        self.target: str = target
        self.pre_processor: Optional[Any] = pre_processor
        self.cv: Union[int, Any] = cv
        self.scorer: Any = scorer
        self.metrics: Dict[str, Any] = metrics
        self.group: Any = group
        self.random_state: int = random_state
        self.n_jobs: int = n_jobs
        self.verbosity: int = verbosity
        self._results: Optional[Dict[str, Dict[str, CrossValidationResult]]] = None
        # Features are taken from the first dataset, excluding the target column
        self.features_in: List[str] = [col for col in next(iter(self.datasets.values())).columns
                                       if col != self.target]
        self._data: Optional[Dict] = None
        self._num_algorithms: int = len(self.estimators)
        self._num_datasets: int = len(self.datasets)
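
    # Input normalisation example (hypothetical estimator): passing
    # `estimators=LinearRegression()` behaves like
    # `estimators={'estimator': LinearRegression()}`, and a bare DataFrame
    # passed as `datasets` is likewise wrapped as `{'dataset': df}`.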

    @property
    def n_cores(self) -> int:
        # Resolve n_jobs to a worker count using the joblib convention,
        # where n_jobs=-1 means all cores, -2 means all but one, and so on.
        n_cores = self.n_jobs
        if self.n_jobs < 0:
            n_cores = multiprocessing.cpu_count() + 1 + self.n_jobs
        return n_cores

    @property
    def results(self) -> Optional[Dict[str, Dict[str, CrossValidationResult]]]:
        if self._results is None:
            start_time = datetime.now()  # Record the start time
            if self.verbosity > 0:
                self._logger.info("Commencing cross validation...")
            d_results: Dict = {data_key: {algo_key: {} for algo_key in self.estimators.keys()}
                               for data_key in self.datasets.keys()}
            # Run the tasks in a loop
            for data_key, data in self.datasets.items():
                for estimator_key, estimator in self.estimators.items():
                    data_key, estimator_key, res = self.cross_validate_task(data_key, data,
                                                                            estimator_key, estimator)
                    d_results[data_key][estimator_key] = res
            self._results = d_results
            if self.verbosity > 0:
                # Format the duration
                duration = str(timedelta(seconds=round((datetime.now() - start_time).total_seconds())))
                self._logger.info(f"Cross validation complete in {duration} using {self.n_cores} "
                                  f"worker{'s' if self.n_cores > 1 else ''}")
        return self._results

    def cross_validate_task(self, data_key, data, estimator_key, estimator):
        start_time = datetime.now()  # Record the start time
        if self.verbosity > 1:
            self._logger.info(f"Starting cross-validation for {data_key} with {estimator_key}")
        # Ensure that only the features present in the dataset are used
        features_in_dataset = list(set(self.features_in).intersection(set(data.columns)))
        x: pd.DataFrame = data[features_in_dataset]
        y: pd.Series = data[self.target]
        if self.pre_processor:
            x = self.pre_processor.set_output(transform="pandas").fit_transform(X=x)
        if isinstance(self.cv, int):
            cv = KFold(n_splits=self.cv, random_state=self.random_state, shuffle=True)
        else:
            cv = self.cv
        res = cross_validate(estimator, x, y, cv=cv, scoring=self.scorer, return_train_score=True,
                             return_estimator=True, return_indices=True, error_score=np.nan,
                             n_jobs=self.n_jobs)
        if self.metrics is not None:
            res['metrics'], res['metrics_group'] = self.calculate_metrics(x=x, y=y,
                                                                          estimators=res['estimator'],
                                                                          indices=res['indices'],
                                                                          group=self.group)
        if self.verbosity > 1:
            # Format the duration
            duration = str(timedelta(seconds=round((datetime.now() - start_time).total_seconds())))
            res_mean = res["test_score"].mean()
            res_std = res["test_score"].std()
            self._logger.info(f"Finished cross-validation for {data_key} with {estimator_key}."
                              f" Mean = {res_mean}, SD = {res_std}, Duration: {duration}")
        # Convert the results to a CrossValidationResult instance and return it
        return data_key, estimator_key, CrossValidationResult(
            test_scores=res['test_score'],
            train_scores=res['train_score'],
            fit_times=res['fit_time'],
            score_times=res['score_time'],
            estimator=res['estimator'],
            metrics=res.get('metrics', {}),
            metrics_group=res.get('metrics_group', {})
        )

    @abstractmethod
    def get_cv_scores(self):
        chunks: List = []
        for data_key, data in self.datasets.items():
            for estimator_key, estimator in self.estimators.items():
                chunks.append(pd.Series(self.results[data_key][estimator_key].test_scores,
                                        name=(data_key, estimator_key)))
        return pd.concat(chunks, axis=1)

    @abstractmethod
    def get_cv_metrics(self, metrics, by_group: bool = False) -> pd.DataFrame:
        chunks: List = []
        metric_key = "metrics_group" if by_group else "metrics"
        for data_key, data in self.datasets.items():
            for estimator_key, estimator in self.estimators.items():
                for metric in metrics:
                    # Access the metrics or metrics_group attribute of the CrossValidationResult instance
                    metric_data = getattr(self.results[data_key][estimator_key], metric_key)[metric]
                    chunks.append(pd.DataFrame(metric_data).assign(data_key=data_key,
                                                                   algo_key=estimator_key,
                                                                   metric=metric))
        res: pd.DataFrame = pd.concat(chunks, axis=0).set_index(['data_key', 'algo_key'],
                                                                append=True).rename(columns={0: 'value'})
        res.index.names = ['fold', 'data_key', 'algo_key']
        return res

    @abstractmethod
    def calculate_metrics(self, x, y, estimators, indices, group):
        metric_results: Dict = {}
        metric_results_group: Dict = {}
        for k, fn_metric in self.metrics.items():
            metric_values: List = []
            metric_groups: Dict = {}
            for estimator, test_indexes in zip(estimators, indices['test']):
                y_true = y.iloc[test_indexes]
                y_est = estimator.predict(x.iloc[test_indexes])
                if isinstance(y_est, pd.DataFrame) and y_est.shape[1] == 1:
                    y_est = y_est.iloc[:, 0]
                metric_values.append(fn_metric(y_true, y_est))
                if group is not None:
                    # Calculate the metric for each group in the group series
                    y_est = pd.merge(left=pd.Series(y_est, name='y_est', index=x.index[test_indexes]),
                                     right=group, left_index=True, right_index=True)
                    for _, grp_res in y_est.groupby(group.name):
                        group_value = str(grp_res[group.name].iloc[0])
                        group_metric_result = fn_metric(y_true.loc[grp_res.index], grp_res['y_est'].values)
                        metric_groups.setdefault(group_value, []).append(group_metric_result)
            metric_results[k] = metric_values
            if group is not None:
                metric_results_group[k] = metric_groups
        return metric_results, metric_results_group

    def get_model_by_group_data(self, estimator, dataset) -> Tuple[pd.DataFrame, pd.DataFrame]:
        results: Dict = {}
        by_group_score_chunks: List = []
        by_group_metric_chunks: List = []
        for grp in self.group.unique():
            grp_index: pd.Index = self.group.loc[self.group == grp].index
            x: pd.DataFrame = self.datasets[dataset][self.features_in].loc[grp_index]
            y: pd.Series = self.datasets[dataset][self.target].loc[grp_index]
            if self.pre_processor:
                x = self.pre_processor.set_output(transform="pandas").fit_transform(X=x)
            cv = self.cv
            if isinstance(self.cv, int):
                cv = KFold(n_splits=self.cv, random_state=self.random_state, shuffle=True)
            res = cross_validate(self.estimators[estimator], x, y, cv=cv, scoring=self.scorer,
                                 return_estimator=True, return_indices=True, n_jobs=self.n_jobs)
            by_group_score_chunks.append(pd.Series(res['test_score'], name=grp))
            if self.metrics is not None:
                res['metrics'], _ = self.calculate_metrics(x=x, y=y, estimators=res['estimator'],
                                                           indices=res['indices'], group=None)
                results[grp] = res
                by_group_metric_chunks.append(
                    pd.DataFrame(res['metrics']).assign(group=grp).rename_axis('fold', axis='index'))
        by_group_metrics: pd.DataFrame = pd.concat(by_group_metric_chunks)
        by_group_scores = pd.concat(by_group_score_chunks, axis=1)
        return by_group_metrics, by_group_scores

    @abstractmethod
    def plot(self, metrics=None, show_group=False, title=None, col_wrap=None):
        pass
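
# ---------------------------------------------------------------------------
# Usage sketch (illustrative only). `CrossValidator` is a hypothetical concrete
# subclass that implements `plot`; it is not defined in this module, and `df`
# is assumed to be a DataFrame containing the features and a 'y' target column:
#
#   from sklearn.linear_model import LinearRegression
#   from sklearn.ensemble import RandomForestRegressor
#
#   cv_runner = CrossValidator(
#       estimators={'lr': LinearRegression(), 'rf': RandomForestRegressor()},
#       datasets=df, target='y', pre_processor=None, cv=5,
#       scorer=None,   # defaults to 'r2' for regressors, 'accuracy' for classifiers
#       metrics=None,  # defaults to the regression/classification metric sets above
#       group=None, random_state=42)
#   scores: pd.DataFrame = cv_runner.get_cv_scores()  # one column per (dataset, estimator)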