Usage

In this notebook we will see how to:

  • cross-validate multiple binary classifiers;

  • accumulate ground-truths and predictions

  • assess calibration using the modelsight’s hosmer_lemeshow_plot function.

  • generate average ROC curves and test statistical significance of differences in AUCS between pairs of classifiers.

import os
import sys

sys.path.insert(0, os.path.abspath("../"))

Miscellaneous

import random
import numpy as np
from dataclasses import dataclass, field
from numpy.random import SeedSequence, BitGenerator, Generator
from typing import Union, List, Tuple, Annotated, Literal, TypeVar, Optional
from sklearn.base import BaseEstimator  # type: ignore
from sklearn.model_selection._split import BaseCrossValidator, _RepeatedSplits

try:
    from numpy.typing import ArrayLike
except ImportError:
    ArrayLike = Union[np.ndarray, List[List[float]]]

if sys.version_info >= (3, 10):
    from typing import TypeAlias
else:
    from typing_extensions import TypeAlias


Estimator = TypeVar("Estimator", bound=BaseEstimator)
SeedType = Union[None, int, ArrayLike, SeedSequence, BitGenerator, Generator]
CVScheme: TypeAlias = TypeVar("CVScheme", BaseCrossValidator, _RepeatedSplits)


@dataclass
class CVModellingOutput:    
    gts_train: ArrayLike
    gts_val: ArrayLike
    gts_train_conc: ArrayLike
    gts_val_conc: ArrayLike

    # predicted probabilities
    probas_train: ArrayLike
    probas_val: ArrayLike
    probas_train_conc: ArrayLike
    probas_val_conc: ArrayLike
    
    # misc
    models: List[Estimator]
    errors: Optional[ArrayLike]
    correct: Optional[ArrayLike]
    features: Optional[ArrayLike]

class Colors:
    blue = "#4974a5"
    lightblue = "#b0c4de"
    salmon = "#ff8c69"
    lightsalmon = "#FFD1C2"
    darksalmon = "#e57e5e"

    green2 = "#82B240"
    charcoal = "#31485D"
    gray = "#A3BAC3"
    white = "#EAEBED"

    yellow = "#FFC325"
    red = "#E31B23"
    blue = "#005CAB"
    violet = "#9A348E"

Load dataset

In this example, we will use the Breast Cancer Wisconsin dataset, which consists of features computed from a digitized image of a fine needle aspirate of a breast mass. It’s a binary classification problem where the dependent variable is the biopsy’s outcome: M is malignant and B is benign.

We can load the dataset using scikit-learn.datasets.load_breast_cancer.

from sklearn.datasets import load_breast_cancer
X, y = load_breast_cancer(return_X_y=True, as_frame=True)
y.value_counts(normalize=True)
target
1    0.627417
0    0.372583
Name: proportion, dtype: float64

We can see that 62.7% of breast cancers were malignant.

X.isna().sum()
mean radius                0
mean texture               0
mean perimeter             0
mean area                  0
mean smoothness            0
mean compactness           0
mean concavity             0
mean concave points        0
mean symmetry              0
mean fractal dimension     0
radius error               0
texture error              0
perimeter error            0
area error                 0
smoothness error           0
compactness error          0
concavity error            0
concave points error       0
symmetry error             0
fractal dimension error    0
worst radius               0
worst texture              0
worst perimeter            0
worst area                 0
worst smoothness           0
worst compactness          0
worst concavity            0
worst concave points       0
worst symmetry             0
worst fractal dimension    0
dtype: int64

This is a toy dataset, hence it’s super clean with no missing values.

Train and validate model within a cross-validation scheme

