#!/usr/bin/env python3
"""
Machine learning methods for community inference.
"""
# imports
import os
import gc
import time
import warnings
import itertools
from typing import List, Union
from tqdm import tqdm
from pathlib import Path
import numpy as np
import pandas as pd
import pickle
import sklearn
from sklearn.impute import KNNImputer
from sklearn.metrics import accuracy_score, confusion_matrix, roc_auc_score, roc_curve
from sklearn.model_selection import cross_val_predict
from sklearn.utils.random import sample_without_replacement
from sklearn.model_selection import KFold, StratifiedKFold, cross_val_score
from sklearn.preprocessing import StandardScaler, MaxAbsScaler, MinMaxScaler
from sklearn.model_selection import GridSearchCV, StratifiedShuffleSplit
from sklearn import tree
import seaborn as sns
import matplotlib.pyplot as plt
import mlflow
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
os.environ['MALLOC_TRIM_THRESHOLD_'] = '0'
import tensorflow as tf
import keras
from keras import layers, activations, backend
from ConfigSpace import Categorical, Configuration, ConfigurationSpace, EqualsCondition, Float, InCondition, Integer, Constant, ForbiddenAndConjunction, ForbiddenEqualsClause
from smac import HyperparameterOptimizationFacade, MultiFidelityFacade, Scenario
from smac.intensifier.hyperband import Hyperband
## Helpers
def dict_permutations(dictionary:dict) -> List[dict]:
"""
    Expand a dictionary of value lists into a list of dictionaries, one per combination of values.
:param dictionary: Input dictionary
:type dictionary: dict
:return: All permutations of dictionary
:rtype: List[dict]
"""
keys, values = zip(*dictionary.items())
return [dict(zip(keys, v)) for v in itertools.product(*values)]
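# Minimal usage sketch (the grid below is a made-up example, not part of the pipeline):
if __name__ == "__main__":
    demo_grid = {"scaler": ["standard", "minmax"], "n_components": [2, 3]}
    print(dict_permutations(demo_grid))
    # -> [{'scaler': 'standard', 'n_components': 2}, {'scaler': 'standard', 'n_components': 3},
    #     {'scaler': 'minmax', 'n_components': 2}, {'scaler': 'minmax', 'n_components': 3}]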
def join_df_metNames(df:pd.DataFrame, grouper="peakID", include_mass=False) -> pd.DataFrame:
"""
    Join the metabolite names (metNames) of a dataframe along a grouper column. This provides a common
    index so that positively and negatively charged dataframes can be combined by their metabolite names.
    :param df: Input dataframe
    :type df: pandas.DataFrame
    :param grouper: Grouper column, defaults to "peakID"
    :type grouper: str, optional
    :param include_mass: Include mass column, defaults to False
    :type include_mass: bool, optional
    :return: Combined dataframe
    :rtype: pandas.DataFrame
"""
mass_name = "ref_mass" if "ref_mass" in df.columns else "mass"
data_cols = [col for col in df.columns[df.dtypes == "float"] if col not in [mass_name, "kegg", "kegg_id", "dmz"]]
cols = ["metNames"] + [f"MS{i+1}" for i in range(len(data_cols))]
if include_mass:
cols = cols + [mass_name]
comb = pd.DataFrame(columns=cols)
grouper = mass_name if "mass" in grouper else grouper
    for g in df[grouper].unique():
        grouped_rows = df.loc[df[grouper] == g]
        # Concatenate all metabolite names of the group, separated by newlines
        comb_met_name = "\n".join(grouped_rows["MetName"])
        ref_mass = grouped_rows.iloc[0][mass_name]
        if include_mass:
            comb.loc[len(comb.index)] = [comb_met_name] + list(grouped_rows.iloc[0][data_cols]) + [ref_mass]
        else:
            comb.loc[len(comb.index)] = [comb_met_name] + list(grouped_rows.iloc[0][data_cols])
comb = comb.set_index('metNames')
return comb
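# Illustrative sketch on a toy frame; the column names ("peakID", "MetName", "mass", "MS1")
# mirror the schema this helper assumes, but the values are invented:
if __name__ == "__main__":
    demo_peaks = pd.DataFrame({"peakID": [1, 1, 2],
                               "MetName": ["Alanine", "Sarcosine", "Citrate"],
                               "mass": [89.05, 89.05, 192.03],
                               "MS1": [1.2, 1.2, 3.4]})
    print(join_df_metNames(demo_peaks, grouper="peakID", include_mass=True))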
def intersect_impute_on_left(df_base:pd.DataFrame, df_right:pd.DataFrame, imputation:str="zero") -> pd.DataFrame:
"""
    Use all indices from the left (df_base) DataFrame and fill it with the intersection of df_right.
    Indices without a match are imputed with zero, the column mean, or via k-nearest neighbours (kNN),
    where k can be given as a leading digit (e.g. "3NN"); the default is k=5.
    :param df_base: Left dataframe
:type df_base: pandas.DataFrame
:param df_right: Right dataframe
:type df_right: pandas.DataFrame
:param imputation: Imputation procedure [zero|mean|kNN], defaults to "zero"
:type imputation: str, optional
:return: Merged dataframe with imputed values
:rtype: pandas.DataFrame
"""
df_merged = pd.merge(df_base, df_right, left_index=True, right_index=True, how="left")
df_merged = df_merged.loc[:, ~df_merged.columns.str.endswith('_x')]
df_merged.columns = [col.replace("_y", "") for col in df_merged.columns]
df_merged = df_merged[df_right.columns]
    if imputation == "zero":
        df_merged = df_merged.fillna(0.0)
    elif imputation == "mean":
        df_merged = df_merged.fillna(df_merged.mean())
    elif imputation.endswith("NN"):
        # "kNN" uses the default of 5 neighbours; a leading digit (e.g. "3NN") overrides it
        k = 5 if imputation[0] == "k" else int(imputation[:-2])
        imputer = KNNImputer(n_neighbors=k, keep_empty_features=True)
        df_merged = pd.DataFrame(imputer.fit_transform(df_merged), index=df_merged.index, columns=df_merged.columns)
    return df_merged
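# Quick sketch of the index alignment and zero-imputation (toy frames, invented index labels):
if __name__ == "__main__":
    demo_left = pd.DataFrame({"MS1": [1.0, 2.0, 3.0]}, index=["m1", "m2", "m3"])
    demo_right = pd.DataFrame({"MS1": [10.0, 30.0]}, index=["m1", "m3"])
    # "m2" has no match in demo_right and is imputed with 0.0
    print(intersect_impute_on_left(demo_left, demo_right, imputation="zero"))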
## SKLearn
def individual_layers_to_tuple(config) -> dict:
"""
    Collect the individual n_neurons_* entries of a configuration into a hidden_layer_sizes tuple.
:param config: Configuration
:type config: dict-like
:return: Config with changed hidden_layer_sizes as tuples
:rtype: dict
"""
config = dict(config)
hidden_layer_sizes = tuple([config.pop(k) for k in list(config.keys()) if k.startswith("n_neurons")])
if hidden_layer_sizes:
config["hidden_layer_sizes"] = hidden_layer_sizes
return config
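# Sketch: per-layer neuron counts sampled as separate hyperparameters (e.g. by SMAC) are folded
# back into the single hidden_layer_sizes tuple expected by e.g. sklearn's MLPClassifier:
if __name__ == "__main__":
    sampled_config = {"alpha": 1e-4, "n_neurons_layer1": 64, "n_neurons_layer2": 32}
    print(individual_layers_to_tuple(sampled_config))
    # -> {'alpha': 0.0001, 'hidden_layer_sizes': (64, 32)}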
class SKL_Classifier:
"""
Representation of Scikit-learn classifier for SMAC3
"""
def __init__(self, X, ys, cv, configuration_space:ConfigurationSpace, classifier, n_trials:int):
"""
Initialization of sklearn classifier
:param X: Data values
:type X: dataframe-like
:param ys: True classes
:type ys: dataframe-like
:param cv: Cross-validation scheme
:type cv: cv-scheme
:param configuration_space: Configuration Space
:type configuration_space: ConfigSpace.ConfigurationSpace
:param classifier: Classifier
:type classifier: Classifier from sklearn
:param n_trials: Number of trials
:type n_trials: int
"""
self.X = X
self.ys = ys
self.cv = cv
self.configuration_space = configuration_space
self.classifier = classifier
self.count = 0
self.progress_bar = tqdm(total=n_trials)
def train(self, config: Configuration, seed:int=0) -> np.float64:
"""
Train the classifier with given configuration.
:param config: Configuration
:type config: ConfigSpace.Configuration
:param seed: Seed value to control randomness, defaults to 0
:type seed: int, optional
:return: Loss (1-accuracy)
:rtype: np.float64
"""
config = individual_layers_to_tuple(config)
if "hidden_layer_sizes" in config:
self.progress_bar.set_postfix_str(f'Connection size: {np.prod(config["hidden_layer_sizes"])}')
with mlflow.start_run(run_name=f"run_{self.count}", nested=True):
mlflow.set_tag("test_identifier", f"child_{self.count}")
if isinstance(self.cv, int):
splitter = StratifiedKFold(n_splits=self.cv, shuffle=True, random_state=seed)
else:
splitter = self.cv
scores = []
for train, test in splitter.split(self.X, self.ys):
model = self.classifier(**config)
model.fit(self.X.iloc[train].values, self.ys.iloc[train].values)
y_pred = model.predict(self.X.iloc[test].values)
scores.append( np.mean( y_pred == self.ys.iloc[test].values) )
score = np.mean( scores )
mlflow.log_params( config )
mlflow.log_metrics( {"accuracy": score} )
self.count += 1
self.progress_bar.update(1)
return 1.0 - score
def tune_classifier(X, y, classifier, cv, configuration_space:ConfigurationSpace, n_workers:int, n_trials:int, name:str,
algorithm_name:str, outdir, verbosity:int=0):
"""
Perform hyperparameter tuning on an Sklearn classifier.
:param X: Data values
:type X: dataframe-like
    :param y: True classes for one organism
    :type y: dataframe-like
:param classifier: Classifier
:type classifier: Classifier from sklearn
:param cv: Cross-validation scheme
:type cv: cv-scheme
:param configuration_space: Configuration Space
:type configuration_space: ConfigSpace.ConfigurationSpace
:param n_workers: Number of workers to work in parallel
:type n_workers: int
:param n_trials: Number of trials
:type n_trials: int
:param name: Name of run
:type name: str
:param algorithm_name: Algorithm name
:type algorithm_name: str
:param outdir: Output directory
:type outdir: path-like
:param verbosity: Level of verbosity, defaults to 0
:type verbosity: int, optional
    :return: Incumbent (best) configuration(s) found during optimization
    :rtype: Configuration or list of Configuration
"""
    skl_classifier = SKL_Classifier(X, y, cv=cv, configuration_space=configuration_space, classifier=classifier, n_trials=n_trials)
    scenario = Scenario( skl_classifier.configuration_space, deterministic=True,
                         n_workers=n_workers, n_trials=n_trials,
                         walltime_limit=np.inf, cputime_limit=np.inf, trial_memory_limit=None,
                         output_directory=outdir )
    facade = HyperparameterOptimizationFacade(scenario, skl_classifier.train, overwrite=True, logging_level=30 - verbosity*10)
mlflow.set_tracking_uri(Path(os.path.join(outdir, "mlruns")))
mlflow.set_experiment(f"{name}_{algorithm_name}")
with mlflow.start_run(run_name=f"{name}_{algorithm_name}"):
mlflow.set_tag("test_identifier", "parent")
incumbent = facade.optimize()
return incumbent
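# Minimal usage sketch: random toy data, a RandomForest and a two-parameter search space are
# assumptions for illustration; results are written to the (hypothetical) "smac_demo" directory.
if __name__ == "__main__":
    from sklearn.ensemble import RandomForestClassifier
    X_demo = pd.DataFrame(np.random.rand(40, 5), columns=[f"MS{i+1}" for i in range(5)])
    y_demo = pd.Series(np.random.randint(0, 2, 40), name="org_A")
    demo_space = ConfigurationSpace({"n_estimators": (10, 200), "max_depth": (2, 10)})
    incumbent = tune_classifier(X_demo, y_demo, RandomForestClassifier, cv=3,
                                configuration_space=demo_space, n_workers=1, n_trials=5,
                                name="demo", algorithm_name="RF", outdir="smac_demo")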
def nested_cross_validate_model_sklearn(X, ys, labels, classifier, configuration_space, n_trials,
name, algorithm_name, outdir, fold:Union[KFold, StratifiedKFold]=KFold(),
inner_fold:int=3, n_workers:int=1, verbosity:int=0):
"""
    Tune hyperparameters from the given configuration space in an inner cross-validation loop and
    evaluate the tuned model in an outer loop, for each organism (nested cross-validation).
:param X: Data values
:type X: dataframe-like
    :param ys: True classes, one column per organism
:type ys: dataframe-like
:param labels: Labels
:type labels: dataframe-like
:param classifier: Classifier
:type classifier: Classifier from sklearn
:param configuration_space: Configuration Space
:type configuration_space: ConfigSpace.ConfigurationSpace
:param n_trials: Number of trials
:type n_trials: int
:param name: Name of run
:type name: str
:param algorithm_name: Algorithm name
:type algorithm_name: str
:param outdir: Output directory
:type outdir: path-like
:param fold: Outer fold, defaults to KFold()
:type fold: Union[KFold, StratifiedKFold], optional
:param inner_fold: Inner fold, defaults to 3
:type inner_fold: int, optional
:param n_workers: Number of workers, defaults to 1
:type n_workers: int, optional
:param verbosity: Level of verbosity, defaults to 0
:type verbosity: int, optional
:return: Dataframe with metrics on different levels
:rtype: tuple[pandas.DataFrame]
"""
metrics_df = pd.DataFrame(columns=["Organism", "Cross-Validation run", "Accuracy", "AUC", "TPR", "FPR", "Threshold", "Conf_Mat"])
organism_metrics_df = pd.DataFrame(columns=["Organism", "Accuracy", "AUC", "TPR", "FPR", "Threshold", "Conf_Mat"])
overall_metrics_df = pd.DataFrame(columns=["Accuracy", "AUC", "TPR", "FPR", "Threshold", "Conf_Mat"])
all_predictions = np.ndarray((0))
all_scorings = np.ndarray((0))
    ys_deshuffled = np.ndarray((0))
# Iterate over all organisms for binary distinction
    for i, org in enumerate(tqdm(ys.columns)):
        y = ys[org]
predictions = np.ndarray((0))
scorings = np.ndarray((0))
y_deshuffled = np.ndarray((0))
# Outer Loop
for cv_i, (train_index, val_index) in enumerate(tqdm(fold.split(X, y))):
training_data = X.iloc[train_index]
training_labels = y.iloc[train_index]
validation_data = X.iloc[val_index]
validation_labels = y.iloc[val_index]
# Hyperparameter Tuning with inner CV loop
incumbent = tune_classifier(training_data, training_labels, classifier, inner_fold, configuration_space,
n_workers, n_trials, name, algorithm_name, outdir, verbosity)
# Model definition and fitting
best_hp = extract_best_hyperparameters_from_incumbent(incumbent=incumbent, configuration_space=configuration_space)
best_hp = individual_layers_to_tuple(best_hp)
model = classifier(**best_hp) # Ensures model resetting for each cross-validation
model.fit(np.array(training_data), np.array(training_labels))
# Prediction and scoring
prediction = model.predict( np.array(validation_data) )
if hasattr(model, "decision_function"):
scoring = model.decision_function( np.array(validation_data) )
elif hasattr(model, "predict_proba"):
scoring = model.predict_proba( np.array(validation_data) )[::,1]
else:
scoring = prediction
metrics_df = extract_metrics(validation_labels, prediction, scoring, labels[i], cv_i+1, metrics_df)
if verbosity != 0:
if hasattr(model, "evaluate"):
model.evaluate(validation_data, validation_labels, verbose=verbosity)
if hasattr(model, "score"):
print(f"Mean accuracy: {model.score(validation_data, validation_labels)}")
predictions = np.append(predictions, prediction)
scorings = np.append(scorings, scoring)
y_deshuffled = np.append(y_deshuffled, validation_labels)
organism_metrics_df = extract_metrics(y_deshuffled, predictions, scorings, labels[i], metrics_df=organism_metrics_df)
all_predictions = np.append(all_predictions, predictions)
all_scorings = np.append(all_scorings, scorings)
        ys_deshuffled = np.append(ys_deshuffled, y_deshuffled)
    overall_metrics_df = extract_metrics(ys_deshuffled, all_predictions, all_scorings, metrics_df=overall_metrics_df)
# Saving of results
metrics_df.to_csv(os.path.join(outdir, f"{algorithm_name}_metrics.tsv"), sep="\t")
organism_metrics_df.to_csv(os.path.join(outdir, f"{algorithm_name}_organism_metrics.tsv"), sep="\t")
overall_metrics_df.to_csv(os.path.join(outdir, f"{algorithm_name}_overall_metrics.tsv"), sep="\t")
return (metrics_df, organism_metrics_df, overall_metrics_df)
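# Usage sketch on random toy data; the organism columns, the SVC classifier and its C range are
# illustrative assumptions, and helpers such as extract_metrics are expected from this module.
if __name__ == "__main__":
    from sklearn.svm import SVC
    X_demo = pd.DataFrame(np.random.rand(60, 6))
    ys_demo = pd.DataFrame({"org_A": np.random.randint(0, 2, 60),
                            "org_B": np.random.randint(0, 2, 60)})
    demo_space = ConfigurationSpace({"C": (0.01, 10.0)})
    cv_metrics, org_metrics, overall_metrics = nested_cross_validate_model_sklearn(
        X_demo, ys_demo, labels=list(ys_demo.columns), classifier=SVC,
        configuration_space=demo_space, n_trials=5, name="demo", algorithm_name="SVC",
        outdir="smac_demo", fold=StratifiedKFold(n_splits=3), inner_fold=3)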
def tune_train_model_sklearn( X, ys, labels, classifier, configuration_space, n_workers, n_trials,
source:str, name, algorithm_name, outdir, fold:Union[KFold, StratifiedKFold]=KFold(),
verbosity=0 ):
"""
Tune and train a model in sklearn.
:param X: Data values
:type X: dataframe-like
    :param ys: True classes, one column per organism
:type ys: dataframe-like
:param labels: Labels
:type labels: dataframe-like
:param classifier: Classifier
:type classifier: Classifier from sklearn
:param configuration_space: Configuration Space
:type configuration_space: ConfigSpace.ConfigurationSpace
:param n_workers: Number of workers
:type n_workers: int
:param n_trials: Number of trials
:type n_trials: int
:param source: Source of data
:type source: str
:param name: Name of run
:type name: str
:param algorithm_name: Algorithm name
:type algorithm_name: str
:param outdir: Output directory
:type outdir: path-like
:param fold: Fold for cross validation during tuning, defaults to KFold()
:type fold: Union[KFold, StratifiedKFold], optional
:param verbosity: Level of verbosity, defaults to 0
:type verbosity: int, optional
"""
# Iterate over all organisms for binary distinction
for i, org in enumerate(tqdm(ys.columns)):
y = ys[org]
incumbent = tune_classifier(X, y, classifier, fold, configuration_space, n_workers, n_trials,
name, algorithm_name, outdir, verbosity)
# Model definition and fitting
best_hp = extract_best_hyperparameters_from_incumbent(incumbent=incumbent, configuration_space=configuration_space)
best_hp = individual_layers_to_tuple(best_hp)
        model = classifier(**best_hp)  # Fresh model instance with the tuned hyperparameters
model.fit(np.array(X), np.array(y))
with open(os.path.join(outdir, f'model_{algorithm_name}_{labels[i]}_{source}.pkl'), 'wb') as f:
            pickle.dump(model, f)
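# Sketch: tune one model per organism on toy data and pickle it to `outdir`; the classifier,
# search space and all names below are illustrative assumptions.
if __name__ == "__main__":
    from sklearn.ensemble import GradientBoostingClassifier
    X_demo = pd.DataFrame(np.random.rand(50, 4))
    ys_demo = pd.DataFrame({"org_A": np.random.randint(0, 2, 50)})
    demo_space = ConfigurationSpace({"n_estimators": (10, 100), "learning_rate": (0.01, 0.5)})
    tune_train_model_sklearn(X_demo, ys_demo, labels=list(ys_demo.columns),
                             classifier=GradientBoostingClassifier, configuration_space=demo_space,
                             n_workers=1, n_trials=5, source="demo", name="demo",
                             algorithm_name="GB", outdir="smac_demo",
                             fold=StratifiedKFold(n_splits=3))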
def evaluate_model_sklearn( X, ys, labels, indir, data_source, algorithm_name, outdir, verbosity=0 ):
"""
    Evaluate previously trained (pickled) models for all organisms and save the resulting metrics and feature importances.
:param X: Data values
:type X: dataframe-like
    :param ys: True classes, one column per organism
:type ys: dataframe-like
:param labels: Labels
:type labels: dataframe-like
:param indir: Input directory
:type indir: path-like
:param data_source: Data source
:type data_source: str
:param algorithm_name: Algorithm name
:type algorithm_name: str
:param outdir: Output directory
:type outdir: path-like
:param verbosity: Level of verbosity, defaults to 0
:type verbosity: int, optional
"""
eval_metrics_df = pd.DataFrame(columns=["Organism", "Accuracy", "AUC", "TPR", "FPR", "Threshold", "Conf_Mat"])
feature_importances = {}
for i, org in enumerate(tqdm(ys.columns)):
y = ys[org]
with open(os.path.join(indir, f'model_{algorithm_name}_{labels[i]}.pkl'), 'rb') as f:
model = pickle.load(f)
prediction = model.predict( np.array(X) )
if hasattr(model, "decision_function"):
scoring = model.decision_function( np.array(X) )
elif hasattr(model, "predict_proba"):
scoring = model.predict_proba( np.array(X) )[::,1]
else:
scoring = prediction
eval_metrics_df = extract_metrics(y, prediction, scoring, labels[i], 0, eval_metrics_df)
        # Refit on the evaluation data to derive feature importances
        model.fit(X.values, y)
if hasattr(model, "feature_importances_"):
feature_importances[labels[i]] = model.feature_importances_
feat_imp_df = pd.DataFrame(feature_importances, index=X.columns)
feat_imp_df.to_csv(os.path.join(outdir, f"feature_importance_{algorithm_name}_{data_source}.tsv"), sep="\t")
eval_metrics_df.to_csv(os.path.join(outdir, f"{algorithm_name}_{data_source}_eval_metrics.tsv"), sep="\t")
## Plotting
def plot_cv_confmat(ys, target_labels, accuracies, confusion_matrices, outdir, name):
"""
    Plot a heatmap of the confusion matrix for each target.
:param ys: Targets
:type ys: dataframe-like
:param target_labels: Target labels
:type target_labels: array-like
:param accuracies: Accuracies
:type accuracies: dataframe-like
:param confusion_matrices: Confusion matrices
:type confusion_matrices: dataframe-like
:param outdir: Output directory
:type outdir: path-like
:param name: Name of run
:type name: str
"""
warnings.filterwarnings("ignore", message="This figure includes Axes that are not compatible with tight_layout, so results might be incorrect*")
fig, axs = plt.subplots(nrows=4, ncols=2, figsize=(16, 8))
cbar_ax = fig.add_axes([.91, .3, .03, .4]) # type: ignore
for i, ax in enumerate(axs.flat):
sns.heatmap(confusion_matrices[i],
vmin=0, vmax=len(ys), annot=True, ax=ax,
cbar=i == 0, cbar_ax=None if i else cbar_ax)
ax.set_title(f'{target_labels[i]}, Accuracy: {round(accuracies[i], 5)}')
ax.axis('off')
plt.title(f"Overall accuracy: {round(np.mean(accuracies), 3)}")
fig.tight_layout(rect=[0, 0, .9, 1]) # type: ignore
plt.savefig(os.path.join(outdir, f"{name}.png"))
plt.close()
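# Plotting sketch with randomly generated 2x2 confusion matrices; the 4x2 grid in the function
# expects eight targets, so eight matrices are faked here purely for illustration.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    demo_confmats = [rng.integers(0, 20, size=(2, 2)) for _ in range(8)]
    demo_accuracies = [float(np.trace(cm)) / cm.sum() for cm in demo_confmats]
    plot_cv_confmat(ys=list(range(40)), target_labels=[f"org_{i}" for i in range(8)],
                    accuracies=demo_accuracies, confusion_matrices=demo_confmats,
                    outdir=".", name="demo_confmat")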
def plot_decision_trees(model, feature_names, class_names, outdir, name):
"""
Plot decision trees of model
:param model: Model
    :type model: model-like
:param feature_names: Feature names
:type feature_names: array-like
:param class_names: Class names
:type class_names: array-like
:param outdir: Output directory
:type outdir: path-like
:param name: Name of run
:type name: str
"""
    fig, axes = plt.subplots(nrows=1, ncols=1, figsize=(4, 4), dpi=900)
    tree.plot_tree(model,
                   feature_names=feature_names,
                   class_names=class_names,
                   filled=True)
plt.savefig(os.path.join(outdir, f"{name}.png"))
plt.close()
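# Sketch with a small tree fitted on the iris toy dataset (purely illustrative):
if __name__ == "__main__":
    from sklearn.datasets import load_iris
    from sklearn.tree import DecisionTreeClassifier
    iris = load_iris()
    demo_tree = DecisionTreeClassifier(max_depth=3).fit(iris.data, iris.target)
    plot_decision_trees(demo_tree, feature_names=list(iris.feature_names),
                        class_names=list(iris.target_names), outdir=".", name="demo_tree")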
def plot_metrics_df(metrics_df, organism_metrics_df, overall_metrics_df, algorithm_name, outdir, show=False):
"""
    Plot the extracted metrics as an accuracy heatmap and as ROC curves with AUC values.
:param metrics_df: Metrics dataframe
:type metrics_df: pandas.DataFrame
:param organism_metrics_df: Metrics dataframe on organism level
:type organism_metrics_df: pandas.DataFrame
:param overall_metrics_df: Metrics dataframe on overall level
:type overall_metrics_df: pandas.DataFrame
:param algorithm_name: Algorithm name
:type algorithm_name: str
:param outdir: Output directory
:type outdir: path-like
:param show: Show plot, defaults to False
:type show: bool, optional
"""
ax = sns.heatmap(metrics_df.pivot(index="Organism", columns="Cross-Validation run", values="Accuracy"),
vmin=0, vmax=1.0, annot=True, cmap=sns.diverging_palette(328.87, 221.63, center="light", as_cmap=True))
fig = ax.get_figure()
fig.savefig(os.path.join(outdir, f"{algorithm_name}_heatmap_accuracies.png"), bbox_inches="tight")
if show:
plt.show()
plt.close()
ax = sns.lineplot( overall_metrics_df.explode(["TPR", "FPR"]), x="FPR", y="TPR", hue="AUC")
ax = sns.lineplot( organism_metrics_df.explode(["TPR", "FPR"]), x="FPR", y="TPR", hue="Organism", alpha=0.5, ax=ax)
ax.set_title("AUC")
leg = ax.axes.get_legend()
leg.set_title("Organism (AUC)")
for t, l in zip(leg.texts, [f'{row["Organism"]} (AUC={str(row["AUC"])})' for i, row in organism_metrics_df.iterrows()]+ [f"Overall (AUC={overall_metrics_df.loc[0, 'AUC']})"]):
t.set_text(l)
fig = ax.get_figure()
fig.savefig(os.path.join(outdir, f"{algorithm_name}_AUC.png"), bbox_inches="tight")
if show:
plt.show()
plt.close()