Source code for elphick.sklearn_viz.model_selection.learning_curve

import logging
import math
import multiprocessing
from dataclasses import dataclass
from datetime import timedelta, datetime
from typing import Union, Optional, Iterable, Any, Callable

import matplotlib
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from joblib import Parallel, delayed
from plotly.subplots import make_subplots
from sklearn.base import is_classifier, is_regressor
from sklearn.model_selection import learning_curve, train_test_split, StratifiedKFold, KFold
from sklearn.pipeline import Pipeline

from elphick.sklearn_viz.utils import log_timer


@dataclass
class LearningCurveResult:
    training_scores: np.ndarray
    validation_scores: np.ndarray
    training_sizes: np.ndarray
    metrics: Optional[dict[str, dict[str, np.ndarray]]] = None

    def get_results(self) -> pd.DataFrame:
        col_names = [f"train_count_{n}" for n in self.training_sizes]
        train: pd.DataFrame = pd.DataFrame(self.training_scores.T, columns=col_names)
        val: pd.DataFrame = pd.DataFrame(self.validation_scores.T, columns=col_names)
        if self.metrics is not None:
            for metric_name in self.metrics.keys():
                train_metric_df = pd.DataFrame(self.metrics[metric_name]['training'].T, columns=col_names)
                val_metric_df = pd.DataFrame(self.metrics[metric_name]['validation'].T, columns=col_names)
                train = pd.concat([train, train_metric_df], axis=1)
                val = pd.concat([val, val_metric_df], axis=1)
        return pd.concat([train.assign(dataset='training'), val.assign(dataset='validation')],
                         axis='index').reset_index(drop=True)

    def get_scorer_results(self, dataset_type) -> np.ndarray:
        return self.training_scores if dataset_type == 'training' else self.validation_scores

    def get_metric_results(self, dataset_type, metric_name) -> np.ndarray:
        return self.metrics[metric_name][dataset_type]

    def get_plot_data(self, key, dataset_type) -> tuple:
        x = list(self.training_sizes)
        if key == 'scorer':
            data = self.get_scorer_results(dataset_type=dataset_type)
        else:
            data = self.get_metric_results(dataset_type=dataset_type, metric_name=key)
        y = np.mean(data, axis=1)
        y_sd = np.std(data, axis=1)
        y_lo = list(y - y_sd)
        y_hi = list(y + y_sd)
        y = list(y)
        return x, y, y_lo, y_hi
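
# Hedged sketch (illustration only, not part of the library): constructing a LearningCurveResult by hand
# and reading the mean +/- 1 SD band that get_plot_data returns for the scorer.  Score arrays have shape
# (n_train_sizes, n_cv_folds); the values below are made up.
#
#     res = LearningCurveResult(training_scores=np.array([[0.80, 0.82], [0.85, 0.86]]),
#                               validation_scores=np.array([[0.70, 0.72], [0.75, 0.74]]),
#                               training_sizes=np.array([50, 100]))
#     x, y, y_lo, y_hi = res.get_plot_data(key='scorer', dataset_type='validation')
#     # x == [50, 100]; y is the per-size mean across folds; y_lo / y_hi are mean -/+ one standard deviation
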
def plot_learning_curve(estimator, x: pd.DataFrame, y: Union[pd.DataFrame, pd.Series],
                        cv: Union[int, Any] = 5,
                        title: Optional[str] = None) -> go.Figure:
    """

    Args:
        estimator: The scikit-learn model or pipeline.
        x: X values provided to calculate the learning curve.
        y: y values provided to calculate the learning curve.
        cv: The number of cross validation folds or a cv callable.
        title: Optional plot title.

    Returns:
        a plotly GraphObjects.Figure

    """
    return LearningCurve(estimator=estimator, x=x, y=y, cv=cv).plot(title=title)
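
# Hedged usage sketch (illustration only, not part of the library): how plot_learning_curve might be
# called with a scikit-learn regressor.  The dataset and estimator below are assumptions chosen for
# illustration, not part of this module.
#
#     from sklearn.datasets import load_diabetes
#     from sklearn.linear_model import LinearRegression
#
#     X, y = load_diabetes(return_X_y=True, as_frame=True)
#     fig = plot_learning_curve(LinearRegression(), x=X, y=y, cv=5, title='Diabetes learning curve')
#     fig.show()
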
class LearningCurve:
    def __init__(self,
                 estimator,
                 x: pd.DataFrame,
                 y: Union[pd.DataFrame, pd.Series],
                 train_sizes: Iterable = np.linspace(0.1, 1.0, 5),
                 cv: Union[int, Any] = 5,
                 metrics: Optional[dict[str, Callable]] = None,
                 scorer: Optional[Any] = None,
                 random_state: int = 42,
                 n_jobs: int = 1):
        """

        Args:
            estimator: The scikit-learn model or pipeline.
            x: X values provided to calculate the learning curve.
            y: y values provided to calculate the learning curve.
            train_sizes: List of training sample counts (or fractions if < 1).
            cv: The number of cross validation folds or a cv callable.
            metrics: Optional dict of callable metrics to calculate post-fitting.
            scorer: The scoring method.  If None, 'accuracy' is used for classifiers and 'r2' for regressors.
            random_state: Optional random seed.
            n_jobs: Number of parallel jobs to run.  If -1, the number of jobs is set to the number of
             CPU cores.  Recommend setting to -2 for large jobs to retain a core for system interaction.
        """
        self._logger = logging.getLogger(name=__class__.__name__)
        self.estimator = estimator
        self.X: Optional[pd.DataFrame] = x
        self.y: Optional[Union[pd.DataFrame, pd.Series]] = y
        self.train_sizes: Iterable = train_sizes
        self.cv: Union[int, Any] = cv
        self.random_state: int = random_state
        self.n_jobs: int = n_jobs
        self.metrics = metrics
        self.is_pipeline: bool = isinstance(estimator, Pipeline)
        self.is_classifier: bool = is_classifier(estimator)
        self.is_regressor: bool = is_regressor(estimator)
        if scorer is None:
            scorer = 'accuracy' if self.is_classifier else 'r2'
        self.scorer: Optional[Any] = scorer

        self._results: Optional[LearningCurveResult] = None
        # check_is_fitted(mdl[-1]) if self.is_pipeline else check_is_fitted(mdl)

    @property
    def n_cores(self) -> int:
        n_cores = self.n_jobs
        if self.n_jobs < 0:
            n_cores = multiprocessing.cpu_count() + 1 + self.n_jobs
        return n_cores

    @property
    def results(self) -> Optional[LearningCurveResult]:
        if self._results is None:
            start_time = datetime.now()  # Record the start time
            self._logger.info("Commencing Cross Validation")
            results = self.calculate_learning_curve()
            duration = str(timedelta(seconds=round((datetime.now() - start_time).total_seconds())))
            self._logger.info(f"Cross validation complete in {duration} using {self.n_cores} "
                              f"worker{'s' if self.n_cores > 1 else ''}")
            self._results = results
        return self._results

    def calculate_learning_curve(self) -> LearningCurveResult:
        if self.metrics is None:
            # Use the scikit-learn learning_curve function
            train_size_abs, train_scores, val_scores = learning_curve(self.estimator, X=self.X, y=self.y,
                                                                      train_sizes=self.train_sizes,
                                                                      scoring=self.scorer,
                                                                      cv=self.cv, n_jobs=self.n_jobs)
            results: LearningCurveResult = LearningCurveResult(training_scores=train_scores,
                                                               validation_scores=val_scores,
                                                               training_sizes=train_size_abs)
        else:
            # Use the custom learning curve implementation, which also evaluates the provided metrics
            results: LearningCurveResult = self.custom_learning_curve()
        return results

    def custom_learning_curve(self) -> LearningCurveResult:
        train_scores: list = []
        val_scores: list = []
        train_size_abs: list = []
        metrics: dict = {metric: {'training': [], 'validation': []} for metric in self.metrics.keys()}

        # Determine the cross-validation strategy based on the estimator type
        if self.is_classifier:
            cv = StratifiedKFold(n_splits=self.cv)
        else:
            cv = KFold(n_splits=self.cv)

        def process_train_size(train_size):
            train_scores_fold: list = []
            val_scores_fold: list = []
            metrics_fold: dict = {metric: {'training': [], 'validation': []} for metric in self.metrics.keys()}
            n_train_samples: Optional[int] = None
            for i, (train_index, val_index) in enumerate(cv.split(self.X, self.y)):
                X_train, X_val = self.X.iloc[train_index], self.X.iloc[val_index]
                y_train, y_val = self.y.iloc[train_index], self.y.iloc[val_index]

                # Ensure that train_size doesn't exceed the size of the training set
                train_size = min(train_size, len(X_train))

                # Interpret train_size as a fraction if <= 1, otherwise as an absolute count
                if train_size <= 1:
                    train_size = int(train_size * len(X_train))
                else:
                    train_size = int(train_size)

                X_train = X_train[:train_size]
                y_train = y_train[:train_size]

                # Record the absolute training size once per train_size (same for every fold)
                if i == 0:
                    n_train_samples = len(X_train)

                self.estimator.fit(X_train, y_train)
                train_scores_fold.append(self.estimator.score(X_train, y_train))
                val_scores_fold.append(self.estimator.score(X_val, y_val))

                if self.metrics is not None:
                    for metric_name_, metric_func in self.metrics.items():
                        train_metric = metric_func(y_train, self.estimator.predict(X_train))
                        val_metric = metric_func(y_val, self.estimator.predict(X_val))
                        metrics_fold[metric_name_]['training'].append(train_metric)
                        metrics_fold[metric_name_]['validation'].append(val_metric)

            return train_scores_fold, val_scores_fold, metrics_fold, n_train_samples

        results = Parallel(n_jobs=self.n_jobs)(
            delayed(process_train_size)(train_size) for train_size in self.train_sizes)

        for train_scores_fold, val_scores_fold, metrics_fold, n_train_samples in results:
            train_scores.append(train_scores_fold)
            val_scores.append(val_scores_fold)
            train_size_abs.append(n_train_samples)
            for metric_name in metrics.keys():
                metrics[metric_name]['training'].append(metrics_fold[metric_name]['training'])
                metrics[metric_name]['validation'].append(metrics_fold[metric_name]['validation'])

        # Convert lists to numpy arrays
        for metric_name in metrics.keys():
            metrics[metric_name]['training'] = np.array(metrics[metric_name]['training'])
            metrics[metric_name]['validation'] = np.array(metrics[metric_name]['validation'])

        return LearningCurveResult(training_scores=np.array(train_scores), validation_scores=np.array(val_scores),
                                   training_sizes=np.array(train_size_abs).ravel(), metrics=metrics)
    def plot(self, title: Optional[str] = None, metrics: Optional[list[str]] = None, col_wrap: int = 1,
             plot_scorer: bool = True) -> go.Figure:
        """Create the plot

        Args:
            title: Title for the plot.
            metrics: Optional list of metric keys to plot.
            col_wrap: The number of columns to use for the facet grid if plotting metrics.
            plot_scorer: If True, plot the scorer.  Use False to plot only the metrics.

        Returns:
            a plotly GraphObjects.Figure

        """
        # Determine the number of plots to create, their keys, and titles
        total_plots = 0
        plot_keys = []
        subplot_titles = []
        if plot_scorer:
            total_plots += 1
            plot_keys.append('scorer')
            subplot_titles.append(str(self.scorer))
        if metrics:
            total_plots += len(metrics)
            plot_keys += metrics
            subplot_titles += metrics

        num_rows, num_cols, subplot_order = self.calculate_grid_and_subplot_order(total_plots, col_wrap)
        fig = make_subplots(rows=num_rows, cols=num_cols, subplot_titles=subplot_titles)

        subtitle: str = f'Cross Validation: {self.cv}'
        if title is None:
            title = subtitle
        else:
            title = title + '<br>' + subtitle

        for key, y_label, (row, col) in zip(plot_keys, subplot_titles, subplot_order):
            self._add_subplot(fig=fig, key=key, y_label=y_label, row=row, col=col)

        fig.update_layout(title=title, showlegend=True)
        return fig
    @staticmethod
    def calculate_grid_and_subplot_order(total_plots, col_wrap):
        num_cols = min(total_plots, col_wrap)
        num_rows = math.ceil(total_plots / num_cols)
        subplot_order = [(row, col) for row in range(1, num_rows + 1) for col in range(1, num_cols + 1)]
        subplot_order = subplot_order[:total_plots]  # Trim to the actual number of plots
        return num_rows, num_cols, subplot_order

    def _add_subplot(self, fig: go.Figure, key: str, y_label: str, row: int, col: int) -> go.Figure:
        x, y_train, y_train_lo, y_train_hi = self.results.get_plot_data(key=key, dataset_type='training')
        x, y_val, y_val_lo, y_val_hi = self.results.get_plot_data(key=key, dataset_type='validation')

        # Add legend only for the first subplot
        show_legend = (row == 1 and col == 1)

        fig.add_trace(go.Scatter(
            x=x,
            y=y_train,
            line=dict(color='royalblue'),
            mode='lines',
            name='training',
            showlegend=show_legend,
        ), row=row, col=col)
        fig.add_trace(go.Scatter(
            x=x,
            y=y_val,
            line=dict(color='orange'),
            mode='lines',
            name='validation',
            showlegend=show_legend,
        ), row=row, col=col)
        fig.add_trace(go.Scatter(
            x=x + x[::-1],  # x, then x reversed
            y=y_train_hi + y_train_lo[::-1],  # upper, then lower reversed
            fill='toself',
            fillcolor=f"rgba{str(matplotlib.colors.to_rgba('royalblue', 0.4))}",
            line=dict(color='rgba(255,255,255,0)'),
            hoverinfo="skip",
            showlegend=show_legend,
            name='training error +/- 1SD'
        ), row=row, col=col)
        fig.add_trace(go.Scatter(
            x=x + x[::-1],  # x, then x reversed
            y=y_val_hi + y_val_lo[::-1],  # upper, then lower reversed
            fill='toself',
            fillcolor="rgba(255, 165, 0, 0.5)",
            line=dict(color='rgba(255,255,255,0)'),
            hoverinfo="skip",
            showlegend=show_legend,
            name='validation error +/- 1SD'
        ), row=row, col=col)

        fig.update_xaxes(title_text="Number of training samples", row=row, col=col)
        fig.update_yaxes(title_text=y_label, row=row, col=col)

        return fig
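
# Hedged usage sketch (illustration only, not part of the library): driving LearningCurve directly with a
# custom metric, which routes through custom_learning_curve, then plotting the scorer and the metric side
# by side.  The estimator, dataset and metric below are assumptions chosen for illustration.
#
#     from sklearn.datasets import load_diabetes
#     from sklearn.linear_model import LinearRegression
#     from sklearn.metrics import mean_absolute_error
#
#     X, y = load_diabetes(return_X_y=True, as_frame=True)
#     lc = LearningCurve(LinearRegression(), x=X, y=y, cv=5,
#                        metrics={'mae': mean_absolute_error}, n_jobs=-2)
#     fig = lc.plot(title='Diabetes', metrics=['mae'], col_wrap=2)
#     fig.show()
#     results_df = lc.results.get_results()  # long-format DataFrame of scores per dataset
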