import pandas as pd
from sklearn.base import clone
from sklearn.model_selection import train_test_split
from sklearn.model_selection import StratifiedKFold, RepeatedStratifiedKFold
from sklearn.calibration import CalibratedClassifierCV
from sklearn.metrics import roc_curve
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression
from interpret.glassbox import ExplainableBoostingClassifier
class BinaryModelFactory:
    def get_model(self, model_name: str) -> Estimator:
        if model_name == "EBM":
            return ExplainableBoostingClassifier(random_state=cv_config.get("SEED"),
                                        interactions=6,
                                        learning_rate=0.02,
                                        min_samples_leaf=5,
                                        n_jobs=-1)
        elif model_name == "LR":
            return LogisticRegression(penalty="elasticnet",
                                 solver="saga",
                                 l1_ratio=0.3,
                                 max_iter=10000,
                                 random_state=cv_config.get("SEED"))
        elif model_name == "SVC": 
            return SVC(probability=True, 
                   class_weight="balanced", 
                   random_state=cv_config.get("SEED"))
        elif model_name == "RF": 
            return RandomForestClassifier(random_state=cv_config.get("SEED"),
                                     n_estimators=5,
                                     max_depth=3,
                                     n_jobs=-1)
        elif model_name == "KNN": 
            return KNeighborsClassifier(n_jobs=-1)        
        else:
            raise ValueError(f"{model_name} is not a valid estimator name.")
def get_calibrated_model(model: Estimator, 
                         X: ArrayLike, y: ArrayLike) -> Estimator:
    """Calibrate an already fitted model using data X, y.

    Parameters
    ----------
    model : Estimator
        Already fitted model that should be calibrated
    X : ArrayLike
        Design matrix of n features and m observations.
    y : ArrayLike
        Array of m ground-truths

    Returns
    -------
    Estimator
        The calibrated model
    """
    calib_model = CalibratedClassifierCV(estimator=model,
                                         method="sigmoid",
                                         n_jobs=10,
                                         cv="prefit")
    calib_model.fit(X, y)
    
    return calib_model


def get_feature_selector(seed: SeedType) -> Estimator:
    fs = RandomForestClassifier(random_state=seed,
                                n_estimators=10,
                                max_depth=3,
                                n_jobs=4,
                                min_samples_leaf=2,
                                min_samples_split=3)
    return fs


def select_features(X: ArrayLike,
                    y: ArrayLike,
                    selector: Estimator,
                    cv: CVScheme,
                    scale: bool,
                    frac: float) -> set[str]:
    """Custom feature selection using a feature selector (in our case
    a Random Forest) and cross-validation. We select all features that
    are associated with a Gini impurity reduction at least 25% greater
    than the mean value.

    Parameters
    ----------
    X : ArrayLike
        An array of features values
    y : ArrayLike
        An array of ground truths
    selector : Estimator
        The feature selector. In our case is a Random Forest Classifier
        and we will use the built-in feature importances.
    cv : CVScheme
        A cross-validation scheme for selecting features.
    scale: bool
        Whether data should be scaled according to a StandardScaler
    frac: float
        Fraction of mean importance 

    Returns
    -------
    The set union of all selected features
    """
    inner_feature_sets = dict()

    for ji, (train_idxs_inner, _) in enumerate(cv.split(X, y)):
        # print(f"Inner split no. {ji+1}")
        inner_split = f"inner_split_{ji}"
        inner_feature_sets[inner_split] = set()

        # select inner training and validation folds
        X_train_inner = X.iloc[train_idxs_inner, :].copy()
        if not isinstance(y, pd.Series):
            y = pd.Series(y)

        y_train_inner = y.iloc[train_idxs_inner]

        # X_val_inner, y_val_inner = X.iloc[val_idxs_inner].copy(), y[val_idxs_inner]

        if scale:
            numeric_cols = X_train_inner.select_dtypes(
                include=[np.float64, np.int64]).columns.tolist()
            scaler = StandardScaler()
            X_train_inner.loc[:, numeric_cols] = scaler.fit_transform(
                X_train_inner.loc[:, numeric_cols])
            # X_val_inner.loc[:, numeric_cols] = scaler.transform(X_val_inner.loc[:, numeric_cols])

        # feature selection
        selected_features = set()

        feat_selector = clone(selector)
        feat_selector.fit(X_train_inner, y_train_inner)

        selected_features = list(map(lambda t: t[0], list(filter(
            lambda w: w[1] > frac*feat_selector.feature_importances_.mean(), zip(X.columns, feat_selector.feature_importances_)))))

        if not selected_features:
            continue

        # Check that each output feature is present in the dataset's columns.
        diff = set.difference(set(selected_features),
                              set(X_train_inner.columns))
        if diff:
            raise ValueError(
                F"Could not find features {DIFF} in the dataframe.")

        if not isinstance(selected_features, set):
            selected_features = set(selected_features)

        inner_feature_sets[inner_split] = selected_features

    return sorted(set.union(*inner_feature_sets.values()))
