Source code for mlproject.postprocess.feature_importances

"""
Functions for extracting feature importance data (ARFS, PFI, and SHAP values)
from trained models.
"""

import os
import pickle
import numpy as np
import pandas as pd
import shap
from modnet.preprocessing import MODData
from modnet.models.vanilla import MODNetModel
from sklearn.model_selection import KFold
from sklearn.inspection import permutation_importance
from mlproject.data.preprocessing import get_dataset
from tqdm import tqdm


def process_dataset(data: MODData, model: MODNetModel):
    """
    Process a MODData object to get the scaled feature array for a given MODNetModel.

    Parameters
    ----------
    data : MODData
        The MODData object containing featurized data.
    model : MODNetModel
        The MODNet model for which to process the data.

    Returns
    -------
    x : np.ndarray
        The processed and scaled feature array.
    """
    x = data.get_featurized_df()[model.optimal_descriptors[: model.n_feat]]
    x = model._scaler.transform(x)
    x = np.nan_to_num(x)
    return x
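
# A minimal usage sketch (hypothetical: assumes a trained MODNetModel `model`
# and a featurized MODData object `data` are already available):
#
#     x_scaled = process_dataset(data, model)
#     assert x_scaled.shape[1] == model.n_feat  # one column per optimal descriptor
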
def aggregate_importances(
    dfs: list[pd.DataFrame], value_col: str, all_features: list[str]
) -> pd.DataFrame:
    """
    Aggregate feature importance DataFrames across folds.

    Parameters
    ----------
    dfs : list[pd.DataFrame]
        List of DataFrames containing feature importances.
    value_col : str
        Column name of the importance values to aggregate.
    all_features : list[str]
        List of all features, used to ensure consistent indexing.

    Returns
    -------
    df_agg : pd.DataFrame
        Aggregated DataFrame with the mean and std of the importances.
    """
    df_all = pd.concat(dfs)
    df_agg = (
        df_all.groupby("feature")[value_col]
        .agg(["mean", "std"])
        .reindex(all_features)
        .fillna(0)
    )
    return df_agg
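
# A minimal sketch of aggregate_importances on toy data (values are illustrative only):
#
#     fold1 = pd.DataFrame({"feature": ["a", "b"], "pfi_mean": [0.4, 0.1]})
#     fold2 = pd.DataFrame({"feature": ["a", "c"], "pfi_mean": [0.6, 0.2]})
#     agg = aggregate_importances([fold1, fold2], "pfi_mean", ["a", "b", "c"])
#     # agg["mean"] -> a: 0.5, b: 0.1, c: 0.2
#     # Features absent from a fold (and stds over a single value) are filled with 0.
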
def get_arfs_mean_feature_importances(
    models_parent_dir: str,
    target_name: str,
) -> pd.DataFrame:
    """
    Get mean feature importances across GrootCV runs.

    Parameters
    ----------
    models_parent_dir : str
        Directory containing the rf_<target>_matminer_lob/<fold>_pipeline.pkl files.
    target_name : str
        Target variable name used in the model filenames.

    Returns
    -------
    arfs_summary : pd.DataFrame
        Mean and std of the ARFS feature importances, sorted by mean.
    """
    imp_dfs = []
    # Loop through cross-validation folds
    for i in range(1, 6):
        file_path = (
            f"{models_parent_dir}/rf_{target_name}_matminer_lob/{i}_pipeline.pkl"
        )
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")
        with open(file_path, "rb") as f:
            grootcv_obj = pickle.load(f)[-1]
        b_df = grootcv_obj.cv_df.T.copy()
        b_df.columns = b_df.iloc[0]

        # Split into shadow and real features
        shadow_columns = [col for col in b_df.columns if col.startswith("ShadowVar")]
        other_columns = [col for col in b_df.columns if not col.startswith("ShadowVar")]
        b_df = b_df[other_columns + shadow_columns]
        b_df = b_df.drop([b_df.index[0], b_df.index[-1]]).convert_dtypes()

        # Separate real vs shadow subsets
        real_df = b_df.iloc[:, : b_df.shape[1] // 2].copy()

        # Sort by mean importance
        real_df = real_df.reindex(
            real_df.select_dtypes(include=[np.number])
            .mean()
            .sort_values(ascending=True)
            .index,
            axis=1,
        )

        # Keep only selected features
        col_idx = np.argwhere(
            real_df.columns.isin(grootcv_obj.selected_features_)
        ).ravel()
        imp_dfs.append(real_df.iloc[:, col_idx])

    # Combine all folds
    combined_df = pd.concat(imp_dfs).fillna(0)
    arfs_summary = pd.DataFrame(
        {"mean": combined_df.mean().values, "std": combined_df.std().values},
        index=combined_df.columns,
    )
    return arfs_summary
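
# Hypothetical usage sketch (directory name and target name are illustrative;
# assumes the rf_<target>_matminer_lob/<fold>_pipeline.pkl files exist under "models"):
#
#     arfs_summary = get_arfs_mean_feature_importances("models", "band_gap")
#     print(arfs_summary.sort_values("mean", ascending=False).head(10))
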
def get_rf_pfi_shap_summary(
    models_parent_dir: str,
    data_parent_dir: str,
    target_name: str,
    n_repeats: int = 5,
    random_state: int = 42,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Load the trained RandomForest models (5 folds) and compute PFI and SHAP values.

    Parameters
    ----------
    models_parent_dir : str
        Directory containing the model and pipeline pickle files for each fold.
    data_parent_dir : str
        Directory containing the featurized dataset.
    target_name : str
        Target variable name (used in the file path pattern).
    n_repeats : int, optional
        Number of PFI repeats (default=5).
    random_state : int, optional
        Random seed for reproducibility.

    Returns
    -------
    pfi_summary : pd.DataFrame
        Combined permutation feature importance results.
    shap_summary : pd.DataFrame
        Combined mean absolute SHAP values across folds.
    """
    all_pfi = []
    all_shap = []
    all_features = set()
    cv_outer = KFold(n_splits=5, shuffle=True, random_state=18012019)
    target, feat = get_dataset(
        feat_type="matminer_lob",
        target_name=target_name,
        data_parent_dir=data_parent_dir,
    )

    # --- Step 1: Load models and compute importances ---
    for fold_ind, (train_ix, test_ix) in enumerate(cv_outer.split(feat)):
        # Model and pipeline paths for this fold
        model_path = (
            f"{models_parent_dir}/rf_{target_name}_matminer_lob/model_{fold_ind+1}.pkl"
        )
        pipeline_path = f"{models_parent_dir}/rf_{target_name}_matminer_lob/{fold_ind+1}_pipeline.pkl"
        if not os.path.exists(model_path):
            print(f"Missing model: {model_path}")
            break

        # Load model
        with open(model_path, "rb") as f:
            model = pickle.load(f)

        # Load pipeline
        with open(pipeline_path, "rb") as f:
            grootcv_obj = pickle.load(f)[-1]
        rel_feats = list(grootcv_obj.selected_features_)

        X_train, _ = feat.iloc[train_ix], feat.iloc[test_ix]
        y_train, _ = target.iloc[train_ix, 0], target.iloc[test_ix, 0]

        # Restrict the training data to the features selected by GrootCV
        X_train = X_train.loc[:, rel_feats]
        model_features = model.feature_names_in_

        # --- Compute PFI ---
        pfi_res = permutation_importance(
            model,
            X_train,
            y_train,
            n_repeats=n_repeats,
            random_state=random_state,
            n_jobs=-1,
        )
        pfi_df = pd.DataFrame(
            {
                "feature": model_features,
                "pfi_mean": pfi_res.importances_mean,
                "pfi_std": pfi_res.importances_std,
            }
        )
        all_pfi.append(pfi_df)
        all_features.update(model_features)

        # --- Compute SHAP ---
        explainer = shap.TreeExplainer(model)
        shap_values = explainer.shap_values(X_train)
        if isinstance(shap_values, list):  # handle multiclass
            shap_values = np.mean(np.abs(shap_values), axis=0)
        else:
            shap_values = np.abs(shap_values)
        mean_abs_shap = shap_values.mean(axis=0)
        shap_df = pd.DataFrame({"feature": model_features, "shap_mean": mean_abs_shap})
        all_shap.append(shap_df)

    # --- Step 2: Combine across folds ---
    all_features = sorted(all_features)
    pfi_summary = aggregate_importances(all_pfi, "pfi_mean", all_features)
    shap_summary = aggregate_importances(all_shap, "shap_mean", all_features)
    return pfi_summary, shap_summary
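
# Hypothetical usage sketch (paths and target name are illustrative). Comparing the
# two rankings is a quick sanity check: features scored highly by both PFI and SHAP
# are the most robust candidates.
#
#     pfi_summary, shap_summary = get_rf_pfi_shap_summary(
#         models_parent_dir="models",
#         data_parent_dir="data",
#         target_name="band_gap",
#     )
#     rank_sum = (
#         pfi_summary["mean"].rank(ascending=False)
#         + shap_summary["mean"].rank(ascending=False)
#     )
#     print(rank_sum.sort_values().head(10))
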
def get_modnet_pfi_shap_summary(
    models_parent_dir: str,
    data_parent_dir: str,
    target_name: str,
    n_repeats: int = 5,
    random_state: int = 42,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Load the trained MODNet ensemble models (5 folds) and compute PFI and SHAP values.

    Parameters
    ----------
    models_parent_dir : str
        Directory containing the model and pipeline files for each fold.
    data_parent_dir : str
        Directory containing the featurized dataset.
    target_name : str
        Target variable name (used in the file path pattern).
    n_repeats : int, optional
        Number of PFI repeats (default=5).
    random_state : int, optional
        Random seed for reproducibility.

    Returns
    -------
    pfi_summary : pd.DataFrame
        Combined permutation feature importance results.
    shap_summary : pd.DataFrame
        Combined mean absolute SHAP values across folds.
    """
    all_pfi = []
    all_shap = []
    all_features = set()
    cv_outer = KFold(n_splits=5, shuffle=True, random_state=18012019)
    target, feat = get_dataset(
        feat_type="matminer_lob",
        target_name=target_name,
        data_parent_dir=data_parent_dir,
    )

    # --- Step 1: Load models and compute importances ---
    for fold_ind, (train_ix, test_ix) in enumerate(cv_outer.split(feat)):
        # Model and pipeline paths for this fold
        model_path = (
            f"{models_parent_dir}/modnet_{target_name}_matminer_lob/model_{fold_ind+1}"
        )
        pipeline_path = f"{models_parent_dir}/modnet_{target_name}_matminer_lob/{fold_ind+1}_pipeline.pkl"
        if not os.path.exists(model_path):
            print(f"Missing model: {model_path}")
            break

        # Load model
        with open(model_path, "rb") as f:
            model = pickle.load(f)
        # Restore the model instances from dicts for the inner models of the ensemble
        model._restore_model()

        # Load pipeline
        with open(pipeline_path, "rb") as f:
            grootcv_obj = pickle.load(f)[-1]
        rel_feats = list(grootcv_obj.selected_features_)

        X_train, _ = feat.iloc[train_ix], feat.iloc[test_ix]
        y_train, _ = (
            target.iloc[train_ix].values.flatten(),
            target.iloc[test_ix].values.flatten(),
        )

        # Restrict the training data to the features selected by GrootCV
        X_train = X_train.loc[:, rel_feats]
        moddata_train = MODData(
            df_featurized=X_train,
            targets=y_train.reshape(-1, 1),
            target_names=[target_name],
            structure_ids=list(X_train.index),
        )

        for inner_model in tqdm(model.models):
            X_train_scaled = process_dataset(moddata_train, inner_model)
            model_features = inner_model.optimal_descriptors[: inner_model.n_feat]

            # --- Compute PFI for each inner model of the ensemble ---
            pfi_res = permutation_importance(
                inner_model.model,
                X_train_scaled,
                moddata_train.targets,
                n_repeats=n_repeats,
                scoring="neg_mean_absolute_error",
                n_jobs=16,
                random_state=random_state,
            )
            pfi_df = pd.DataFrame(
                {
                    "feature": model_features,
                    "pfi_mean": pfi_res.importances_mean,
                    "pfi_std": pfi_res.importances_std,
                }
            )
            all_pfi.append(pfi_df)
            all_features.update(model_features)

            # --- Compute SHAP for each inner model of the ensemble ---
            explainer = shap.DeepExplainer(model=inner_model.model, data=X_train_scaled)
            shap_values_obj = explainer(X_train_scaled)
            # Keep only the first output dimension
            shap_values_obj.values = shap_values_obj.values[:, :, 0]
            if isinstance(shap_values_obj.values, list):  # handle multiclass
                shap_values = np.mean(np.abs(shap_values_obj.values), axis=0)
            else:
                shap_values = np.abs(shap_values_obj.values)
            mean_abs_shap = shap_values.mean(axis=0)
            shap_df = pd.DataFrame(
                {"feature": model_features, "shap_mean": mean_abs_shap.flatten()}
            )
            all_shap.append(shap_df)

    # --- Step 2: Combine across folds ---
    all_features = sorted(all_features)
    pfi_summary = aggregate_importances(all_pfi, "pfi_mean", all_features)
    shap_summary = aggregate_importances(all_shap, "shap_mean", all_features)
    return pfi_summary, shap_summary
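
# Hypothetical usage sketch, analogous to the RandomForest case (paths and target
# name are illustrative). Note that each fold contributes one DataFrame per inner
# model of the ensemble, so the aggregation averages over folds and ensemble members:
#
#     pfi_summary, shap_summary = get_modnet_pfi_shap_summary(
#         models_parent_dir="models",
#         data_parent_dir="data",
#         target_name="band_gap",
#         n_repeats=5,
#     )
#     pfi_summary.to_csv("modnet_band_gap_pfi.csv")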