Source code for spare_scores.mlp

import logging
import time
from typing import Any

import numpy as np
import pandas as pd
from sklearn import metrics
from sklearn.exceptions import ConvergenceWarning
from sklearn.model_selection import GridSearchCV, KFold
from sklearn.neural_network import MLPClassifier, MLPRegressor
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.utils._testing import ignore_warnings

from .data_prep import logging_basic_config


class MLPModel:
    """
    A class for managing MLP models. Additionally, the class can be
    initialized with any number of keyword arguments. These will be added
    as attributes to the class. Recognized keyword arguments are ``k``,
    ``n_repeats``, ``task`` and ``param_grid``; unrecognized ones are
    ignored.

    :param predictors: List of predictors used for modeling.
    :type predictors: list
    :param to_predict: Target variable for modeling.
    :type to_predict: str
    :param key_var: Key variable for modeling.
    :type key_var: str
    """

    def __init__(
        self,
        predictors: list,
        to_predict: str,
        key_var: str,
        verbose: int = 1,
        **kwargs: Any,
    ) -> None:
        logger = logging_basic_config(verbose, content_only=True)

        self.predictors = predictors
        self.to_predict = to_predict
        self.key_var = key_var
        self.verbose = verbose
        self.stats: Any

        valid_parameters = ["k", "n_repeats", "task", "param_grid"]
        for parameter in kwargs.keys():
            if parameter not in valid_parameters:
                print(
                    "Parameter '"
                    + parameter
                    + "' is not accepted for "
                    + "MLPModel. Ignoring..."
                )
                continue

            if parameter == "task":
                if kwargs[parameter] not in ["Classification", "Regression"]:
                    logger.error(
                        "Only 'Classification' and 'Regression' "
                        + "tasks are supported."
                    )
                    raise ValueError(
                        "Only 'Classification' and 'Regression' "
                        + "tasks are supported."
                    )
                else:
                    self.task = kwargs[parameter]
                    continue

            self.__dict__.update({parameter: kwargs[parameter]})

        # Set default values for the parameters if not provided
        if "k" not in kwargs.keys():
            self.k = 5
        if "n_repeats" not in kwargs.keys():
            self.n_repeats = 1
        if "task" not in kwargs.keys():
            self.task = "Regression"
        if "param_grid" not in kwargs.keys() or kwargs["param_grid"] is None:
            self.param_grid = {
                "mlp__hidden_layer_sizes": [
                    (10, 30, 10),
                    (10, 20, 10),
                    (10, 30, 30, 10),
                    (100,),
                    (200,),
                ],
                "mlp__activation": ["tanh", "relu"],
                "mlp__solver": ["sgd", "adam"],
                "mlp__alpha": [0.001, 0.01, 0.05, 0.1],
                "mlp__learning_rate": ["constant", "adaptive"],
                "mlp__early_stopping": [True],
                "mlp__max_iter": [1000],
            }
    def set_parameters(self, **parameters: Any) -> None:
        """
        Updates the model's attributes with the provided keyword arguments.
        """
        self.__dict__.update(parameters)
    @ignore_warnings(category=RuntimeWarning)  # type: ignore
    def _fit(self, df: pd.DataFrame) -> None:
        X = df[self.predictors].astype("float64")
        y = df[self.to_predict].astype("float64")

        if self.task == "Regression":
            mlp = MLPRegressor(early_stopping=True, max_iter=1000)
            scoring = "neg_mean_absolute_error"
            metrics = ["MAE", "RMSE", "R2"]
        else:
            mlp = MLPClassifier(early_stopping=True, max_iter=1000)
            scoring = "balanced_accuracy"
            metrics = [
                "AUC",
                "Accuracy",
                "Sensitivity",
                "Specificity",
                "Precision",
                "Recall",
                "F1",
            ]

        pipeline = [("scaler", StandardScaler()), ("mlp", mlp)]
        pipeline_obj = Pipeline(pipeline)

        grid_search = GridSearchCV(
            pipeline_obj,
            self.param_grid,
            scoring=scoring,
            cv=KFold(n_splits=5, shuffle=True, random_state=42),
            refit=True,
        )
        grid_search.fit(X, y)

        self.mdl = grid_search.best_estimator_["mlp"]
        self.scaler = grid_search.best_estimator_["scaler"]
        self.params = grid_search.best_params_

        X_scale = self.scaler.transform(X)
        self.y_hat = self.mdl.predict(X_scale)

        self.stats = {metric: [] for metric in metrics}
        self.get_stats(y, self.y_hat)
    @ignore_warnings(category=(ConvergenceWarning, UserWarning))  # type: ignore
    def fit(self, df: pd.DataFrame, verbose: int = 1) -> dict:
        """
        Trains the model using the provided dataframe and default parameters.

        :param df: the provided dataframe.
        :type df: pandas.DataFrame
        :param verbose: the verbosity level
        :type verbose: int

        :return: A dictionary with the results from training.
        :rtype: dict
        """
        logger = logging_basic_config(verbose, content_only=True)

        # Time the training:
        start_time = time.time()
        logger.info("Training the MLP model...")
        self._fit(df)
        training_time = time.time() - start_time
        self.stats["training_time"] = round(training_time, 4)

        result = {
            "predicted": self.y_hat,
            "model": self.mdl,
            "stats": self.stats,
            "best_params": self.params,
            "CV_folds": None,
            "scaler": self.scaler,
        }

        if self.task == "Regression":
            print(">>MAE = ", self.stats["MAE"][0])
            print(">>RMSE = ", self.stats["RMSE"][0])
            print(">>R2 = ", self.stats["R2"][0])
        else:
            print(">>AUC = ", self.stats["AUC"][0])
            print(">>Accuracy = ", self.stats["Accuracy"][0])
            print(">>Sensitivity = ", self.stats["Sensitivity"][0])
            print(">>Specificity = ", self.stats["Specificity"][0])
            print(">>Precision = ", self.stats["Precision"][0])
            print(">>Recall = ", self.stats["Recall"][0])
            print(">>F1 = ", self.stats["F1"][0])

        return result
    def predict(self, df: pd.DataFrame) -> np.ndarray:
        """
        Predicts the result of the provided dataframe using the trained model.

        :param df: the provided dataframe.
        :type df: pandas.DataFrame

        :return: The predictions from the trained model regarding the
            provided dataframe.
        :rtype: np.ndarray
        """
        X = df[self.predictors]
        X_transformed = self.scaler.transform(X)

        y_pred = (
            self.mdl.predict(X_transformed)
            if self.task == "Regression"
            else self.mdl.predict_proba(X_transformed)[:, 1]
        )
        return y_pred
    def get_stats(self, y: np.ndarray, y_hat: np.ndarray) -> None:
        """
        Computes the stats from the training and appends them to self.stats.

        :param y: original labels
        :type y: np.ndarray
        :param y_hat: predicted values
        :type y_hat: np.ndarray
        """
        if len(np.unique(y)) == 2:
            fpr, tpr, thresholds = metrics.roc_curve(y, y_hat, pos_label=1)
            self.stats["AUC"].append(metrics.auc(fpr, tpr))
            tn, fp, fn, tp = metrics.confusion_matrix(y, y_hat).ravel()
            self.stats["Accuracy"].append((tp + tn) / (tp + tn + fp + fn))
            # Sensitivity is the recall of the positive class, specificity
            # the recall of the negative class.
            self.stats["Sensitivity"].append(tp / (tp + fn))
            self.stats["Specificity"].append(tn / (tn + fp))
            precision, recall = tp / (tp + fp), tp / (tp + fn)
            self.stats["Precision"].append(precision)
            self.stats["Recall"].append(recall)
            self.stats["F1"].append(2 * precision * recall / (precision + recall))
        else:
            self.stats["MAE"].append(metrics.mean_absolute_error(y, y_hat))
            self.stats["RMSE"].append(
                metrics.mean_squared_error(y, y_hat, squared=False)
            )
            self.stats["R2"].append(metrics.r2_score(y, y_hat))
    def output_stats(self) -> None:
        for key, value in self.stats.items():
            logging.info(
                f">> {key} = {np.mean(value): #.4f} \u00B1 {np.std(value): #.4f}"
            )
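
Example (not part of the module source): a minimal, hypothetical sketch of driving MLPModel end to end on a synthetic regression dataset. The column names, data values, and the reduced search grid are illustrative assumptions made for the sake of a quick run; only the class API above is taken from the module.

import numpy as np
import pandas as pd

from spare_scores.mlp import MLPModel

# Synthetic data: two predictors and one continuous target (values are made up).
rng = np.random.default_rng(0)
df = pd.DataFrame(
    {
        "ID": range(200),
        "ROI_1": rng.normal(size=200),
        "ROI_2": rng.normal(size=200),
    }
)
df["Age"] = 50 + 5 * df["ROI_1"] - 3 * df["ROI_2"] + rng.normal(scale=2, size=200)

# A deliberately small grid so the sketch finishes quickly; the default grid
# defined in __init__ is much larger.
small_grid = {
    "mlp__hidden_layer_sizes": [(10,)],
    "mlp__alpha": [0.01],
    "mlp__early_stopping": [True],
    "mlp__max_iter": [500],
}

model = MLPModel(
    predictors=["ROI_1", "ROI_2"],
    to_predict="Age",
    key_var="ID",
    task="Regression",
    param_grid=small_grid,
)

result = model.fit(df)        # grid search + refit; prints MAE, RMSE, R2
preds = model.predict(df)     # in-sample predictions from the refit pipeline
print(result["best_params"])  # hyper-parameters chosen by GridSearchCV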
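
A second hypothetical sketch, this time for the "Classification" task; the binary "Diagnosis" label and the data are again illustrative assumptions. Note that predict() returns the positive-class probability for classifiers, while the stats reported by fit() are computed from hard in-sample predictions.

import numpy as np
import pandas as pd

from spare_scores.mlp import MLPModel

rng = np.random.default_rng(1)
df = pd.DataFrame(
    {
        "ID": range(200),
        "ROI_1": rng.normal(size=200),
        "ROI_2": rng.normal(size=200),
    }
)
# Binary (0/1) labels; _fit casts the target to float64, so numeric labels are expected.
df["Diagnosis"] = (
    df["ROI_1"] - 0.5 * df["ROI_2"] + rng.normal(scale=0.5, size=200) > 0
).astype(int)

clf = MLPModel(
    predictors=["ROI_1", "ROI_2"],
    to_predict="Diagnosis",
    key_var="ID",
    task="Classification",
    param_grid={
        "mlp__hidden_layer_sizes": [(10,)],
        "mlp__alpha": [0.01],
        "mlp__early_stopping": [True],
        "mlp__max_iter": [500],
    },
)

clf.fit(df)              # prints AUC, Accuracy, Sensitivity, Specificity, ...
proba = clf.predict(df)  # probability of the positive class (predict_proba[:, 1])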