cv_config = {
    "N_REPEATS": 2,
    "N_SPLITS": 10,
    "SHUFFLE": True,
    "SCALE": False,
    "CALIBRATE": True,
    "CALIB_FRACTION": 0.15,
    "SEED": 1303
}

outer_cv = RepeatedStratifiedKFold(n_repeats=cv_config.get("N_REPEATS"),
                                   n_splits=cv_config.get("N_SPLITS"),
                                   random_state=cv_config.get("SEED"))

inner_cv = StratifiedKFold(n_splits=cv_config.get("N_SPLITS"),
                           shuffle=cv_config.get("SHUFFLE"),
                           random_state=cv_config.get("SEED"))

models_names = ["EBM", "LR", "SVC", "RF", "KNN"]

model_factory = BinaryModelFactory()
cv_results = dict()

for model_name in models_names:
    print(f"Processing model {model_name}\n")

    gts_train = []
    gts_val = []
    probas_train = []
    probas_val = []
    gts_train_conc = []
    gts_val_conc = []
    probas_train_conc = []
    probas_val_conc = []

    models = []
    errors = []
    correct = []
    features = []
        
    for i, (train_idx, val_idx) in enumerate(outer_cv.split(X, y)):
        Xtemp, ytemp = X.iloc[train_idx, :], y.iloc[train_idx]
        Xval, yval = X.iloc[val_idx, :], y.iloc[val_idx]

        if cv_config.get("CALIBRATE"):
            Xtrain, Xcal, ytrain, ycal = train_test_split(Xtemp, ytemp,
                                                          test_size=cv_config.get(
                                                              "CALIB_FRACTION"),
                                                          stratify=ytemp,
                                                          random_state=cv_config.get("SEED"))
        else:
            Xtrain, ytrain = Xtemp, ytemp

        model = model_factory.get_model(model_name)

        # select features
        feat_subset = select_features(Xtrain, ytrain,
                                      selector=get_feature_selector(
                                          cv_config.get("SEED")),
                                      cv=inner_cv,
                                      scale=False,
                                      frac=1.25)
        features.append(feat_subset)

        if cv_config.get("SCALE"):
            numeric_cols = Xtrain.select_dtypes(
                include=[np.float64, np.int64]).columns.tolist()
            scaler = StandardScaler()
            Xtrain.loc[:, numeric_cols] = scaler.fit_transform(
                Xtrain.loc[:, numeric_cols])
            Xtest.loc[:, numeric_cols] = scaler.transform(
                Xtest.loc[:, numeric_cols])

        model.fit(Xtrain.loc[:, feat_subset], ytrain)

        if cv_config.get("CALIBRATE"):
            model = get_calibrated_model(model,
                                         X=Xcal.loc[:, feat_subset],
                                         y=ycal)

        models.append(model)

        # accumulate ground-truths
        gts_train.append(ytrain)
        gts_val.append(yval)

        # accumulate predictions
        train_pred_probas = model.predict_proba(Xtrain.loc[:, feat_subset])[:, 1]
        val_pred_probas = model.predict_proba(Xval.loc[:, feat_subset])[:, 1]

        probas_train.append(train_pred_probas)
        probas_val.append(val_pred_probas)

        # identify correct and erroneous predictions according to the
        # classification cut-off that maximizes the Youden's J index
        fpr, tpr, thresholds = roc_curve(ytrain, train_pred_probas)
        idx = np.argmax(tpr - fpr)
        youden = thresholds[idx]

        labels_val = np.where(val_pred_probas >= youden, 1, 0)

        # indexes of validation instances misclassified by the model
        error_idxs = Xval[(yval != labels_val)].index
        errors.append(error_idxs)

        # indexes of correct predictions
        correct.append(Xval[(yval == labels_val)].index)

    # CV results for current model
    curr_est_results = CVModellingOutput(
        gts_train=gts_train,
        gts_val=gts_val,
        probas_train=probas_train,
        probas_val=probas_val,
        gts_train_conc=np.concatenate(gts_train),
        gts_val_conc=np.concatenate(gts_val),
        probas_train_conc=np.concatenate(probas_train),
        probas_val_conc=np.concatenate(probas_val),
        models=models,
        errors=errors,
        correct=correct,
        features=features
    )
    
    cv_results[model_name] = curr_est_results
