import logging
import pandas as pd
try:
from sklearn.pipeline import Pipeline
from sklearn.base import BaseEstimator, RegressorMixin
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestRegressor
SKLEARN_AVAILABLE = True
except ImportError:
SKLEARN_AVAILABLE = False
if SKLEARN_AVAILABLE:
class PandasPipeline(Pipeline, RegressorMixin):
def __init__(self, steps, memory=None, verbose=False):
super().__init__(steps, memory=memory, verbose=verbose)
self._logger = logging.getLogger(__class__.__name__)
self.feature_names_in__ = None
self.feature_names_out_ = None
self.set_output(transform='pandas')
[docs] def fit(self, X, y=None, **fit_params):
if not isinstance(X, pd.DataFrame) or not isinstance(y, pd.DataFrame):
raise ValueError("Input X and y must be pandas DataFrame")
self.feature_names_in__ = X.columns.to_list()
self.feature_names_out_ = y.columns.tolist()
super().fit(X, y, **fit_params)
return self
[docs] def predict(self, X: pd.DataFrame) -> pd.DataFrame:
if not isinstance(X, pd.DataFrame):
raise ValueError("Input X must be pandas DataFrame")
# ignore any features that the model was not fitted on and log
if self.feature_names_in__ is not None and any([col not in self.feature_names_in__ for col in X.columns]):
missing_features = [col for col in X.columns if col not in self.feature_names_in__]
self._logger.info(f"Features {missing_features} were passed but are ignored since they"
f" are not required by the model")
X = X.copy().drop(columns=missing_features)
predictions = super().predict(X)
return pd.DataFrame(predictions, columns=self.feature_names_out_, index=X.index)
[docs] def score(self, X: pd.DataFrame, y: pd.DataFrame) -> float:
# ignore any features that the model was not fitted on and log
if self.feature_names_in__ is not None and any([col not in self.feature_names_in__ for col in X.columns]):
missing_features = [col for col in X.columns if col not in self.feature_names_in__]
self._logger.info(f"Features {missing_features} were passed but are ignored since they"
f" are not required by the model")
X = X.copy().drop(columns=missing_features)
# Call the parent class's score method
return super().score(X, y)
[docs] def get_feature_names_out(self):
return self.feature_names_out_
@classmethod
def from_pipeline(cls, pipeline):
return PandasPipeline(pipeline.steps)
else:
[docs] class PandasPipeline:
[docs] def __init__(self, *args, **kwargs):
raise ImportError("sklearn is not installed but is required for PandasPipeline. Please install it.")