"""
Functions for feature reduction and selection.
"""
import os
import json
import shutil
import subprocess
import pandas as pd
import numpy as np
import warnings
import random
import matplotlib.pyplot as plt
from collections import defaultdict
from sklearn.metrics import get_scorer
from sklearn import metrics
from sklearn.base import clone
from sklearn.pipeline import Pipeline
from sklearn.model_selection import KFold, cross_validate
from deap import base, creator, tools, algorithms
from sissopp.postprocess.load_models import load_model
from tqdm.autonotebook import tqdm
from feature_engine.selection import SmartCorrelatedSelection, DropConstantFeatures
from arfs.feature_selection import GrootCV
import logging
from typing import Any
warnings.filterwarnings("ignore")
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
def get_relevant_features(
X_train: pd.DataFrame,
y_train: np.ndarray,
const_feat_tol: float = 0.95,
collinearity_tol: float = 0.9,
grootcv_nfolds: int = 5,
grootcv_n_iter: int = 20,
grootcv_lgbm_objective: str = "mae",
**pipeline_kwargs,
) -> tuple[Pipeline, pd.DataFrame]:
"""
Build and apply a feature selection pipeline to remove correlated and irrelevant features.
The pipeline applies the following steps:
    1. **DropConstantFeatures**: Removes constant and quasi-constant features (a single value accounts for at least `const_feat_tol` of the rows).
2. **SmartCorrelatedSelection**: Removes highly correlated features based on Pearson correlation.
3. **GrootCV**: Selects relevant features using cross-validation with a LightGBM-based model.
Parameters
----------
    X_train : pd.DataFrame
        Training feature matrix.
    y_train : np.ndarray
        1D array of target values corresponding to `X_train`.
const_feat_tol : float, default=0.95
Threshold for removing near-constant features. A feature is removed if
a single value accounts for at least this proportion of observations.
collinearity_tol : float, default=0.9
Correlation threshold for removing highly correlated features.
grootcv_nfolds : int, default=5
Number of folds for cross-validation in `GrootCV`.
grootcv_n_iter : int, default=20
Number of iterations for feature selection in `GrootCV`.
grootcv_lgbm_objective : str, default="mae"
Objective function for the LightGBM model inside `GrootCV`.
**pipeline_kwargs : dict
Additional keyword arguments passed to the underlying `Pipeline`.
Returns
-------
    tuple[Pipeline, pd.DataFrame]
        The fitted pipeline and the transformed training set with only the
        relevant features retained.
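
    Examples
    --------
    A minimal usage sketch (assuming `X_train`/`y_train` hold a real training
    split; the GrootCV step is too slow for a live doctest):

    >>> pipe, X_reduced = get_relevant_features(
    ...     X_train, y_train, collinearity_tol=0.85
    ... )  # doctest: +SKIP
    >>> X_reduced.shape[1] <= X_train.shape[1]  # doctest: +SKIP
    True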
"""
pipeline = Pipeline(
[
("zero_variance", DropConstantFeatures(tol=const_feat_tol)),
(
"collinearity",
SmartCorrelatedSelection(
threshold=collinearity_tol,
cv=5,
method="pearson",
selection_method="variance",
),
),
(
"all_rel_feats",
GrootCV(
objective=grootcv_lgbm_objective,
n_jobs=8,
n_iter=grootcv_n_iter,
n_folds=grootcv_nfolds,
rf=False,
),
),
],
verbose=True,
)
    # `**pipeline_kwargs` is always a dict here, so only check for emptiness.
    if pipeline_kwargs:
        pipeline.set_params(**pipeline_kwargs)
return pipeline, pipeline.fit_transform(X_train, y_train)
# ----------SISSO_GAFeatureSelector----------
def init_valid_individual(icls, n_features: int, num_selected_features: int):
"""
Initialize an individual with a fixed number of selected features.
    Ensures that exactly `num_selected_features` bits of the binary vector are set to 1.
Parameters
----------
icls : Individual creator class
The individual class to instantiate.
n_features : int
Total number of features.
num_selected_features : int
        Number of features to select (i.e., bits of the binary vector set to 1).
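
    Examples
    --------
    Any list-like class works as `icls`; plain `list` is used here for
    illustration:

    >>> ind = init_valid_individual(list, n_features=5, num_selected_features=2)
    >>> sum(ind), len(ind)
    (2, 5)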
"""
individual = [0] * n_features
selected_indices = np.random.choice(
range(n_features), num_selected_features, replace=False
)
for idx in selected_indices:
individual[idx] = 1
return icls(individual)
def mutate_and_fix(individual: np.ndarray, indpb: float, num_selected_features: int):
"""
Mutate an individual and ensure it has exactly `num_selected_features` selected.
Parameters
----------
individual : np.ndarray
The individual to mutate.
indpb : float
Probability for individual bits to be flipped.
num_selected_features : int
        Number of features to select (i.e., bits of the binary vector set to 1).
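
    Examples
    --------
    Whatever bits `mutFlipBit` flips, the repair loops restore the exact count:

    >>> (mutated,) = mutate_and_fix([1, 0, 0, 1], indpb=0.5, num_selected_features=2)
    >>> sum(mutated)
    2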
"""
tools.mutFlipBit(individual, indpb)
while sum(individual) > num_selected_features:
ones_indices = [i for i, bit in enumerate(individual) if bit == 1]
individual[np.random.choice(ones_indices)] = 0
while sum(individual) < num_selected_features:
zeros_indices = [i for i, bit in enumerate(individual) if bit == 0]
individual[np.random.choice(zeros_indices)] = 1
return (individual,)
def hamming_distance(ind1: np.ndarray, ind2: np.ndarray) -> int:
"""
Calculate the Hamming distance between two individuals.
Parameters
----------
ind1 : np.ndarray
First individual.
ind2 : np.ndarray
Second individual.
Returns
-------
int
Hamming distance between the two individuals.
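
    Examples
    --------
    >>> hamming_distance([1, 0, 1], [0, 0, 1])
    1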
"""
return sum(x != y for x, y in zip(ind1, ind2))
def is_diverse(
individual: np.ndarray,
population: list[np.ndarray],
min_distance: int = 5,
) -> bool:
"""
Check if an individual is diverse enough from a population based on Hamming distance.
Parameters
----------
individual : np.ndarray
The individual to check.
population : list of np.ndarray
The population to compare against.
min_distance : int, default=5
Minimum Hamming distance required for diversity.
Returns
-------
bool
True if the individual is diverse enough, False otherwise.
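
    Examples
    --------
    >>> is_diverse([1, 1, 0, 0], [[1, 0, 1, 0]], min_distance=2)
    True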
"""
return all(
hamming_distance(individual, other) >= min_distance for other in population
)
def population_entropy(population: list[np.ndarray]) -> float:
"""
Calculate the entropy of a population of binary individuals.
Parameters
----------
population : list of np.ndarray
The population of individuals.
Returns
-------
float
        Mean per-feature binary entropy of the population, in bits.
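
    Notes
    -----
    For each feature j with selection frequency p_j across the population, the
    binary entropy -p_j*log2(p_j) - (1 - p_j)*log2(1 - p_j) is computed and
    averaged over features; frequencies are clipped away from 0 and 1 for
    numerical stability. A maximally diverse population scores 1 bit:

    >>> float(round(population_entropy([[0, 1], [1, 0]]), 3))
    1.0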
"""
pop_array = np.array(population)
p1 = np.mean(pop_array, axis=0)
p1 = np.clip(p1, 1e-6, 1 - 1e-6)
entropy = -np.mean(p1 * np.log2(p1) + (1 - p1) * np.log2(1 - p1))
return entropy
def mixed_selection(population: list[np.ndarray], k: int) -> list[np.ndarray]:
"""
Mixed selection strategy: combines elitism and random selection.
Parameters
----------
population : list of np.ndarray
The population of individuals.
k : int
Number of individuals to select.
Returns
-------
list of np.ndarray
Selected individuals.
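
    Notes
    -----
    With `k=10`, `int(0.2 * 10) = 2` elites are kept via `tools.selBest` and
    the remaining 8 individuals are drawn uniformly at random, balancing
    exploitation against exploration.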
"""
elite_count = int(0.2 * k)
random_count = k - elite_count
elites = tools.selBest(population, elite_count)
randoms = tools.selRandom(population, random_count)
return elites + randoms
def cxTwoPointAndFix(
ind1: np.ndarray,
ind2: np.ndarray,
num_selected_features: int,
):
"""
Crossover two individuals and fix them to have a specific number of selected features.
Parameters
----------
ind1 : np.ndarray
First individual.
ind2 : np.ndarray
Second individual.
num_selected_features : int
        Number of features to select (i.e., bits of the binary vector set to 1).
Returns
-------
tuple of np.ndarray
The two modified individuals after crossover and fixing.
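
    Examples
    --------
    Both children keep exactly `num_selected_features` active bits:

    >>> a, b = [1, 1, 0, 0], [0, 0, 1, 1]
    >>> c, d = cxTwoPointAndFix(a, b, num_selected_features=2)
    >>> sum(c), sum(d)
    (2, 2)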
"""
tools.cxTwoPoint(ind1, ind2)
mutate_and_fix(ind1, indpb=0, num_selected_features=num_selected_features)
mutate_and_fix(ind2, indpb=0, num_selected_features=num_selected_features)
return ind1, ind2
class GAFeatureSelector:
"""Genetic Algorithm Feature Selector using DEAP.
Parameters
----------
X : pd.DataFrame or np.ndarray
Feature matrix.
y : pd.Series or np.ndarray
Target values.
    model : sklearn-like estimator or None
        Model used to evaluate feature subsets. If None and
        `sissopp_binary_path` is set, SISSO++ is used for evaluation instead.
num_features : int, default=25
Number of features to select.
population_size : int, default=50
Size of the GA population.
generations : int, default=100
Number of generations to run.
feature_names : list of str, optional
Names of the features. If None, indices will be used.
cxpb : float, default=0.5
Crossover probability.
mutpb : float, default=0.2
Mutation probability.
early_stop_patience : int, default=5
Generations to wait for improvement before stopping.
mutation_boost_threshold : int, default=3
Generations without improvement to trigger mutation boost.
mutation_boost_factor : float, default=2.0
Factor to increase mutation probability when boosting.
cv : int, default=5
Number of cross-validation folds.
scoring : str, default="r2"
Scoring metric for evaluation.
min_diversity : int, default=5
Minimum Hamming distance for diversity.
entropy_threshold : float, default=0.3
Entropy threshold to trigger mutation boost.
X_test : pd.DataFrame or np.ndarray, optional
Test feature matrix for diagnostic evaluation.
y_test : pd.Series or np.ndarray, optional
Test target values for diagnostic evaluation.
test_scoring : str or callable, optional
Scoring metric for test evaluation. Defaults to `scoring`.
n_jobs : int, default=1
Number of parallel jobs for evaluation.
return_train_score : bool, default=True
Whether to return training scores in cross-validation.
error_score : float, default=np.nan
Value to assign to failed evaluations.
sissopp_binary_path : str, optional
Path to SISSO++ binary for model evaluation.
mpi_tasks : int, default=8
Number of MPI tasks for SISSO++.
sissopp_inputs : dict, optional
        Input settings for SISSO++; must contain at least `property_key` and
        `desc_dim` when the SISSO++ evaluation path is used.
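
    Examples
    --------
    A minimal usage sketch (`Ridge` stands in for any sklearn-like estimator;
    `X` and `y` are assumed to be a prepared training split):

    >>> from sklearn.linear_model import Ridge
    >>> selector = GAFeatureSelector(
    ...     X, y, model=Ridge(), num_features=10,
    ...     population_size=20, generations=30,
    ... )  # doctest: +SKIP
    >>> selected = selector.run(strategy="standard")  # doctest: +SKIP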
"""
def __init__(
self,
        X: pd.DataFrame | np.ndarray,
        y: pd.Series | np.ndarray,
        model: Any,
num_features: int = 25,
population_size: int = 50,
generations: int = 100,
feature_names: list[str] | None = None,
cxpb: float = 0.5,
mutpb: float = 0.2,
early_stop_patience: int = 5,
mutation_boost_threshold: int = 3,
mutation_boost_factor: float = 2.0,
cv: int = 5,
scoring: str = "r2",
min_diversity: int = 5,
entropy_threshold: float = 0.3,
X_test: pd.DataFrame | np.ndarray | None = None,
        y_test: pd.Series | np.ndarray | None = None,
test_scoring: str | None = None,
n_jobs: int = 1,
return_train_score: bool = True,
error_score=np.nan,
        sissopp_binary_path: str | None = None,
mpi_tasks: int = 8,
sissopp_inputs: dict | None = None,
):
self.X = X
self.y = y
self.model = model
self.num_features = num_features
self.population_size = population_size
self.generations = generations
self.cxpb = cxpb
self.base_mutpb = mutpb
self.early_stop_patience = early_stop_patience
self.mutation_boost_threshold = mutation_boost_threshold
self.mutation_boost_factor = mutation_boost_factor
self.cv = cv
self.scoring = scoring
self.n_features = X.shape[1]
self.min_diversity = min_diversity
self.entropy_threshold = entropy_threshold
self.n_jobs = n_jobs
self.error_score = error_score
self.return_train_score = return_train_score
self.sissopp_binary_path = sissopp_binary_path
self.mpi_tasks = mpi_tasks
self.fitness_history = []
self.sissopp_inputs = sissopp_inputs
self.operator_usage_counts = defaultdict(int)
self.feature_usage_counts = defaultdict(int)
self.test_score_history = []
self.X_test = X_test
self.y_test = y_test
self.test_scoring = test_scoring or scoring
if feature_names is not None:
self.feature_names = feature_names
elif isinstance(self.X, pd.DataFrame):
self.feature_names = list(self.X.columns)
else:
self.feature_names = [f"feat_{i}" for i in range(self.n_features)]
self._setup_deap()
def _setup_deap(self):
"""Setup DEAP toolbox and creator."""
creator.create("FitnessMax", base.Fitness, weights=(1.0,))
creator.create("Individual", list, fitness=creator.FitnessMax)
self.toolbox = base.Toolbox()
self.toolbox.register(
"individual",
init_valid_individual,
creator.Individual,
self.n_features,
self.num_features,
)
self.toolbox.register(
"population", tools.initRepeat, list, self.toolbox.individual
)
self.toolbox.register("evaluate", self.evaluate_individual)
self.toolbox.register(
"mate", cxTwoPointAndFix, num_selected_features=self.num_features
)
self.toolbox.register(
"mutate",
mutate_and_fix,
indpb=0.01,
num_selected_features=self.num_features,
)
self.toolbox.register("select", mixed_selection)
def evaluate_individual(self, individual):
"""Evaluate an individual using cross-validation."""
selected_indices = [i for i, bit in enumerate(individual) if bit == 1]
X_selected = (
self.X.iloc[:, selected_indices]
if isinstance(self.X, pd.DataFrame)
else self.X[:, selected_indices]
)
        kf = KFold(random_state=18012019, n_splits=self.cv, shuffle=True)
        if self.model is None and self.sissopp_binary_path:
            # Assemble the data file expected by SISSO++: the selected features
            # plus the target column named by `property_key`. Building it here
            # keeps the plain-sklearn path independent of `sissopp_inputs`.
            if isinstance(self.y, np.ndarray):
                y_df = pd.DataFrame(
                    index=X_selected.index,
                    columns=[self.sissopp_inputs["property_key"]],
                    data=self.y,
                )
            else:
                y_df = self.y
            df = pd.concat([X_selected, y_df], axis=1)
            kf_splits = kf.split(X_selected)
            base_path = os.getcwd()
            cv_scores = []
            for i, (train, test) in enumerate(kf_splits):
                # SISSO++ trains on the complement of `leave_out_inds`, so only
                # the held-out fold needs to be materialized here.
                X_test = X_selected.iloc[test, :]
                y_test = self.y[test]
cv_dir = f"cv_{i+1}"
os.makedirs(cv_dir, exist_ok=True)
os.chdir(cv_dir)
inputs_cv = self.sissopp_inputs.copy()
inputs_cv["leave_out_inds"] = [str(i) for i in test]
with open("sisso.json", "w") as f:
json.dump(inputs_cv, f, indent=4)
df.to_csv("data.csv")
cmd = (
f"OMP_NUM_THREADS=64 OMP_PLACES=cores mpirun -n {self.mpi_tasks} "
+ self.sissopp_binary_path
)
_ = subprocess.run(
cmd, shell=True, check=True, capture_output=True, text=True
)
max_dim = inputs_cv["desc_dim"]
# Handle inf prediction models with nan scores
try:
model = load_model(
f"{os.getcwd()}/models/train_dim_{max_dim}_model_0.dat"
)
y_pred = model.eval_many(X_test.values)
self.parse_postfix(model=model, selected_indices=selected_indices)
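                    # Map an sklearn scoring name such as
                    # "neg_mean_absolute_error" onto the matching
                    # `sklearn.metrics` function.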
score = getattr(metrics, self.scoring.split("neg_")[-1])(
y_test, y_pred
)
except Exception:
score = np.nan
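                # Restore sklearn's sign convention for "neg_*" scorers.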
cv_scores.append(score * -1 if "neg_" in self.scoring else score)
os.chdir(base_path)
else:
model_clone = clone(self.model)
cv_scores = cross_validate(
model_clone,
X_selected,
self.y,
cv=kf,
scoring=self.scoring,
n_jobs=self.n_jobs,
error_score=self.error_score,
return_train_score=self.return_train_score,
)
        # Remove all per-fold model directories regardless of which branch ran.
        base_path = os.getcwd()
        for item in os.listdir(base_path):
            item_path = os.path.join(base_path, item)
            if os.path.isdir(item_path) and (
                item.startswith("SISSO_") or item.startswith("cv")
            ):
                shutil.rmtree(item_path)
val_scores = (
cv_scores["test_score"] if isinstance(cv_scores, dict) else cv_scores
)
if np.isnan(val_scores).any():
return (-1e10,)
else:
return (np.mean(val_scores),)
def evaluate_on_test(self, individual):
"""Evaluate the best individual on the test set."""
if self.X_test is None or self.y_test is None:
return None
selected_indices = [i for i, bit in enumerate(individual) if bit == 1]
X_train_sel = (
self.X.iloc[:, selected_indices]
if isinstance(self.X, pd.DataFrame)
else self.X[:, selected_indices]
)
X_test_sel = (
self.X_test.iloc[:, selected_indices]
if isinstance(self.X_test, pd.DataFrame)
else self.X_test[:, selected_indices]
)
model_clone = clone(self.model)
base_path = os.getcwd()
for item in os.listdir(base_path):
item_path = os.path.join(base_path, item)
# Remove test set models
if os.path.isdir(item_path) and (
item.startswith("SISSO_") or item.startswith("cv")
):
shutil.rmtree(item_path)
model_clone.fit(X_train_sel, self.y)
        if isinstance(self.test_scoring, str):
            scorer = get_scorer(self.test_scoring)
            score = scorer(model_clone, X_test_sel, self.y_test)
        else:
            y_pred = model_clone.predict(X_test_sel)
            score = self.test_scoring(self.y_test, y_pred)
        return score
def differential_evolution_ga(self, pop, F=0.5, mutpb=0.1):
"""Differential Evolution GA offspring generation.
Parameters
----------
pop : list of Individuals
Current population.
F : float, default=0.5
Differential weight.
mutpb : float, default=0.1
Mutation probability.
Returns
-------
        list of Individuals
            New population after the DE operation.
"""
new_pop = []
for target in pop:
a, b, c = random.sample(pop, 3)
trial = creator.Individual([0] * len(target))
for i in range(len(target)):
if random.random() < F:
trial[i] = int(a[i] ^ (b[i] ^ c[i]))
else:
trial[i] = target[i]
(trial,) = mutate_and_fix(
trial, indpb=mutpb, num_selected_features=sum(target)
)
trial.fitness.values = self.toolbox.evaluate(trial)
if not target.fitness.valid:
target.fitness.values = self.toolbox.evaluate(target)
new_pop.append(
trial if trial.fitness.values[0] > target.fitness.values[0] else target
)
return new_pop
def parse_postfix(self, model, selected_indices):
"""Extracts features and operators from postfix expression string.
Parameters
----------
model : SISSO model object
The SISSO model containing features.
selected_indices : list of int
Indices of selected features.
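
        Notes
        -----
        Tokens are separated by `|`; purely numeric tokens are indices into
        `selected_indices`, and anything else is counted as an operator. A
        hypothetical expression like `0|1|add` would increment two feature
        counts and one operator count.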
"""
for feat in model.feats:
expression = feat.postfix_expr.split("|")
for exp in expression:
if exp.isdigit():
self.feature_usage_counts[selected_indices[int(exp)]] += 1
else:
self.operator_usage_counts[exp] += 1
def run(self, plot=True, strategy="standard"):
"""Run the Genetic Algorithm for feature selection.
Parameters
----------
plot : bool, default=True
Whether to plot the fitness history.
strategy : str, default="standard"
Offspring generation strategy: "standard" or "de" (differential evolution).
Returns
-------
list of str
Selected feature names.
"""
pop = self.toolbox.population(n=self.population_size)
hall_of_fame = tools.HallOfFame(1)
no_improvement_counter = 0
best_fitness = -np.inf
for gen in range(self.generations):
entropy = population_entropy(pop)
if (
no_improvement_counter >= self.mutation_boost_threshold
or entropy < self.entropy_threshold
):
mutpb = self.base_mutpb * self.mutation_boost_factor
indpb = 0.05
else:
mutpb = self.base_mutpb
indpb = 0.01
self.toolbox.unregister("mutate")
self.toolbox.register(
"mutate",
mutate_and_fix,
indpb=indpb,
num_selected_features=self.num_features,
)
# Choose offspring generation strategy
if strategy == "standard":
offspring = algorithms.varAnd(pop, self.toolbox, self.cxpb, mutpb)
elif strategy == "de":
offspring = self.differential_evolution_ga(pop, F=0.7, mutpb=0.05)
else:
                raise ValueError(f"Unknown strategy: {strategy!r}; use 'standard' or 'de'.")
for ind in tqdm(offspring, desc=f"Evaluating Gen {gen}"):
ind.fitness.values = self.toolbox.evaluate(ind)
# Keep only diverse individuals
diverse_offspring = []
for ind in offspring:
if is_diverse(
ind, diverse_offspring + pop, min_distance=self.min_diversity
):
diverse_offspring.append(ind)
if len(diverse_offspring) < len(pop):
needed = len(pop) - len(diverse_offspring)
diverse_offspring.extend(self.toolbox.select(offspring, k=needed))
pop[:] = diverse_offspring
hall_of_fame.update(pop)
current_best = max(ind.fitness.values[0] for ind in pop)
current_avg = np.mean([ind.fitness.values[0] for ind in pop])
self.fitness_history.append((current_best, current_avg))
test_score = None
if self.X_test is not None:
test_score = self.evaluate_on_test(hall_of_fame[0])
self.test_score_history.append(test_score)
if current_best > best_fitness + 1e-8:
best_fitness = current_best
no_improvement_counter = 0
else:
no_improvement_counter += 1
if no_improvement_counter >= self.early_stop_patience:
print(f"Converged at generation {gen}")
break
logging.info(
f"Gen {gen}: Best Fitness ({self.scoring}) = {current_best:.4f}, Entropy = {entropy:.4f}, Mutation = {mutpb:.2f}"
+ (f", Test Score = {test_score:.4f}" if test_score is not None else "")
)
logging.info(f"All time best : {best_fitness:.4f}")
self.best_individual = hall_of_fame[0]
self.selected_features = [
self.feature_names[i]
for i, bit in enumerate(self.best_individual)
if bit == 1
]
if plot:
self.plot_fitness(strategy)
return self.selected_features
def plot_fitness(self, strategy) -> None:
"""Plot the fitness history of the GA.
Parameters
----------
strategy : str
Offspring generation strategy used.
"""
gens = range(len(self.fitness_history))
best_fitness_vals = [f[0] for f in self.fitness_history]
plt.plot(gens, best_fitness_vals, label=f"Best Fitness ({self.scoring})")
if self.test_score_history:
plt.plot(
gens,
self.test_score_history,
label=f"Test {self.test_scoring} (diagnostic)",
linestyle="--",
)
plt.xlabel("Generation")
plt.ylabel(f"{self.scoring} Score")
plt.legend()
plt.title(f"GA Convergence Plot : {strategy.capitalize()}")
plt.savefig(f"GA_result_{strategy.capitalize()}.png")
plt.show()
plt.close()
def get_selected_feature_indices(self) -> list[int]:
"""
Get indices of selected features from the best individual.
Returns
-------
list of int
Indices of selected features.
"""
return [i for i, bit in enumerate(self.best_individual) if bit == 1]