Processing model EBM
---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
Cell In[9], line 57
     54 model = model_factory.get_model(model_name)
     56 # select features
---> 57 feat_subset = select_features(Xtrain, ytrain,
     58                               selector=get_feature_selector(
     59                                   cv_config.get("SEED")),
     60                               cv=inner_cv,
     61                               scale=False,
     62                               frac=1.25)
     63 features.append(feat_subset)
     65 if cv_config.get("SCALE"):

Cell In[8], line 99, in select_features(X, y, selector, cv, scale, frac)
     96 feat_selector = clone(selector)
     97 feat_selector.fit(X_train_inner, y_train_inner)
---> 99 selected_features = list(map(lambda t: t[0], list(filter(
    100     lambda w: w[1] > frac*feat_selector.feature_importances_.mean(), zip(X.columns, feat_selector.feature_importances_)))))
    102 if not selected_features:
    103     continue

Cell In[8], line 100, in select_features.<locals>.<lambda>(w)
     96 feat_selector = clone(selector)
     97 feat_selector.fit(X_train_inner, y_train_inner)
     99 selected_features = list(map(lambda t: t[0], list(filter(
--> 100     lambda w: w[1] > frac*feat_selector.feature_importances_.mean(), zip(X.columns, feat_selector.feature_importances_)))))
    102 if not selected_features:
    103     continue

File ~/checkouts/readthedocs.org/user_builds/modelsight/envs/latest/lib/python3.10/site-packages/sklearn/ensemble/_forest.py:627, in BaseForest.feature_importances_(self)
    606 """
    607 The impurity-based feature importances.
    608 
   (...)
    623     array of zeros.
    624 """
    625 check_is_fitted(self)
--> 627 all_importances = Parallel(n_jobs=self.n_jobs, prefer="threads")(
    628     delayed(getattr)(tree, "feature_importances_")
    629     for tree in self.estimators_
    630     if tree.tree_.node_count > 1
    631 )
    633 if not all_importances:
    634     return np.zeros(self.n_features_in_, dtype=np.float64)

File ~/checkouts/readthedocs.org/user_builds/modelsight/envs/latest/lib/python3.10/site-packages/sklearn/utils/parallel.py:65, in Parallel.__call__(self, iterable)
     60 config = get_config()
     61 iterable_with_config = (
     62     (_with_config(delayed_func, config), args, kwargs)
     63     for delayed_func, args, kwargs in iterable
     64 )
---> 65 return super().__call__(iterable_with_config)

File ~/checkouts/readthedocs.org/user_builds/modelsight/envs/latest/lib/python3.10/site-packages/joblib/parallel.py:1952, in Parallel.__call__(self, iterable)
   1946 # The first item from the output is blank, but it makes the interpreter
   1947 # progress until it enters the Try/Except block of the generator and
   1948 # reach the first `yield` statement. This starts the aynchronous
   1949 # dispatch of the tasks to the workers.
   1950 next(output)
-> 1952 return output if self.return_generator else list(output)

File ~/checkouts/readthedocs.org/user_builds/modelsight/envs/latest/lib/python3.10/site-packages/joblib/parallel.py:1595, in Parallel._get_outputs(self, iterator, pre_dispatch)
   1592     yield
   1594     with self._backend.retrieval_context():
-> 1595         yield from self._retrieve()
   1597 except GeneratorExit:
   1598     # The generator has been garbage collected before being fully
   1599     # consumed. This aborts the remaining tasks if possible and warn
   1600     # the user if necessary.
   1601     self._exception = True

File ~/checkouts/readthedocs.org/user_builds/modelsight/envs/latest/lib/python3.10/site-packages/joblib/parallel.py:1707, in Parallel._retrieve(self)
   1702 # If the next job is not ready for retrieval yet, we just wait for
   1703 # async callbacks to progress.
   1704 if ((len(self._jobs) == 0) or
   1705     (self._jobs[0].get_status(
   1706         timeout=self.timeout) == TASK_PENDING)):
-> 1707     time.sleep(0.01)
   1708     continue
   1710 # We need to be careful: the job list can be filling up as
   1711 # we empty it and Python list are not thread-safe by
   1712 # default hence the use of the lock

KeyboardInterrupt: 

Average ROC curves

import matplotlib.pyplot as plt
from modelsight.curves import average_roc_curves, roc_comparisons, add_annotations


model_names = list(cv_results.keys())
alpha = 0.05
alph_str = str(alpha).split(".")[1]
alpha_formatted = f".{alph_str}"
roc_symbol = "*"
palette = [Colors.green2, Colors.blue, Colors.yellow, Colors.violet, Colors.darksalmon]
n_boot = 100

f, ax = plt.subplots(1, 1, figsize=(8, 8))

kwargs = dict()

f, ax, barplot, bars, all_data = average_roc_curves(cv_results,
                                                    colors=palette,
                                                    model_keys=model_names,
                                                    show_ci=True,
                                                    n_boot=n_boot,
                                                    bars_pos=[
                                                        0.3, 0.01, 0.6, 0.075*len(model_names)],
                                                    random_state=cv_config.get("SEED"),
                                                    ax=ax,
                                                    **kwargs)

roc_comparisons_results = roc_comparisons(cv_results, "EBM")

kwargs = dict(space_between_whiskers = 0.07)
order = [
    ("EBM", "RF"),
    ("EBM", "SVC"),
    ("EBM", "LR"),
    ("EBM", "KNN")
]
ax_annot = add_annotations(roc_comparisons_results, 
                  alpha = 0.05, 
                  bars=bars, 
                  direction = "vertical",
                  order = order,
                  symbol = roc_symbol,
                  symbol_fontsize = 30,
                  voffset = -0.05,
                  ext_voffset=0,
                  ext_hoffset=0,
                  ax=barplot,
                  **kwargs)


#plt.savefig("roc.png", dpi=300, bbox_inches='tight')
plt.show()
_images/cf4694618592d4b51238fef68e136c260c599865f78fce705b037c8bd8a113f7.png

Model calibration

We will now compute median Brier score (95% CI) of the ExplainableBoostingClassifier and use it to annotate the calibration plot.

from sklearn.metrics import brier_score_loss
briers = []
for gt, preds in zip(cv_results["EBM"].gts_val, cv_results["EBM"].probas_val):
    brier = brier_score_loss(gt, preds)
    briers.append(brier)

brier_low, brier_med, brier_up = np.percentile(briers, [2.5, 50, 97.5])

brier_annot = f"{brier_med:.2f} ({brier_low:.2f} - {brier_up:.2f})"

Let’s see how well calibrated are the predicted probabilities

import matplotlib.pyplot as plt
from modelsight.calibration import hosmer_lemeshow_plot

f, ax = plt.subplots(1, 1, figsize=(11,6))

f, ax = hosmer_lemeshow_plot(cv_results["EBM"].gts_val_conc,
                             cv_results["EBM"].probas_val_conc,
                             n_bins=10,
                             colors=(Colors.darksalmon, Colors.green2),
                             annotate_bars=True,
                             title="",
                             brier_score_annot=brier_annot,
                             ax=ax
                             )

#plt.savefig("calib.png", dpi=300, bbox_inches='tight')
plt.show()
_images/7701a50df038065a702d91a43af95251a5accdd4ad6e94da642b375c54bb03a7.png

The model appears to be perfectly calibrated, as predicted probabilites match observed ones.

Median brier score is also very low, indicating a good, calibrated classifier.