This is page 2 of 16. Use http://codebase.md/mljar/mljar-supervised?page={x} to view the full context.

# Directory Structure

```
├── .github
│   └── workflows
│       ├── run-tests.yml
│       ├── test-installation-with-conda.yml
│       └── test-installation-with-pip-on-windows.yml
├── .gitignore
├── CITATION
├── examples
│   ├── notebooks
│   │   ├── basic_run.ipynb
│   │   └── Titanic.ipynb
│   └── scripts
│       ├── binary_classifier_adult_fairness.py
│       ├── binary_classifier_ensemble.py
│       ├── binary_classifier_marketing.py
│       ├── binary_classifier_random.py
│       ├── binary_classifier_Titanic.py
│       ├── binary_classifier.py
│       ├── multi_class_classifier_digits.py
│       ├── multi_class_classifier_MNIST.py
│       ├── multi_class_classifier.py
│       ├── multi_class_drug_fairness.py
│       ├── regression_acs_fairness.py
│       ├── regression_crime_fairness.py
│       ├── regression_housing_fairness.py
│       ├── regression_law_school_fairness.py
│       ├── regression.py
│       └── tabular_mar_2021.py
├── LICENSE
├── MANIFEST.in
├── pytest.ini
├── README.md
├── requirements_dev.txt
├── requirements.txt
├── setup.py
├── supervised
│   ├── __init__.py
│   ├── algorithms
│   │   ├── __init__.py
│   │   ├── algorithm.py
│   │   ├── baseline.py
│   │   ├── catboost.py
│   │   ├── decision_tree.py
│   │   ├── extra_trees.py
│   │   ├── factory.py
│   │   ├── knn.py
│   │   ├── lightgbm.py
│   │   ├── linear.py
│   │   ├── nn.py
│   │   ├── random_forest.py
│   │   ├── registry.py
│   │   ├── sklearn.py
│   │   └── xgboost.py
│   ├── automl.py
│   ├── base_automl.py
│   ├── callbacks
│   │   ├── __init__.py
│   │   ├── callback_list.py
│   │   ├── callback.py
│   │   ├── early_stopping.py
│   │   ├── learner_time_constraint.py
│   │   ├── max_iters_constraint.py
│   │   ├── metric_logger.py
│   │   ├── terminate_on_nan.py
│   │   └── total_time_constraint.py
│   ├── ensemble.py
│   ├── exceptions.py
│   ├── fairness
│   │   ├── __init__.py
│   │   ├── metrics.py
│   │   ├── optimization.py
│   │   ├── plots.py
│   │   ├── report.py
│   │   └── utils.py
│   ├── model_framework.py
│   ├── preprocessing
│   │   ├── __init__.py
│   │   ├── datetime_transformer.py
│   │   ├── encoding_selector.py
│   │   ├── exclude_missing_target.py
│   │   ├── goldenfeatures_transformer.py
│   │   ├── kmeans_transformer.py
│   │   ├── label_binarizer.py
│   │   ├── label_encoder.py
│   │   ├── preprocessing_categorical.py
│   │   ├── preprocessing_missing.py
│   │   ├── preprocessing_utils.py
│   │   ├── preprocessing.py
│   │   ├── scale.py
│   │   └── text_transformer.py
│   ├── tuner
│   │   ├── __init__.py
│   │   ├── data_info.py
│   │   ├── hill_climbing.py
│   │   ├── mljar_tuner.py
│   │   ├── optuna
│   │   │   ├── __init__.py
│   │   │   ├── catboost.py
│   │   │   ├── extra_trees.py
│   │   │   ├── knn.py
│   │   │   ├── lightgbm.py
│   │   │   ├── nn.py
│   │   │   ├── random_forest.py
│   │   │   ├── tuner.py
│   │   │   └── xgboost.py
│   │   ├── preprocessing_tuner.py
│   │   ├── random_parameters.py
│   │   └── time_controller.py
│   ├── utils
│   │   ├── __init__.py
│   │   ├── additional_metrics.py
│   │   ├── additional_plots.py
│   │   ├── automl_plots.py
│   │   ├── common.py
│   │   ├── config.py
│   │   ├── constants.py
│   │   ├── data_validation.py
│   │   ├── importance.py
│   │   ├── jsonencoder.py
│   │   ├── leaderboard_plots.py
│   │   ├── learning_curves.py
│   │   ├── metric.py
│   │   ├── shap.py
│   │   ├── subsample.py
│   │   └── utils.py
│   └── validation
│       ├── __init__.py
│       ├── validation_step.py
│       ├── validator_base.py
│       ├── validator_custom.py
│       ├── validator_kfold.py
│       └── validator_split.py
└── tests
    ├── __init__.py
    ├── checks
    │   ├── __init__.py
    │   ├── check_automl_with_regression.py
    │   ├── run_ml_tests.py
    │   └── run_performance_tests.py
    ├── conftest.py
    ├── data
    │   ├── 179.csv
    │   ├── 24.csv
    │   ├── 3.csv
    │   ├── 31.csv
    │   ├── 38.csv
    │   ├── 44.csv
    │   ├── 720.csv
    │   ├── 737.csv
    │   ├── acs_income_1k.csv
    │   ├── adult_missing_values_missing_target_500rows.csv
    │   ├── boston_housing.csv
    │   ├── CrimeData
    │   │   ├── cities.json
    │   │   ├── crimedata.csv
    │   │   └── README.md
    │   ├── Drug
    │   │   ├── Drug_Consumption.csv
    │   │   └── README.md
    │   ├── housing_regression_missing_values_missing_target.csv
    │   ├── iris_classes_missing_values_missing_target.csv
    │   ├── iris_missing_values_missing_target.csv
    │   ├── LawSchool
    │   │   ├── bar_pass_prediction.csv
    │   │   └── README.md
    │   ├── PortugeseBankMarketing
    │   │   └── Data_FinalProject.csv
    │   └── Titanic
    │       ├── test_with_Survived.csv
    │       └── train.csv
    ├── README.md
    ├── tests_algorithms
    │   ├── __init__.py
    │   ├── test_baseline.py
    │   ├── test_catboost.py
    │   ├── test_decision_tree.py
    │   ├── test_extra_trees.py
    │   ├── test_factory.py
    │   ├── test_knn.py
    │   ├── test_lightgbm.py
    │   ├── test_linear.py
    │   ├── test_nn.py
    │   ├── test_random_forest.py
    │   ├── test_registry.py
    │   └── test_xgboost.py
    ├── tests_automl
    │   ├── __init__.py
    │   ├── test_adjust_validation.py
    │   ├── test_automl_init.py
    │   ├── test_automl_report.py
    │   ├── test_automl_sample_weight.py
    │   ├── test_automl_time_constraints.py
    │   ├── test_automl.py
    │   ├── test_data_types.py
    │   ├── test_dir_change.py
    │   ├── test_explain_levels.py
    │   ├── test_golden_features.py
    │   ├── test_handle_imbalance.py
    │   ├── test_integration.py
    │   ├── test_joblib_version.py
    │   ├── test_models_needed_for_predict.py
    │   ├── test_prediction_after_load.py
    │   ├── test_repeated_validation.py
    │   ├── test_restore.py
    │   ├── test_stack_models_constraints.py
    │   ├── test_targets.py
    │   └── test_update_errors_report.py
    ├── tests_callbacks
    │   ├── __init__.py
    │   └── test_total_time_constraint.py
    ├── tests_ensemble
    │   ├── __init__.py
    │   └── test_save_load.py
    ├── tests_fairness
    │   ├── __init__.py
    │   ├── test_binary_classification.py
    │   ├── test_multi_class_classification.py
    │   └── test_regression.py
    ├── tests_preprocessing
    │   ├── __init__.py
    │   ├── disable_eda.py
    │   ├── test_categorical_integers.py
    │   ├── test_datetime_transformer.py
    │   ├── test_encoding_selector.py
    │   ├── test_exclude_missing.py
    │   ├── test_goldenfeatures_transformer.py
    │   ├── test_label_binarizer.py
    │   ├── test_label_encoder.py
    │   ├── test_preprocessing_missing.py
    │   ├── test_preprocessing_utils.py
    │   ├── test_preprocessing.py
    │   ├── test_scale.py
    │   └── test_text_transformer.py
    ├── tests_tuner
    │   ├── __init__.py
    │   ├── test_hill_climbing.py
    │   ├── test_time_controller.py
    │   └── test_tuner.py
    ├── tests_utils
    │   ├── __init__.py
    │   ├── test_compute_additional_metrics.py
    │   ├── test_importance.py
    │   ├── test_learning_curves.py
    │   ├── test_metric.py
    │   ├── test_shap.py
    │   └── test_subsample.py
    └── tests_validation
        ├── __init__.py
        ├── test_validator_kfold.py
        └── test_validator_split.py
```

# Files

--------------------------------------------------------------------------------
/supervised/preprocessing/preprocessing_missing.py:
--------------------------------------------------------------------------------

```python
import numpy as np
import pandas as pd

from supervised.preprocessing.preprocessing_utils import PreprocessingUtils


class PreprocessingMissingValues(object):
    FILL_NA_MIN = "na_fill_min_1"
    FILL_NA_MEAN = "na_fill_mean"
    FILL_NA_MEDIAN = "na_fill_median"
    FILL_DATETIME = "na_fill_datetime"
    NA_EXCLUDE = "na_exclude"
    MISSING_VALUE = "_missing_value_"
    REMOVE_COLUMN = "remove_column"

    def __init__(self, columns=[], na_fill_method=FILL_NA_MEDIAN):
        self._columns = columns
        # fill method
        self._na_fill_method = na_fill_method
        # fill parameters stored as a dict, feature -> fill value
        self._na_fill_params = {}
        self._datetime_columns = []

    def fit(self, X):
        X = self._fit_na_fill(X)
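
    # Typical usage, sketched for illustration only (the column name "age" and the
    # X_train / X_test split are assumptions, not part of this module):
    #
    #     fill = PreprocessingMissingValues(
    #         columns=["age"],
    #         na_fill_method=PreprocessingMissingValues.FILL_NA_MEDIAN,
    #     )
    #     fill.fit(X_train)
    #     X_train = fill.transform(X_train)
    #     X_test = fill.transform(X_test)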
    def _fit_na_fill(self, X):
        for column in self._columns:
            if np.sum(pd.isnull(X[column]) == True) == 0:
                continue
            self._na_fill_params[column] = self._get_fill_value(X[column])
            if PreprocessingUtils.get_type(X[column]) == PreprocessingUtils.DATETIME:
                self._datetime_columns += [column]

    def _get_fill_value(self, x):
        # categorical type
        if PreprocessingUtils.get_type(x) == PreprocessingUtils.CATEGORICAL:
            if self._na_fill_method == PreprocessingMissingValues.FILL_NA_MIN:
                return (
                    PreprocessingMissingValues.MISSING_VALUE
                )  # add new categorical value
            return PreprocessingUtils.get_most_frequent(x)
        # datetime
        if PreprocessingUtils.get_type(x) == PreprocessingUtils.DATETIME:
            return PreprocessingUtils.get_most_frequent(x)
        # text
        if PreprocessingUtils.get_type(x) == PreprocessingUtils.TEXT:
            return PreprocessingMissingValues.MISSING_VALUE
        # numerical type
        if self._na_fill_method == PreprocessingMissingValues.FILL_NA_MIN:
            return PreprocessingUtils.get_min(x) - 1.0
        if self._na_fill_method == PreprocessingMissingValues.FILL_NA_MEAN:
            return PreprocessingUtils.get_mean(x)
        return PreprocessingUtils.get_median(x)

    def transform(self, X):
        X = self._transform_na_fill(X)
        # this is an additional run through the columns,
        # in case of transforming data with new columns with missing values
        # X = self._make_sure_na_filled(X)  # disabled for now
        return X

    def _transform_na_fill(self, X):
        for column, value in self._na_fill_params.items():
            ind = pd.isnull(X.loc[:, column])
            X.loc[ind, column] = value
        return X

    def _make_sure_na_filled(self, X):
        self._fit_na_fill(X)
        return self._transform_na_fill(X)

    def to_json(self):
        # prepare json with all parameters
        if len(self._na_fill_params) == 0:
            return {}
        params = {
            "fill_method": self._na_fill_method,
            "fill_params": self._na_fill_params,
            "datetime_columns": list(self._datetime_columns),
        }
        for col in self._datetime_columns:
            params["fill_params"][col] = str(params["fill_params"][col])
        return params

    def from_json(self, params):
        if params is not None:
            self._na_fill_method = params.get("fill_method", None)
            self._na_fill_params = params.get("fill_params", {})
            self._datetime_columns = params.get("datetime_columns", [])
            for col in self._datetime_columns:
                self._na_fill_params[col] = pd.to_datetime(self._na_fill_params[col])
        else:
            self._na_fill_method, self._na_fill_params = None, None
            self._datetime_columns = []
```

--------------------------------------------------------------------------------
/supervised/preprocessing/scale.py:
--------------------------------------------------------------------------------

```python
import numpy as np
from sklearn import preprocessing


class Scale(object):
    SCALE_NORMAL = "scale_normal"
    SCALE_LOG_AND_NORMAL = "scale_log_and_normal"

    def __init__(self, columns=[], scale_method=SCALE_NORMAL):
        self.scale_method = scale_method
        self.columns = columns
        self.scale = preprocessing.StandardScaler(
            copy=True, with_mean=True, with_std=True
        )
        self.X_min_values = None  # it is used in SCALE_LOG_AND_NORMAL

    def fit(self, X):
        if len(self.columns):
            for c in self.columns:
                X[c] = X[c].astype(float)
            if self.scale_method == self.SCALE_NORMAL:
                self.scale.fit(X[self.columns])
            elif self.scale_method == self.SCALE_LOG_AND_NORMAL:
                self.X_min_values = np.min(X[self.columns], axis=0)
                self.scale.fit(np.log(X[self.columns] - self.X_min_values + 1))

    def transform(self, X):
        if len(self.columns):
            for c in self.columns:
                X[c] = X[c].astype(float)
            if self.scale_method == self.SCALE_NORMAL:
                X.loc[:, self.columns] = self.scale.transform(X[self.columns])
            elif self.scale_method == self.SCALE_LOG_AND_NORMAL:
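                # Note on the step below: values are shifted by the minimum seen during
                # fit() and clipped to a_min=1 before taking the log, so that rows with
                # values smaller than that minimum (possible on new data) never reach
                # log() as zero or negative numbers.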
                X[self.columns] = np.log(
                    np.clip(
                        X[self.columns] - self.X_min_values + 1, a_min=1, a_max=None
                    )
                )
                X.loc[:, self.columns] = self.scale.transform(X[self.columns])
        return X

    def inverse_transform(self, X):
        if len(self.columns):
            if self.scale_method == self.SCALE_NORMAL:
                X.loc[:, self.columns] = self.scale.inverse_transform(X[self.columns])
            elif self.scale_method == self.SCALE_LOG_AND_NORMAL:
                X[self.columns] = X[self.columns].astype("float64")
                X[self.columns] = self.scale.inverse_transform(X[self.columns])
                X[self.columns] = np.exp(X[self.columns])
                X.loc[:, self.columns] += self.X_min_values - 1
        return X

    def to_json(self):
        if len(self.columns) == 0:
            return None
        data_json = {
            "scale": list(self.scale.scale_),
            "mean": list(self.scale.mean_),
            "var": list(self.scale.var_),
            "n_samples_seen": int(self.scale.n_samples_seen_),
            "n_features_in": int(self.scale.n_features_in_),
            "columns": self.columns,
            "scale_method": self.scale_method,
        }
        if self.X_min_values is not None:
            data_json["X_min_values"] = list(self.X_min_values)
        return data_json

    def from_json(self, data_json):
        self.scale = preprocessing.StandardScaler(
            copy=True, with_mean=True, with_std=True
        )
        self.scale.scale_ = data_json.get("scale")
        if self.scale.scale_ is not None:
            self.scale.scale_ = np.array(self.scale.scale_)
        self.scale.mean_ = data_json.get("mean")
        if self.scale.mean_ is not None:
            self.scale.mean_ = np.array(self.scale.mean_)
        self.scale.var_ = data_json.get("var")
        if self.scale.var_ is not None:
            self.scale.var_ = np.array(self.scale.var_)
        self.scale.n_samples_seen_ = int(data_json.get("n_samples_seen"))
        self.scale.n_features_in_ = int(data_json.get("n_features_in"))
        self.columns = data_json.get("columns", [])
        self.scale.feature_names_in_ = data_json.get("columns")
        self.scale_method = data_json.get("scale_method")
        self.X_min_values = data_json.get("X_min_values")
        if self.X_min_values is not None:
            self.X_min_values = np.array(self.X_min_values)
```

--------------------------------------------------------------------------------
/supervised/preprocessing/kmeans_transformer.py:
--------------------------------------------------------------------------------

```python
import os
import time

import joblib
import numpy as np
from sklearn.cluster import MiniBatchKMeans
from sklearn.preprocessing import StandardScaler

from supervised.exceptions import AutoMLException


class KMeansTransformer(object):
    def __init__(self, results_path=None, model_name=None, k_fold=None):
        self._new_features = []
        self._input_columns = []
        self._error = None
        self._kmeans = None
        self._scale = None
        self._model_name = model_name
        self._k_fold = k_fold
        if results_path is not None:
            self._result_file = os.path.join(
                self._model_name, f"kmeans_fold_{k_fold}.joblib"
            )
            self._result_path = os.path.join(results_path, self._result_file)
            # self.try_load()

    def fit(self, X, y):
        if self._new_features:
            return
        if self._error is not None and self._error:
            raise AutoMLException(
                "KMeans Features not created due to error (please check errors.md). "
                + self._error
            )
            return
        if X.shape[1] == 0:
            self._error = f"KMeans not created. No continuous features. Input data shape: {X.shape}, {y.shape}"
            raise AutoMLException("KMeans Features not created. 
No continous features.") start_time = time.time() n_clusters = int(np.log10(X.shape[0]) * 8) n_clusters = max(8, n_clusters) n_clusters = min(n_clusters, X.shape[1]) self._input_columns = X.columns.tolist() # scale data self._scale = StandardScaler(copy=True, with_mean=True, with_std=True) X = self._scale.fit_transform(X) # Kmeans self._kmeans = kmeans = MiniBatchKMeans(n_clusters=n_clusters, init="k-means++") self._kmeans.fit(X) self._create_new_features_names() # print( # f"Created {len(self._new_features)} KMeans Features in {np.round(time.time() - start_time,2)} seconds." # ) def _create_new_features_names(self): n_clusters = self._kmeans.cluster_centers_.shape[0] self._new_features = [f"Dist_Cluster_{i}" for i in range(n_clusters)] self._new_features += ["Cluster"] def transform(self, X): if self._kmeans is None: raise AutoMLException("KMeans not fitted") # scale X_scaled = self._scale.transform(X[self._input_columns]) # kmeans distances = self._kmeans.transform(X_scaled) clusters = self._kmeans.predict(X_scaled) X[self._new_features[:-1]] = distances X[self._new_features[-1]] = clusters return X def to_json(self): self.save() data_json = { "new_features": self._new_features, "result_file": self._result_file, "input_columns": self._input_columns, } if self._error is not None and self._error: data_json["error"] = self._error return data_json def from_json(self, data_json, results_path): self._new_features = data_json.get("new_features", []) self._input_columns = data_json.get("input_columns", []) self._result_file = data_json.get("result_file") self._result_path = os.path.join(results_path, self._result_file) self._error = data_json.get("error") self.try_load() def save(self): joblib.dump( {"kmeans": self._kmeans, "scale": self._scale}, self._result_path, compress=True, ) def try_load(self): if os.path.exists(self._result_path): data = joblib.load(self._result_path) self._kmeans = data["kmeans"] self._scale = data["scale"] self._create_new_features_names() ``` -------------------------------------------------------------------------------- /tests/tests_automl/test_handle_imbalance.py: -------------------------------------------------------------------------------- ```python import shutil import unittest import numpy as np import pandas as pd from supervised import AutoML from supervised.algorithms.random_forest import additional from supervised.algorithms.registry import MULTICLASS_CLASSIFICATION additional["max_steps"] = 1 additional["trees_in_step"] = 1 from supervised.algorithms.xgboost import additional additional["max_rounds"] = 1 class AutoMLHandleImbalanceTest(unittest.TestCase): automl_dir = "AutoMLHandleImbalanceTest" def tearDown(self): shutil.rmtree(self.automl_dir, ignore_errors=True) def test_handle_drastic_imbalance(self): a = AutoML( results_path=self.automl_dir, total_time_limit=10, algorithms=["Random Forest"], train_ensemble=False, validation_strategy={ "validation_type": "kfold", "k_folds": 10, "shuffle": True, "stratify": True, }, start_random_models=1, ) rows = 100 X = pd.DataFrame( { "f1": np.random.rand(rows), "f2": np.random.rand(rows), "f3": np.random.rand(rows), } ) y = np.ones(rows) y[:8] = 0 y[10:12] = 2 y = pd.Series(np.array(y), name="target") a._ml_task = MULTICLASS_CLASSIFICATION a._handle_drastic_imbalance(X, y) self.assertEqual(X.shape[0], 130) self.assertEqual(X.shape[1], 3) self.assertEqual(y.shape[0], 130) def test_handle_drastic_imbalance_sample_weight(self): a = AutoML( results_path=self.automl_dir, total_time_limit=10, algorithms=["Random 
Forest"], train_ensemble=False, validation_strategy={ "validation_type": "kfold", "k_folds": 10, "shuffle": True, "stratify": True, }, start_random_models=1, ) rows = 100 X = pd.DataFrame( { "f1": np.random.rand(rows), "f2": np.random.rand(rows), "f3": np.random.rand(rows), } ) y = np.ones(rows) sample_weight = pd.Series(np.array(range(rows)), name="sample_weight") y[:1] = 0 y[10:11] = 2 y = pd.Series(np.array(y), name="target") a._ml_task = MULTICLASS_CLASSIFICATION a._handle_drastic_imbalance(X, y, sample_weight) self.assertEqual(X.shape[0], 138) self.assertEqual(X.shape[1], 3) self.assertEqual(y.shape[0], 138) self.assertEqual(np.sum(sample_weight[100:119]), 0) self.assertEqual(np.sum(sample_weight[119:138]), 19 * 10) def test_imbalance_dont_change_data_after_fit(self): a = AutoML( results_path=self.automl_dir, total_time_limit=5, train_ensemble=False, validation_strategy={ "validation_type": "kfold", "k_folds": 10, "shuffle": True, "stratify": True, }, start_random_models=1, explain_level=0, ) rows = 100 X = pd.DataFrame( { "f1": np.random.rand(rows), "f2": np.random.rand(rows), "f3": np.random.rand(rows), } ) y = np.ones(rows) y[:8] = 0 y[10:12] = 2 sample_weight = np.ones(rows) a.fit(X, y, sample_weight=sample_weight) # original data **without** inserted samples to handle imbalance self.assertEqual(X.shape[0], rows) self.assertEqual(y.shape[0], rows) self.assertEqual(sample_weight.shape[0], rows) ``` -------------------------------------------------------------------------------- /tests/tests_algorithms/test_random_forest.py: -------------------------------------------------------------------------------- ```python import os import tempfile import unittest from numpy.testing import assert_almost_equal from sklearn import datasets from supervised.algorithms.random_forest import ( RandomForestAlgorithm, RandomForestRegressorAlgorithm, additional, regression_additional, ) from supervised.utils.metric import Metric additional["trees_in_step"] = 1 regression_additional["trees_in_step"] = 1 additional["max_steps"] = 1 regression_additional["max_steps"] = 1 class RandomForestRegressorAlgorithmTest(unittest.TestCase): @classmethod def setUpClass(cls): cls.X, cls.y = datasets.make_regression( n_samples=100, n_features=5, n_informative=4, shuffle=False, random_state=0 ) def test_reproduce_fit(self): metric = Metric({"name": "mse"}) params = {"trees_in_step": 1, "seed": 1, "ml_task": "regression"} prev_loss = None for _ in range(3): model = RandomForestRegressorAlgorithm(params) model.fit(self.X, self.y) y_predicted = model.predict(self.X) loss = metric(self.y, y_predicted) if prev_loss is not None: assert_almost_equal(prev_loss, loss) prev_loss = loss class RandomForestAlgorithmTest(unittest.TestCase): @classmethod def setUpClass(cls): cls.X, cls.y = datasets.make_classification( n_samples=100, n_features=5, n_informative=4, n_redundant=1, n_classes=2, n_clusters_per_class=3, n_repeated=0, shuffle=False, random_state=0, ) def test_reproduce_fit(self): metric = Metric({"name": "logloss"}) params = {"trees_in_step": 1, "seed": 1, "ml_task": "binary_classification"} prev_loss = None for _ in range(3): model = RandomForestAlgorithm(params) model.fit(self.X, self.y) y_predicted = model.predict(self.X) loss = metric(self.y, y_predicted) if prev_loss is not None: assert_almost_equal(prev_loss, loss) prev_loss = loss def test_fit_predict(self): metric = Metric({"name": "logloss"}) params = {"ml_task": "binary_classification"} rf = RandomForestAlgorithm(params) rf.fit(self.X, self.y) y_predicted = 
rf.predict(self.X) self.assertTrue(metric(self.y, y_predicted) < 1.5) def test_copy(self): metric = Metric({"name": "logloss"}) rf = RandomForestAlgorithm({"ml_task": "binary_classification"}) rf.fit(self.X, self.y) y_predicted = rf.predict(self.X) loss = metric(self.y, y_predicted) rf2 = RandomForestAlgorithm({"ml_task": "binary_classification"}) rf2 = rf.copy() self.assertEqual(type(rf), type(rf2)) y_predicted = rf2.predict(self.X) loss2 = metric(self.y, y_predicted) assert_almost_equal(loss, loss2) def test_save_and_load(self): metric = Metric({"name": "logloss"}) rf = RandomForestAlgorithm({"ml_task": "binary_classification"}) rf.fit(self.X, self.y) y_predicted = rf.predict(self.X) loss = metric(self.y, y_predicted) filename = os.path.join(tempfile.gettempdir(), os.urandom(12).hex()) rf.save(filename) rf2 = RandomForestAlgorithm({"ml_task": "binary_classification"}) rf2.load(filename) # Finished with the file, delete it os.remove(filename) y_predicted = rf2.predict(self.X) loss2 = metric(self.y, y_predicted) assert_almost_equal(loss, loss2) def test_is_fitted(self): model = RandomForestAlgorithm({"ml_task": "binary_classification"}) self.assertFalse(model.is_fitted()) model.fit(self.X, self.y) self.assertTrue(model.is_fitted()) ``` -------------------------------------------------------------------------------- /tests/tests_algorithms/test_extra_trees.py: -------------------------------------------------------------------------------- ```python import os import tempfile import unittest from numpy.testing import assert_almost_equal from sklearn import datasets from supervised.algorithms.extra_trees import ( ExtraTreesAlgorithm, ExtraTreesRegressorAlgorithm, additional, regression_additional, ) from supervised.utils.metric import Metric additional["trees_in_step"] = 1 regression_additional["trees_in_step"] = 1 additional["max_steps"] = 1 regression_additional["max_steps"] = 1 class ExtraTreesRegressorAlgorithmTest(unittest.TestCase): @classmethod def setUpClass(cls): cls.X, cls.y = datasets.make_regression( n_samples=100, n_features=5, n_informative=4, shuffle=False, random_state=0 ) def test_reproduce_fit(self): metric = Metric({"name": "mse"}) params = {"trees_in_step": 1, "seed": 1, "ml_task": "regression"} prev_loss = None for _ in range(3): model = ExtraTreesRegressorAlgorithm(params) model.fit(self.X, self.y) y_predicted = model.predict(self.X) loss = metric(self.y, y_predicted) if prev_loss is not None: assert_almost_equal(prev_loss, loss) prev_loss = loss class ExtraTreesAlgorithmTest(unittest.TestCase): @classmethod def setUpClass(cls): cls.X, cls.y = datasets.make_classification( n_samples=100, n_features=5, n_informative=4, n_redundant=1, n_classes=2, n_clusters_per_class=3, n_repeated=0, shuffle=False, random_state=0, ) def test_reproduce_fit(self): metric = Metric({"name": "logloss"}) params = {"trees_in_step": 1, "seed": 1, "ml_task": "binary_classification"} prev_loss = None for _ in range(3): model = ExtraTreesAlgorithm(params) model.fit(self.X, self.y) y_predicted = model.predict(self.X) loss = metric(self.y, y_predicted) if prev_loss is not None: assert_almost_equal(prev_loss, loss) prev_loss = loss def test_fit_predict(self): metric = Metric({"name": "logloss"}) params = {"trees_in_step": 50, "ml_task": "binary_classification"} rf = ExtraTreesAlgorithm(params) rf.fit(self.X, self.y) y_predicted = rf.predict(self.X) self.assertTrue(metric(self.y, y_predicted) < 0.6) def test_copy(self): metric = Metric({"name": "logloss"}) rf = ExtraTreesAlgorithm({"ml_task": 
"binary_classification"}) rf.fit(self.X, self.y) y_predicted = rf.predict(self.X) loss = metric(self.y, y_predicted) rf2 = ExtraTreesAlgorithm({"ml_task": "binary_classification"}) rf2 = rf.copy() self.assertEqual(type(rf), type(rf2)) y_predicted = rf2.predict(self.X) loss2 = metric(self.y, y_predicted) assert_almost_equal(loss, loss2) def test_save_and_load(self): metric = Metric({"name": "logloss"}) rf = ExtraTreesAlgorithm({"ml_task": "binary_classification"}) rf.fit(self.X, self.y) y_predicted = rf.predict(self.X) loss = metric(self.y, y_predicted) filename = os.path.join(tempfile.gettempdir(), os.urandom(12).hex()) rf.save(filename) rf2 = ExtraTreesAlgorithm({"ml_task": "binary_classification"}) rf2.load(filename) # Finished with the file, delete it os.remove(filename) y_predicted = rf2.predict(self.X) loss2 = metric(self.y, y_predicted) assert_almost_equal(loss, loss2) def test_is_fitted(self): params = {"trees_in_step": 50, "ml_task": "binary_classification"} model = ExtraTreesAlgorithm(params) self.assertFalse(model.is_fitted()) model.fit(self.X, self.y) self.assertTrue(model.is_fitted()) ``` -------------------------------------------------------------------------------- /tests/tests_algorithms/test_lightgbm.py: -------------------------------------------------------------------------------- ```python import os import tempfile import unittest import numpy as np import pandas as pd from numpy.testing import assert_almost_equal from sklearn import datasets from supervised.algorithms.lightgbm import LightgbmAlgorithm, additional from supervised.utils.metric import Metric additional["max_rounds"] = 1 class LightgbmAlgorithmTest(unittest.TestCase): @classmethod def setUpClass(cls): cls.X, cls.y = datasets.make_classification( n_samples=100, n_features=5, n_informative=4, n_redundant=1, n_classes=2, n_clusters_per_class=3, n_repeated=0, shuffle=False, random_state=0, ) cls.params = { "metric": "binary_logloss", "num_leaves": "2", "learning_rate": 0.1, "feature_fraction": 0.8, "bagging_fraction": 0.8, "bagging_freq": 1, "seed": 1, "early_stopping_rounds": 0, } def test_reproduce_fit(self): metric = Metric({"name": "logloss"}) prev_loss = None for i in range(3): model = LightgbmAlgorithm(self.params) model.fit(self.X, self.y) y_predicted = model.predict(self.X) loss = metric(self.y, y_predicted) if prev_loss is not None: assert_almost_equal(prev_loss, loss) prev_loss = loss def test_fit_predict(self): metric = Metric({"name": "logloss"}) lgb = LightgbmAlgorithm(self.params) lgb.fit(self.X, self.y) y_predicted = lgb.predict(self.X) loss = metric(self.y, y_predicted) self.assertTrue(loss < 0.7) def test_copy(self): # train model #1 metric = Metric({"name": "logloss"}) lgb = LightgbmAlgorithm(self.params) lgb.fit(self.X, self.y) y_predicted = lgb.predict(self.X) loss = metric(self.y, y_predicted) # create model #2 lgb2 = LightgbmAlgorithm(self.params) # model #2 is set to None, while initialized self.assertTrue(lgb2.model is None) # do a copy and use it for predictions lgb2 = lgb.copy() self.assertEqual(type(lgb), type(lgb2)) y_predicted = lgb2.predict(self.X) loss2 = metric(self.y, y_predicted) self.assertEqual(loss, loss2) def test_save_and_load(self): metric = Metric({"name": "logloss"}) lgb = LightgbmAlgorithm(self.params) lgb.fit(self.X, self.y) y_predicted = lgb.predict(self.X) loss = metric(self.y, y_predicted) filename = os.path.join(tempfile.gettempdir(), os.urandom(12).hex()) lgb.save(filename) lgb2 = LightgbmAlgorithm({}) self.assertTrue(lgb.uid != lgb2.uid) 
self.assertTrue(lgb2.model is None) lgb2.load(filename) # Finished with the file, delete it os.remove(filename) y_predicted = lgb2.predict(self.X) loss2 = metric(self.y, y_predicted) assert_almost_equal(loss, loss2) def test_get_metric_name(self): model = LightgbmAlgorithm(self.params) self.assertEqual(model.get_metric_name(), "logloss") def test_restricted_characters_in_feature_name(self): df = pd.DataFrame( { "y": np.random.randint(0, 2, size=100), "[test1]": np.random.uniform(0, 1, size=100), "test2 < 1": np.random.uniform(0, 1, size=100), } ) y = df.iloc[:, 0] X = df.iloc[:, 1:] metric = Metric({"name": "logloss"}) params = {"objective": "binary:logistic", "eval_metric": "logloss"} lgb = LightgbmAlgorithm(self.params) lgb.fit(X, y) lgb.predict(X) def test_is_fitted(self): model = LightgbmAlgorithm(self.params) self.assertFalse(model.is_fitted()) model.fit(self.X, self.y) self.assertTrue(model.is_fitted()) ``` -------------------------------------------------------------------------------- /supervised/preprocessing/preprocessing_utils.py: -------------------------------------------------------------------------------- ```python import numpy as np import pandas as pd from scipy import stats from sklearn import preprocessing class PreprocessingUtilsException(Exception): pass class PreprocessingUtils(object): CATEGORICAL = "categorical" CONTINOUS = "continous" DISCRETE = "discrete" DATETIME = "datetime" TEXT = "text" @staticmethod def get_type(x): if len(x.shape) > 1: if x.shape[1] != 1: raise PreprocessingUtilsException( "Please select one column to get its type" ) col_type = str(x.dtype) data_type = PreprocessingUtils.CATEGORICAL if col_type.startswith("float"): data_type = PreprocessingUtils.CONTINOUS elif col_type.startswith("int") or col_type.startswith("uint"): data_type = PreprocessingUtils.DISCRETE elif col_type.startswith("datetime"): data_type = PreprocessingUtils.DATETIME elif col_type.startswith("category"): # do not check the additional condition for text feature # treat it as categorical return PreprocessingUtils.CATEGORICAL if data_type == PreprocessingUtils.CATEGORICAL: # check maybe this categorical is a text # it is a text, if: # has more than 200 unique values # more than half of rows is unique unique_cnt = len(np.unique(x[~pd.isnull(x)])) if unique_cnt > 200 and unique_cnt > int(0.5 * x.shape[0]): data_type = PreprocessingUtils.TEXT return data_type @staticmethod def is_categorical(x_org): x = x_org[~pd.isnull(x_org)] return PreprocessingUtils.get_type(x) == PreprocessingUtils.CATEGORICAL @staticmethod def is_datetime(x_org): x = x_org[~pd.isnull(x_org)] return PreprocessingUtils.get_type(x) == PreprocessingUtils.DATETIME @staticmethod def is_text(x_org): x = x_org[~pd.isnull(x_org)] return PreprocessingUtils.get_type(x) == PreprocessingUtils.TEXT @staticmethod def is_0_1(x_org): x = x_org[~pd.isnull(x_org)] u = np.unique(x) if len(u) != 2: return False return 0 in u and 1 in u @staticmethod def num_class(x_org): x = x_org[~pd.isnull(x_org)] u = np.unique(x) return len(u) @staticmethod def is_scale_needed(x_org): x = x_org[~pd.isnull(x_org)] abs_avg = np.abs(np.mean(x)) stddev = np.std(x) if abs_avg > 0.5 or stddev > 1.5: return True return False @staticmethod def is_log_scale_needed(x_org): x_full = np.array(x_org[~pd.isnull(x_org)]) # first scale on raw data x = preprocessing.scale(x_full) # second scale on log data x_log = preprocessing.scale(np.log(x_full - np.min(x_full) + 1)) # the old approach, let's check how new approach will work # original_skew = 
np.abs(stats.skew(x)) # log_skew = np.abs(stats.skew(x_log)) # return log_skew < original_skew ######################################################################## # p is probability of being normal distributions k2, p1 = stats.normaltest(x) k2, p2 = stats.normaltest(x_log) return p2 > p1 @staticmethod def is_na(x): return np.sum(pd.isnull(x) == True) > 0 @staticmethod def get_most_frequent(x): a = x.value_counts() first = sorted(dict(a).items(), key=lambda x: -x[1])[0] return first[0] @staticmethod def get_min(x): v = np.amin(np.nanmin(x)) if pd.isnull(v): return 0 return float(v) @staticmethod def get_mean(x): v = np.nanmean(x) if pd.isnull(v): return 0 return float(v) @staticmethod def get_median(x): v = np.nanmedian(x) if pd.isnull(v): return 0 return float(v) ``` -------------------------------------------------------------------------------- /tests/tests_fairness/test_binary_classification.py: -------------------------------------------------------------------------------- ```python import shutil import unittest import numpy as np import pandas as pd from supervised import AutoML class FairnessInBinaryClassificationTest(unittest.TestCase): automl_dir = "automl_fairness_testing" def tearDown(self): shutil.rmtree(self.automl_dir, ignore_errors=True) def test_init(self): X = np.random.uniform(size=(30, 2)) y = np.random.randint(0, 2, size=(30,)) S = pd.DataFrame({"sensitive": ["A", "B"] * 15}) automl = AutoML( results_path=self.automl_dir, model_time_limit=10, algorithms=["Xgboost"], explain_level=0, train_ensemble=False, stack_models=False, validation_strategy={"validation_type": "split"}, start_random_models=1, ) automl.fit(X, y, sensitive_features=S) self.assertGreater(len(automl._models), 0) sensitive_features_names = automl._models[0].get_sensitive_features_names() self.assertEqual(len(sensitive_features_names), 1) self.assertTrue("sensitive" in sensitive_features_names) self.assertTrue(automl._models[0].get_fairness_metric("sensitive") is not None) self.assertTrue(len(automl._models[0].get_fairness_optimization()) > 1) self.assertTrue(automl._models[0].get_worst_fairness() is not None) self.assertTrue(automl._models[0].get_best_fairness() is not None) def test_arguments(self): X = np.random.uniform(size=(30, 2)) y = np.random.randint(0, 2, size=(30,)) S = pd.DataFrame({"sensitive": ["A", "B"] * 15}) automl = AutoML( results_path=self.automl_dir, model_time_limit=10, algorithms=["Xgboost"], privileged_groups=[{"sensitive": "A"}], underprivileged_groups=[{"sensitive": "B"}], fairness_metric="demographic_parity_ratio", fairness_threshold=0.2, explain_level=0, train_ensemble=False, stack_models=False, validation_strategy={"validation_type": "split"}, start_random_models=1, ) automl.fit(X, y, sensitive_features=S) self.assertGreater(len(automl._models), 0) def test_wrong_metric_name(self): X = np.random.uniform(size=(30, 2)) y = np.random.randint(0, 2, size=(30,)) S = pd.DataFrame({"sensitive": ["A", "B"] * 15}) with self.assertRaises(ValueError) as context: automl = AutoML( results_path=self.automl_dir, model_time_limit=10, algorithms=["Xgboost"], privileged_groups=[{"sensitive": "A"}], underprivileged_groups=[{"sensitive": "B"}], fairness_metric="wrong_metric_name", fairness_threshold=0.2, explain_level=0, train_ensemble=False, stack_models=False, validation_strategy={"validation_type": "split"}, start_random_models=1, ) automl.fit(X, y, sensitive_features=S) self.assertTrue("is not allowed" in str(context.exception)) def test_two_sensitive_features(self): X = 
np.random.uniform(size=(30, 2)) y = np.random.randint(0, 2, size=(30,)) S = pd.DataFrame( { "sensitive_1": ["White", "Black"] * 15, "sensitive_2": ["Male", "Female"] * 15, } ) automl = AutoML( results_path=self.automl_dir, model_time_limit=10, algorithms=["Xgboost"], explain_level=0, train_ensemble=False, stack_models=False, start_random_models=1, ) automl.fit(X, y, sensitive_features=S) self.assertGreater(len(automl._models), 0) sensitive_features_names = automl._models[0].get_sensitive_features_names() self.assertEqual(len(sensitive_features_names), 2) ``` -------------------------------------------------------------------------------- /supervised/fairness/plots.py: -------------------------------------------------------------------------------- ```python import numpy as np from matplotlib import pyplot as plt class FairnessPlots: @staticmethod def binary_classification( fairness_metric, col_name, metrics, selection_rates, max_selection_rate, fairness_threshold, ): figures = [] # selection rate figure fair_selection_rate = max_selection_rate * fairness_threshold fig = plt.figure(figsize=(10, 7)) ax1 = fig.add_subplot(1, 1, 1) bars = ax1.bar(metrics.index[1:], metrics["Selection Rate"][1:]) ax1.spines[["right", "top", "left"]].set_visible(False) ax1.yaxis.set_visible(False) _ = ax1.bar_label(bars, padding=5) if fairness_metric == "demographic_parity_ratio": ax1.axhline(y=fair_selection_rate, zorder=0, color="grey", ls="--", lw=1.5) _ = ax1.text( y=fair_selection_rate, x=-0.6, s="Fairness threshold", ha="center", fontsize=12, bbox=dict(facecolor="white", edgecolor="grey", ls="--"), ) _ = ax1.text( y=1.2 * fair_selection_rate, x=-0.6, s="Fair", ha="center", fontsize=12, ) _ = ax1.text( y=0.8 * fair_selection_rate, x=-0.6, s="Unfair", ha="center", fontsize=12, ) ax1.axhspan( fairness_threshold * max_selection_rate, 1.25 * np.max(selection_rates[1:]), color="green", alpha=0.05, ) ax1.axhspan( 0, fairness_threshold * max_selection_rate, color="red", alpha=0.05 ) figures += [ { "title": f"Selection Rate for {col_name}", "fname": f"selection_rate_{col_name}.png", "figure": fig, } ] fig, axes = plt.subplots(figsize=(10, 5), ncols=2, sharey=True) fig.tight_layout() bars = axes[0].barh( metrics.index[1:], metrics["False Negative Rate"][1:], zorder=10, color="tab:orange", ) xmax = 1.2 * max( metrics["False Negative Rate"][1:].max(), metrics["False Positive Rate"][1:].max(), ) axes[0].set_xlim(0, xmax) axes[0].invert_xaxis() axes[0].set_title("False Negative Rate") _ = axes[0].bar_label(bars, padding=5) bars = axes[1].barh( metrics.index[1:], metrics["False Positive Rate"][1:], zorder=10, color="tab:blue", ) axes[1].tick_params(axis="y", colors="tab:orange") # tick color axes[1].set_xlim(0, xmax) axes[1].set_title("False Positive Rate") _ = axes[1].bar_label(bars, padding=5) _ = plt.subplots_adjust(wspace=0, top=0.85, bottom=0.1, left=0.18, right=0.95) figures += [ { "title": f"False Rates for {col_name}", "fname": f"false_rates_{col_name}.png", "figure": fig, } ] return figures @staticmethod def regression(fairness_metric, col_name, metrics, fairness_metric_name): figures = [] metric_name = fairness_metric.split("@")[1].upper() fig = plt.figure(figsize=(10, 7)) ax1 = fig.add_subplot(1, 1, 1) bars = ax1.bar(metrics.index[1:], metrics[metric_name][1:]) ax1.spines[["right", "top"]].set_visible(False) # ax1.yaxis.set_visible(False) ax1.set_ylabel(metric_name) _ = ax1.bar_label(bars, padding=5) figures += [ { "title": f"{metric_name} for {col_name}", "fname": f"{metric_name}_{col_name}.png", "figure": 
fig, } ] return figures ``` -------------------------------------------------------------------------------- /supervised/validation/validator_custom.py: -------------------------------------------------------------------------------- ```python import logging import os import joblib import numpy as np log = logging.getLogger(__name__) from supervised.exceptions import AutoMLException from supervised.utils.utils import load_data from supervised.validation.validator_base import BaseValidator class CustomValidator(BaseValidator): def __init__(self, params): BaseValidator.__init__(self, params) cv_path = self.params.get("cv_path") if cv_path is None: raise AutoMLException("You need to specify `cv` as list or iterable") self.cv = joblib.load(cv_path) self.cv = list(self.cv) self._results_path = self.params.get("results_path") self._X_path = self.params.get("X_path") self._y_path = self.params.get("y_path") self._sample_weight_path = self.params.get("sample_weight_path") self._sensitive_features_path = self.params.get("sensitive_features_path") if self._X_path is None or self._y_path is None: raise AutoMLException("No data path set in CustomValidator params") folds_path = os.path.join(self._results_path, "folds") if not os.path.exists(folds_path): os.mkdir(folds_path) print("Custom validation strategy") for fold_cnt, (train_index, validation_index) in enumerate(self.cv): print(f"Split {fold_cnt}.") print(f"Train {train_index.shape[0]} samples.") print(f"Validation {validation_index.shape[0]} samples.") train_index_file = os.path.join( self._results_path, "folds", f"fold_{fold_cnt}_train_indices.npy", ) validation_index_file = os.path.join( self._results_path, "folds", f"fold_{fold_cnt}_validation_indices.npy", ) np.save(train_index_file, train_index) np.save(validation_index_file, validation_index) else: log.debug("Folds split already done, reuse it") def get_split(self, k, repeat=0): try: train_index_file = os.path.join( self._results_path, "folds", f"fold_{k}_train_indices.npy" ) validation_index_file = os.path.join( self._results_path, "folds", f"fold_{k}_validation_indices.npy" ) train_index = np.load(train_index_file) validation_index = np.load(validation_index_file) X = load_data(self._X_path) y = load_data(self._y_path) y = y["target"] sample_weight = None if self._sample_weight_path is not None: sample_weight = load_data(self._sample_weight_path) sample_weight = sample_weight["sample_weight"] sensitive_features = None if self._sensitive_features_path is not None: sensitive_features = load_data(self._sensitive_features_path) train_data = {"X": X.iloc[train_index], "y": y.iloc[train_index]} validation_data = { "X": X.iloc[validation_index], "y": y.iloc[validation_index], } if sample_weight is not None: train_data["sample_weight"] = sample_weight.iloc[train_index] validation_data["sample_weight"] = sample_weight.iloc[validation_index] if sensitive_features is not None: train_data["sensitive_features"] = sensitive_features.iloc[train_index] validation_data["sensitive_features"] = sensitive_features.iloc[ validation_index ] except Exception as e: import traceback print(traceback.format_exc()) raise AutoMLException("Problem with custom validation. 
" + str(e)) return (train_data, validation_data) def get_n_splits(self): return len(self.cv) def get_repeats(self): return 1 ``` -------------------------------------------------------------------------------- /tests/tests_automl/test_integration.py: -------------------------------------------------------------------------------- ```python import shutil import unittest import numpy as np import pandas as pd from sklearn import datasets from supervised import AutoML class AutoMLIntegrationTest(unittest.TestCase): automl_dir = "AutoMLIntegrationTest" def tearDown(self): shutil.rmtree(self.automl_dir, ignore_errors=True) def test_integration(self): a = AutoML( results_path=self.automl_dir, total_time_limit=1, explain_level=0, start_random_models=1, ) X, y = datasets.make_classification( n_samples=100, n_features=5, n_informative=4, n_redundant=1, n_classes=2, n_clusters_per_class=3, n_repeated=0, shuffle=False, random_state=0, ) a.fit(X, y) p = a.predict(X) self.assertIsInstance(p, np.ndarray) self.assertEqual(len(p), X.shape[0]) def test_one_column_input_regression(self): a = AutoML( results_path=self.automl_dir, total_time_limit=5, explain_level=0, start_random_models=1, ) X, y = datasets.make_regression(n_features=1) a.fit(X, y) p = a.predict(X) self.assertIsInstance(p, np.ndarray) self.assertEqual(len(p), X.shape[0]) def test_one_column_input_bin_class(self): a = AutoML( results_path=self.automl_dir, total_time_limit=5, explain_level=0, start_random_models=1, ) X = pd.DataFrame({"feature_1": np.random.rand(100)}) y = (np.random.rand(X.shape[0]) > 0.5).astype(int) a.fit(X, y) p = a.predict(X) self.assertIsInstance(p, np.ndarray) self.assertEqual(len(p), X.shape[0]) def test_different_input_types(self): """Test the different data input types for AutoML""" model = AutoML( total_time_limit=10, explain_level=0, start_random_models=1, algorithms=["Linear"], verbose=0, ) X, y = datasets.make_regression() # First test - X and y as numpy arrays pred = model.fit(X, y).predict(X) self.assertIsInstance(pred, np.ndarray) self.assertEqual(len(pred), X.shape[0]) del model model = AutoML( total_time_limit=10, explain_level=0, start_random_models=1, algorithms=["Linear"], verbose=0, ) # Second test - X and y as pandas dataframe X_pandas = pd.DataFrame(X) y_pandas = pd.DataFrame(y) pred_pandas = model.fit(X_pandas, y_pandas).predict(X_pandas) self.assertIsInstance(pred_pandas, np.ndarray) self.assertEqual(len(pred_pandas), X.shape[0]) del model model = AutoML( total_time_limit=10, explain_level=0, start_random_models=1, algorithms=["Linear"], verbose=0, ) # Third test - X and y as lists X_list = pd.DataFrame(X).values.tolist() y_list = pd.DataFrame(y).values.tolist() pred_list = model.fit(X_pandas, y_pandas).predict(X_pandas) self.assertIsInstance(pred_list, np.ndarray) self.assertEqual(len(pred_list), X.shape[0]) def test_integration_float16_data(self): a = AutoML( results_path=self.automl_dir, total_time_limit=1, explain_level=0, start_random_models=1, ) X, y = datasets.make_classification( n_samples=100, n_features=5, n_informative=4, n_redundant=1, n_classes=2, n_clusters_per_class=3, n_repeated=0, shuffle=False, random_state=0, ) X = pd.DataFrame(X) X = X.astype(float) a.fit(X, y) p = a.predict(X) self.assertIsInstance(p, np.ndarray) self.assertEqual(len(p), X.shape[0]) ``` -------------------------------------------------------------------------------- /supervised/algorithms/algorithm.py: -------------------------------------------------------------------------------- ```python import uuid import 
numpy as np from sklearn.base import BaseEstimator from supervised.utils.common import construct_learner_name from supervised.utils.importance import PermutationImportance from supervised.utils.shap import PlotSHAP class BaseAlgorithm(BaseEstimator): """ This is an abstract class. All algorithms inherit from BaseAlgorithm. """ algorithm_name = "Unknown" algorithm_short_name = "Unknown" def __init__(self, params): self.params = params self.stop_training = False self.library_version = None self.model = None self.uid = params.get("uid", str(uuid.uuid4())) self.ml_task = params.get("ml_task") self.model_file_path = None self.name = "amazing_learner" def set_learner_name(self, fold, repeat, repeats): self.name = construct_learner_name(fold, repeat, repeats) def is_fitted(self): # base class method return False def reload(self): if not self.is_fitted() and self.model_file_path is not None: self.load(self.model_file_path) def fit( self, X, y, sample_weight=None, X_validation=None, y_validation=None, sample_weight_validation=None, log_to_file=None, max_time=None, ): pass def predict(self, X): pass # needed for feature importance def predict_proba(self, X): y = self.predict(X) if "num_class" in self.params and self.params["num_class"] > 2: return y return np.column_stack((1 - y, y)) def update(self, update_params): pass def copy(self): pass def save(self, model_file_path): pass def load(self, model_file_path): pass def get_fname(self): return f"{self.name}.{self.file_extension()}" def interpret( self, X_train, y_train, X_validation, y_validation, model_file_path, learner_name, target_name=None, class_names=None, metric_name=None, ml_task=None, explain_level=2, ): # do not produce feature importance for Baseline if self.algorithm_short_name == "Baseline": return if explain_level > 0: PermutationImportance.compute_and_plot( self, X_validation, y_validation, model_file_path, learner_name, metric_name, ml_task, self.params.get("n_jobs", -1), ) if explain_level > 1: PlotSHAP.compute( self, X_train, y_train, X_validation, y_validation, model_file_path, learner_name, class_names, ml_task, ) def get_metric_name(self): return None def get_params(self): params = { "library_version": self.library_version, "algorithm_name": self.algorithm_name, "algorithm_short_name": self.algorithm_short_name, "uid": self.uid, "params": self.params, "name": self.name, } if hasattr(self, "best_ntree_limit") and self.best_ntree_limit is not None: params["best_ntree_limit"] = self.best_ntree_limit return params def set_params(self, json_desc, learner_path): self.library_version = json_desc.get("library_version", self.library_version) self.algorithm_name = json_desc.get("algorithm_name", self.algorithm_name) self.algorithm_short_name = json_desc.get( "algorithm_short_name", self.algorithm_short_name ) self.uid = json_desc.get("uid", self.uid) self.params = json_desc.get("params", self.params) self.name = json_desc.get("name", self.name) self.model_file_path = learner_path if hasattr(self, "best_ntree_limit"): self.best_ntree_limit = json_desc.get( "best_ntree_limit", self.best_ntree_limit ) ``` -------------------------------------------------------------------------------- /tests/data/iris_missing_values_missing_target.csv: -------------------------------------------------------------------------------- ``` feature_1,feature_2,feature_3,feature_4,class 5.1,3.5,1.4,0.2,Iris-setosa 4.9,3.0,1.4,0.2,Iris-setosa 4.7,3.2,1.3,,Iris-setosa 4.6,3.1,1.5,,Iris-setosa 5.0,3.6,1.4,0.2,Iris-setosa ,3.9,1.7,0.4,Iris-setosa 
4.6,3.4,1.4,0.3,Iris-setosa 5.0,3.4,1.5,0.2,Iris-setosa 4.4,,1.4,0.2,Iris-setosa 4.9,3.1,1.5,0.1,Iris-setosa 5.4,3.7,1.5,0.2,Iris-setosa 4.8,3.4,,0.2,Iris-setosa 4.8,3.0,1.4,0.1,Iris-setosa 4.3,3.0,1.1,0.1,Iris-setosa 5.8,4.0,1.2,0.2,Iris-setosa 5.7,4.4,1.5,0.4,Iris-setosa 5.4,3.9,1.3,0.4,Iris-setosa 5.1,3.5,1.4,0.3, 5.7,3.8,1.7,0.3,Iris-setosa 5.1,3.8,1.5,0.3,Iris-setosa 5.4,3.4,1.7,0.2,Iris-setosa 5.1,3.7,1.5,0.4,Iris-setosa 4.6,3.6,1.0,0.2,Iris-setosa 5.1,3.3,1.7,0.5,Iris-setosa 4.8,3.4,1.9,0.2,Iris-setosa 5.0,3.0,1.6,0.2,Iris-setosa 5.0,3.4,1.6,0.4,Iris-setosa 5.2,3.5,1.5,0.2,Iris-setosa 5.2,3.4,1.4,0.2,Iris-setosa 4.7,3.2,1.6,0.2,Iris-setosa 4.8,3.1,1.6,0.2,Iris-setosa 5.4,3.4,1.5,0.4,Iris-setosa 5.2,4.1,1.5,0.1,Iris-setosa 5.5,4.2,1.4,0.2,Iris-setosa 4.9,3.1,1.5,0.1,Iris-setosa 5.0,3.2,1.2,0.2,Iris-setosa 5.5,3.5,1.3,0.2,Iris-setosa 4.9,3.1,1.5,0.1,Iris-setosa 4.4,3.0,1.3,0.2,Iris-setosa 5.1,3.4,1.5,0.2,Iris-setosa 5.0,3.5,1.3,0.3,Iris-setosa 4.5,2.3,1.3,0.3,Iris-setosa 4.4,3.2,1.3,0.2,Iris-setosa 5.0,3.5,1.6,0.6,Iris-setosa 5.1,3.8,1.9,0.4,Iris-setosa 4.8,3.0,1.4,0.3,Iris-setosa 5.1,3.8,1.6,0.2,Iris-setosa 4.6,3.2,1.4,0.2,Iris-setosa 5.3,3.7,1.5,0.2,Iris-setosa 5.0,3.3,1.4,0.2,Iris-setosa 7.0,3.2,4.7,1.4,Iris-versicolor 6.4,3.2,4.5,1.5,Iris-versicolor 6.9,3.1,4.9,1.5, 5.5,2.3,4.0,1.3,Iris-versicolor 6.5,2.8,4.6,1.5,Iris-versicolor 5.7,2.8,4.5,1.3,Iris-versicolor 6.3,3.3,4.7,1.6,Iris-versicolor 4.9,2.4,3.3,1.0,Iris-versicolor 6.6,2.9,4.6,1.3,Iris-versicolor 5.2,2.7,3.9,1.4,Iris-versicolor 5.0,2.0,3.5,1.0,Iris-versicolor 5.9,3.0,4.2,1.5,Iris-versicolor 6.0,2.2,4.0,1.0,Iris-versicolor 6.1,2.9,4.7,1.4,Iris-versicolor 5.6,2.9,3.6,1.3,Iris-versicolor 6.7,3.1,4.4,1.4,Iris-versicolor 5.6,3.0,4.5,1.5,Iris-versicolor 5.8,2.7,4.1,1.0,Iris-versicolor 6.2,2.2,4.5,1.5,Iris-versicolor 5.6,2.5,3.9,1.1,Iris-versicolor 5.9,3.2,4.8,1.8,Iris-versicolor 6.1,2.8,4.0,1.3,Iris-versicolor 6.3,2.5,4.9,1.5,Iris-versicolor 6.1,2.8,4.7,1.2,Iris-versicolor 6.4,2.9,4.3,1.3,Iris-versicolor 6.6,3.0,4.4,1.4,Iris-versicolor 6.8,2.8,4.8,1.4,Iris-versicolor 6.7,3.0,5.0,1.7,Iris-versicolor 6.0,2.9,4.5,1.5,Iris-versicolor 5.7,2.6,3.5,1.0,Iris-versicolor 5.5,2.4,3.8,1.1,Iris-versicolor 5.5,2.4,3.7,1.0,Iris-versicolor 5.8,2.7,3.9,1.2,Iris-versicolor 6.0,2.7,5.1,1.6,Iris-versicolor 5.4,3.0,4.5,1.5,Iris-versicolor 6.0,3.4,4.5,1.6,Iris-versicolor 6.7,3.1,4.7,1.5,Iris-versicolor 6.3,2.3,4.4,1.3,Iris-versicolor 5.6,3.0,4.1,1.3,Iris-versicolor 5.5,2.5,4.0,1.3,Iris-versicolor 5.5,2.6,4.4,1.2,Iris-versicolor 6.1,3.0,4.6,1.4,Iris-versicolor 5.8,2.6,4.0,1.2,Iris-versicolor 5.0,2.3,3.3,1.0,Iris-versicolor 5.6,2.7,4.2,1.3,Iris-versicolor 5.7,3.0,4.2,1.2,Iris-versicolor 5.7,2.9,4.2,1.3,Iris-versicolor 6.2,2.9,4.3,1.3,Iris-versicolor 5.1,2.5,3.0,1.1,Iris-versicolor 5.7,2.8,4.1,1.3,Iris-versicolor 6.3,3.3,6.0,2.5,Iris-virginica 5.8,2.7,5.1,1.9,Iris-virginica 7.1,3.0,5.9,2.1,Iris-virginica 6.3,2.9,5.6,1.8,Iris-virginica 6.5,3.0,5.8,2.2,Iris-virginica 7.6,3.0,6.6,2.1,Iris-virginica 4.9,2.5,4.5,1.7,Iris-virginica 7.3,2.9,6.3,1.8,Iris-virginica 6.7,2.5,5.8,1.8,Iris-virginica 7.2,3.6,6.1,2.5,Iris-virginica 6.5,3.2,5.1,2.0,Iris-virginica 6.4,2.7,5.3,1.9,Iris-virginica 6.8,3.0,5.5,2.1,Iris-virginica 5.7,2.5,5.0,2.0,Iris-virginica 5.8,2.8,5.1,2.4,Iris-virginica 6.4,3.2,5.3,2.3,Iris-virginica 6.5,3.0,5.5,1.8,Iris-virginica 7.7,3.8,6.7,2.2,Iris-virginica 7.7,2.6,6.9,2.3,Iris-virginica 6.0,2.2,5.0,1.5,Iris-virginica 6.9,3.2,5.7,2.3,Iris-virginica 5.6,2.8,4.9,2.0,Iris-virginica 7.7,2.8,6.7,2.0,Iris-virginica 6.3,2.7,4.9,1.8,Iris-virginica 
6.7,3.3,5.7,2.1,Iris-virginica 7.2,3.2,6.0,1.8,Iris-virginica 6.2,2.8,4.8,1.8,Iris-virginica 6.1,3.0,4.9,1.8,Iris-virginica 6.4,2.8,5.6,2.1,Iris-virginica 7.2,3.0,5.8,1.6,Iris-virginica 7.4,2.8,6.1,1.9,Iris-virginica 7.9,3.8,6.4,2.0,Iris-virginica 6.4,2.8,5.6,2.2,Iris-virginica 6.3,2.8,5.1,1.5,Iris-virginica 6.1,2.6,5.6,1.4,Iris-virginica 7.7,3.0,6.1,2.3,Iris-virginica 6.3,3.4,5.6,2.4,Iris-virginica 6.4,3.1,5.5,1.8,Iris-virginica 6.0,3.0,4.8,1.8,Iris-virginica 6.9,3.1,5.4,2.1,Iris-virginica 6.7,3.1,5.6,2.4,Iris-virginica 6.9,3.1,5.1,2.3,Iris-virginica 5.8,2.7,5.1,1.9,Iris-virginica 6.8,3.2,5.9,2.3,Iris-virginica 6.7,3.3,5.7,2.5,Iris-virginica 6.7,3.0,5.2,2.3,Iris-virginica 6.3,2.5,5.0,1.9,Iris-virginica 6.5,3.0,5.2,2.0,Iris-virginica 6.2,3.4,5.4,2.3,Iris-virginica 5.9,3.0,5.1,1.8,Iris-virginica ``` -------------------------------------------------------------------------------- /supervised/preprocessing/preprocessing_categorical.py: -------------------------------------------------------------------------------- ```python import numpy as np import pandas as pd from supervised.preprocessing.label_binarizer import LabelBinarizer from supervised.preprocessing.label_encoder import LabelEncoder from supervised.preprocessing.preprocessing_utils import PreprocessingUtils class PreprocessingCategorical(object): CONVERT_ONE_HOT = "categorical_to_onehot" CONVERT_INTEGER = "categorical_to_int" FEW_CATEGORIES = "few_categories" MANY_CATEGORIES = "many_categories" def __init__(self, columns=[], method=CONVERT_INTEGER): self._convert_method = method self._convert_params = {} self._columns = columns self._enc = None def fit(self, X, y=None): self._fit_categorical_convert(X) def _fit_categorical_convert(self, X): for column in self._columns: if PreprocessingUtils.get_type(X[column]) != PreprocessingUtils.CATEGORICAL: # no need to convert, already a number continue # limit categories - it is needed when doing one hot encoding # this code is also used in predict.py file # and transform_utils.py # TODO it needs refactoring !!! too_much_categories = len(np.unique(list(X[column].values))) > 200 lbl = None if ( self._convert_method == PreprocessingCategorical.CONVERT_ONE_HOT and not too_much_categories ): lbl = LabelBinarizer() lbl.fit(X, column) else: lbl = LabelEncoder() lbl.fit(X[column]) if lbl is not None: self._convert_params[column] = lbl.to_json() def transform(self, X): for column, lbl_params in self._convert_params.items(): if "unique_values" in lbl_params and "new_columns" in lbl_params: # convert to one hot lbl = LabelBinarizer() lbl.from_json(lbl_params) X = lbl.transform(X, column) else: # convert to integer lbl = LabelEncoder() lbl.from_json(lbl_params) transformed_values = lbl.transform(X.loc[:, column]) # check for pandas FutureWarning: Setting an item # of incompatible dtype is deprecated and will raise # in a future error of pandas. 
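                # In short: when the encoded values come back with a dtype that differs
                # from a bool/int column, the whole column is cast first (and categorical
                # columns are converted to object) so the assignment stays dtype-compatible.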
if transformed_values.dtype != X.loc[:, column].dtype and \ (X.loc[:, column].dtype == bool or X.loc[:, column].dtype == int): X = X.astype({column: transformed_values.dtype}) if isinstance(X[column].dtype, pd.CategoricalDtype): X[column] = X[column].astype('object') X.loc[:, column] = transformed_values return X def inverse_transform(self, X): for column, lbl_params in self._convert_params.items(): if "unique_values" in lbl_params and "new_columns" in lbl_params: # convert to one hot lbl = LabelBinarizer() lbl.from_json(lbl_params) X = lbl.inverse_transform(X, column) # should raise exception else: # convert to integer lbl = LabelEncoder() lbl.from_json(lbl_params) transformed_values = lbl.inverse_transform(X.loc[:, column]) # check for pandas FutureWarning: Setting an item # of incompatible dtype is deprecated and will raise # in a future error of pandas. if transformed_values.dtype != X.loc[:, column].dtype and \ (X.loc[:, column].dtype == bool or X.loc[:, column].dtype == int): X = X.astype({column: transformed_values.dtype}) X.loc[:, column] = transformed_values return X def to_json(self): params = {} if len(self._convert_params) == 0: return {} params = { "convert_method": self._convert_method, "convert_params": self._convert_params, "columns": self._columns, } return params def from_json(self, params): if params is not None: self._convert_method = params.get("convert_method", None) self._columns = params.get("columns", []) self._convert_params = params.get("convert_params", {}) else: self._convert_method, self._convert_params = None, None self._columns = [] ``` -------------------------------------------------------------------------------- /tests/tests_preprocessing/test_label_encoder.py: -------------------------------------------------------------------------------- ```python import json import unittest import numpy as np import pandas as pd from supervised.preprocessing.label_encoder import LabelEncoder class LabelEncoderTest(unittest.TestCase): def test_fit(self): # training data d = {"col1": ["a", "a", "c"], "col2": ["w", "e", "d"]} df = pd.DataFrame(data=d) le = LabelEncoder() # check first column le.fit(df["col1"]) data_json = le.to_json() # values from column should be in data json self.assertTrue("a" in data_json) self.assertTrue("c" in data_json) self.assertTrue("b" not in data_json) # there is alphabetical order for values self.assertEqual(0, data_json["a"]) self.assertEqual(1, data_json["c"]) # check next column le.fit(df["col2"]) data_json = le.to_json() self.assertEqual(0, data_json["d"]) self.assertEqual(1, data_json["e"]) self.assertEqual(2, data_json["w"]) def test_transform(self): # training data d = {"col1": ["a", "a", "c"]} df = pd.DataFrame(data=d) # fit encoder le = LabelEncoder() le.fit(df["col1"]) # test data d_test = {"col2": ["c", "c", "a"]} df_test = pd.DataFrame(data=d_test) # transform y = le.transform(df_test["col2"]) self.assertEqual(y[0], 1) self.assertEqual(y[1], 1) self.assertEqual(y[2], 0) def test_transform_with_new_values(self): # training data d = {"col1": ["a", "a", "c"]} df = pd.DataFrame(data=d) # fit encoder le = LabelEncoder() le.fit(df["col1"]) # test data d_test = {"col2": ["c", "a", "d", "f"]} df_test = pd.DataFrame(data=d_test) # transform y = le.transform(df_test["col2"]) self.assertEqual(y[0], 1) self.assertEqual(y[1], 0) self.assertEqual(y[2], 2) self.assertEqual(y[3], 3) def test_to_and_from_json(self): # training data d = {"col1": ["a", "a", "c"]} df = pd.DataFrame(data=d) # fit encoder le = LabelEncoder() le.fit(df["col1"]) # new 
encoder new_le = LabelEncoder() new_le.from_json(le.to_json()) # test data d_test = {"col2": ["c", "c", "a"]} df_test = pd.DataFrame(data=d_test) # transform y = new_le.transform(df_test["col2"]) self.assertEqual(y[0], 1) self.assertEqual(y[1], 1) self.assertEqual(y[2], 0) def test_to_and_from_json_booleans(self): # training data d = {"col1": [True, False, True]} df = pd.DataFrame(data=d) # fit encoder le = LabelEncoder() le.fit(df["col1"]) # new encoder new_le = LabelEncoder() new_le.from_json(json.loads(json.dumps(le.to_json(), indent=4))) # test data d_test = {"col2": [True, False, True]} df_test = pd.DataFrame(data=d_test) # transform y = new_le.transform(df_test["col2"]) self.assertEqual(y[0], 1) self.assertEqual(y[1], 0) self.assertEqual(y[2], 1) def test_fit_on_numeric_categories(self): # categories are as strings # but they represent numbers # we force encoder to sort them by numeric values # it is needed for computing predictions for many classes # training data d = {"col1": ["1", "10", "2"]} df = pd.DataFrame(data=d) le = LabelEncoder(try_to_fit_numeric=True) # check first column le.fit(df["col1"]) data_json = le.to_json() print(data_json) # values from column should be in data json self.assertTrue("1" in data_json) self.assertTrue("10" in data_json) self.assertTrue("2" in data_json) # there is numeric order for values self.assertEqual(0, data_json["1"]) self.assertEqual(1, data_json["2"]) self.assertEqual(2, data_json["10"]) p = le.transform(df["col1"]) p2 = le.transform(np.array(df["col1"].values)) self.assertEqual(p[0], 0) self.assertEqual(p[1], 2) self.assertEqual(p[2], 1) self.assertEqual(p[0], p2[0]) self.assertEqual(p[1], p2[1]) self.assertEqual(p[2], p2[2]) new_le = LabelEncoder() new_le.from_json(json.loads(json.dumps(le.to_json(), indent=4))) p2 = new_le.transform(df["col1"]) self.assertEqual(p[0], p2[0]) self.assertEqual(p[1], p2[1]) self.assertEqual(p[2], p2[2]) ``` -------------------------------------------------------------------------------- /tests/tests_algorithms/test_nn.py: -------------------------------------------------------------------------------- ```python import os import tempfile import unittest from numpy.testing import assert_almost_equal from sklearn import datasets from sklearn import preprocessing from supervised.algorithms.nn import MLPAlgorithm, MLPRegressorAlgorithm from supervised.utils.metric import Metric class MLPAlgorithmTest(unittest.TestCase): @classmethod def setUpClass(cls): cls.X, cls.y = datasets.make_classification( n_samples=100, n_features=5, n_informative=4, n_redundant=1, n_classes=2, n_clusters_per_class=3, n_repeated=0, shuffle=False, random_state=1, ) cls.params = { "dense_1_size": 8, "dense_2_size": 4, "learning_rate": 0.01, "ml_task": "binary_classification", } def test_fit_predict(self): metric = Metric({"name": "logloss"}) nn = MLPAlgorithm(self.params) nn.fit(self.X, self.y) y_predicted = nn.predict_proba(self.X) loss = metric(self.y, y_predicted) self.assertLess(loss, 2) def test_copy(self): # train model #1 metric = Metric({"name": "logloss"}) nn = MLPAlgorithm(self.params) nn.fit(self.X, self.y) y_predicted = nn.predict(self.X) loss = metric(self.y, y_predicted) # create model #2 nn2 = MLPAlgorithm(self.params) # do a copy and use it for predictions nn2 = nn.copy() self.assertEqual(type(nn), type(nn2)) y_predicted = nn2.predict(self.X) loss2 = metric(self.y, y_predicted) self.assertEqual(loss, loss2) # the loss of model #2 should not change y_predicted = nn2.predict(self.X) loss4 = metric(self.y, y_predicted) 
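        # editor note: a second prediction from the copied model must reproduce the same loss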
assert_almost_equal(loss2, loss4) def test_save_and_load(self): metric = Metric({"name": "logloss"}) nn = MLPAlgorithm(self.params) nn.fit(self.X, self.y) y_predicted = nn.predict(self.X) loss = metric(self.y, y_predicted) filename = os.path.join(tempfile.gettempdir(), os.urandom(12).hex()) nn.save(filename) json_desc = nn.get_params() nn2 = MLPAlgorithm(json_desc["params"]) nn2.load(filename) # Finished with the file, delete it os.remove(filename) y_predicted = nn2.predict(self.X) loss2 = metric(self.y, y_predicted) assert_almost_equal(loss, loss2) class MLPRegressorAlgorithmTest(unittest.TestCase): @classmethod def setUpClass(cls): cls.X, cls.y = datasets.make_regression( n_samples=100, n_features=5, n_informative=4, shuffle=False, random_state=0 ) cls.params = { "dense_layers": 2, "dense_1_size": 8, "dense_2_size": 4, "dropout": 0, "learning_rate": 0.01, "momentum": 0.9, "decay": 0.001, "ml_task": "regression", } cls.y = preprocessing.scale(cls.y) def test_fit_predict(self): metric = Metric({"name": "mse"}) nn = MLPRegressorAlgorithm(self.params) nn.fit(self.X, self.y) y_predicted = nn.predict(self.X) loss = metric(self.y, y_predicted) self.assertLess(loss, 2) class MultiClassNeuralNetworkAlgorithmTest(unittest.TestCase): @classmethod def setUpClass(cls): cls.X, cls.y = datasets.make_classification( n_samples=100, n_features=5, n_informative=4, n_redundant=1, n_classes=3, n_clusters_per_class=3, n_repeated=0, shuffle=False, random_state=0, ) cls.params = { "dense_layers": 2, "dense_1_size": 8, "dense_2_size": 4, "dropout": 0, "learning_rate": 0.01, "momentum": 0.9, "decay": 0.001, "ml_task": "multiclass_classification", "num_class": 3, } lb = preprocessing.LabelEncoder() lb.fit(cls.y) cls.y = lb.transform(cls.y) def test_fit_predict(self): metric = Metric({"name": "logloss"}) nn = MLPAlgorithm(self.params) nn.fit(self.X, self.y) y_predicted = nn.predict(self.X) loss = metric(self.y, y_predicted) self.assertLess(loss, 2) def test_is_fitted(self): model = MLPAlgorithm(self.params) self.assertFalse(model.is_fitted()) model.fit(self.X, self.y) self.assertTrue(model.is_fitted()) ``` -------------------------------------------------------------------------------- /supervised/validation/validator_split.py: -------------------------------------------------------------------------------- ```python import logging import os import warnings import numpy as np log = logging.getLogger(__name__) from sklearn.model_selection import train_test_split from supervised.exceptions import AutoMLException from supervised.utils.utils import load_data from supervised.validation.validator_base import BaseValidator class SplitValidator(BaseValidator): def __init__(self, params): BaseValidator.__init__(self, params) self.train_ratio = self.params.get("train_ratio", 0.8) self.shuffle = self.params.get("shuffle", True) self.stratify = self.params.get("stratify", False) self.random_seed = self.params.get("random_seed", 1234) self.repeats = self.params.get("repeats", 1) if not self.shuffle and self.repeats > 1: warnings.warn( "Disable repeats in validation because shuffle is disabled", UserWarning ) self.repeats = 1 self._results_path = self.params.get("results_path") self._X_path = self.params.get("X_path") self._y_path = self.params.get("y_path") self._sample_weight_path = self.params.get("sample_weight_path") self._sensitive_features_path = self.params.get("sensitive_features_path") if self._X_path is None or self._y_path is None: raise AutoMLException("No data path set in SplitValidator params") def get_split(self, 
k=0, repeat=0): X = load_data(self._X_path) y = load_data(self._y_path) y = y["target"] sample_weight = None if self._sample_weight_path is not None: sample_weight = load_data(self._sample_weight_path) sample_weight = sample_weight["sample_weight"] sensitive_features = None if self._sensitive_features_path is not None: sensitive_features = load_data(self._sensitive_features_path) stratify = None if self.stratify: stratify = y if self.shuffle == False: stratify = None input_data = [X, y] if sample_weight is not None: input_data += [sample_weight] if sensitive_features is not None: input_data += [sensitive_features] output_data = train_test_split( *input_data, train_size=self.train_ratio, test_size=1.0 - self.train_ratio, shuffle=self.shuffle, stratify=stratify, random_state=self.random_seed + repeat, ) X_train = output_data[0] X_validation = output_data[1] y_train = output_data[2] y_validation = output_data[3] if sample_weight is not None: sample_weight_train = output_data[4] sample_weight_validation = output_data[5] if sensitive_features is not None: sensitive_features_train = output_data[6] sensitive_features_validation = output_data[7] else: if sensitive_features is not None: sensitive_features_train = output_data[4] sensitive_features_validation = output_data[5] train_data = {"X": X_train, "y": y_train} validation_data = {"X": X_validation, "y": y_validation} if sample_weight is not None: train_data["sample_weight"] = sample_weight_train validation_data["sample_weight"] = sample_weight_validation if sensitive_features is not None: train_data["sensitive_features"] = sensitive_features_train validation_data["sensitive_features"] = sensitive_features_validation repeat_str = f"repeat_{repeat}_" if self.repeats > 1 else "" train_data_file = os.path.join( self._results_path, f"split_{repeat_str}train_indices.npy" ) validation_data_file = os.path.join( self._results_path, f"split_{repeat_str}validation_indices.npy" ) np.save(train_data_file, X_train.index) np.save(validation_data_file, X_validation.index) return train_data, validation_data def get_n_splits(self): return 1 def get_repeats(self): return self.repeats """ import numpy as np import pandas as pd from sklearn.utils.fixes import bincount from sklearn.model_selection import train_test_split import logging logger = logging.getLogger('mljar') def validation_split(train, validation_train_split, stratify, shuffle, random_seed): if shuffle: else: if stratify is None: train, validation = data_split(validation_train_split, train) else: train, validation = data_split_stratified(validation_train_split, train, stratify) return train, validation """ ``` -------------------------------------------------------------------------------- /supervised/tuner/optuna/xgboost.py: -------------------------------------------------------------------------------- ```python import numpy as np import optuna import optuna_integration import xgboost as xgb from supervised.algorithms.registry import ( MULTICLASS_CLASSIFICATION, ) from supervised.algorithms.xgboost import xgboost_eval_metric, xgboost_objective from supervised.utils.metric import ( Metric, xgboost_eval_metric_accuracy, xgboost_eval_metric_average_precision, xgboost_eval_metric_f1, xgboost_eval_metric_mse, xgboost_eval_metric_pearson, xgboost_eval_metric_r2, xgboost_eval_metric_spearman, xgboost_eval_metric_user_defined, ) EPS = 1e-8 class XgboostObjective: def __init__( self, ml_task, X_train, y_train, sample_weight, X_validation, y_validation, sample_weight_validation, eval_metric, n_jobs, 
random_state, ): self.dtrain = xgb.DMatrix(X_train, label=y_train, weight=sample_weight) self.dvalidation = xgb.DMatrix( X_validation, label=y_validation, weight=sample_weight_validation ) self.X_validation = X_validation self.y_validation = y_validation self.eval_metric = eval_metric self.n_jobs = n_jobs self.learning_rate = 0.0125 self.rounds = 1000 self.early_stopping_rounds = 50 self.seed = random_state self.objective = "" self.eval_metric_name = "" self.num_class = ( len(np.unique(y_train)) if ml_task == MULTICLASS_CLASSIFICATION else None ) self.objective = xgboost_objective(ml_task, eval_metric.name) self.eval_metric_name = xgboost_eval_metric(ml_task, eval_metric.name) self.custom_eval_metric = None if self.eval_metric_name == "r2": self.custom_eval_metric = xgboost_eval_metric_r2 elif self.eval_metric_name == "spearman": self.custom_eval_metric = xgboost_eval_metric_spearman elif self.eval_metric_name == "pearson": self.custom_eval_metric = xgboost_eval_metric_pearson elif self.eval_metric_name == "f1": self.custom_eval_metric = xgboost_eval_metric_f1 elif self.eval_metric_name == "average_precision": self.custom_eval_metric = xgboost_eval_metric_average_precision elif self.eval_metric_name == "accuracy": self.custom_eval_metric = xgboost_eval_metric_accuracy elif self.eval_metric_name == "mse": self.custom_eval_metric = xgboost_eval_metric_mse elif self.eval_metric_name == "user_defined_metric": self.custom_eval_metric = xgboost_eval_metric_user_defined def __call__(self, trial): param = { "objective": self.objective, "eval_metric": self.eval_metric_name, "tree_method": "hist", "booster": "gbtree", "eta": trial.suggest_categorical("eta", [0.0125, 0.025, 0.05, 0.1]), "max_depth": trial.suggest_int("max_depth", 2, 12), "lambda": trial.suggest_float("lambda", EPS, 10.0, log=True), "alpha": trial.suggest_float("alpha", EPS, 10.0, log=True), "colsample_bytree": min( trial.suggest_float("colsample_bytree", 0.3, 1.0 + EPS), 1.0 ), "subsample": min(trial.suggest_float("subsample", 0.3, 1.0 + EPS), 1.0), "min_child_weight": trial.suggest_int("min_child_weight", 1, 100), "n_jobs": self.n_jobs, "seed": self.seed, "verbosity": 0, } if self.custom_eval_metric is not None: del param["eval_metric"] if self.num_class is not None: param["num_class"] = self.num_class try: pruning_callback = optuna_integration.XGBoostPruningCallback( trial, f"validation-{self.eval_metric_name}" ) bst = xgb.train( param, self.dtrain, self.rounds, evals=[(self.dvalidation, "validation")], early_stopping_rounds=self.early_stopping_rounds, callbacks=[pruning_callback], verbose_eval=False, custom_metric=self.custom_eval_metric, ) preds = bst.predict( self.dvalidation, iteration_range=(0, bst.best_iteration) ) score = self.eval_metric(self.y_validation, preds) if Metric.optimize_negative(self.eval_metric.name): score *= -1.0 except optuna.exceptions.TrialPruned as e: raise e except Exception as e: print("Exception in XgboostObjective", str(e)) return None return score ``` -------------------------------------------------------------------------------- /supervised/algorithms/nn.py: -------------------------------------------------------------------------------- ```python import logging import warnings import numpy as np import pandas as pd import sklearn from sklearn.base import ClassifierMixin, RegressorMixin from sklearn.neural_network import MLPClassifier, MLPRegressor from supervised.algorithms.registry import ( BINARY_CLASSIFICATION, MULTICLASS_CLASSIFICATION, REGRESSION, AlgorithmsRegistry, ) from 
supervised.algorithms.sklearn import SklearnAlgorithm from supervised.utils.config import LOG_LEVEL logger = logging.getLogger(__name__) logger.setLevel(LOG_LEVEL) class NNFit(SklearnAlgorithm): def file_extension(self): return "neural_network" def is_fitted(self): return ( hasattr(self.model, "n_iter_") and self.model.n_iter_ is not None and self.model.n_iter_ > 0 ) def fit( self, X, y, sample_weight=None, X_validation=None, y_validation=None, sample_weight_validation=None, log_to_file=None, max_time=None, ): with warnings.catch_warnings(): warnings.simplefilter(action="ignore") # filter # X does not have valid feature names, but MLPClassifier was fitted with feature names self.model.fit(X, y) if log_to_file is not None: loss_curve = self.model.loss_curve_ result = pd.DataFrame( { "iteration": range(len(loss_curve)), "train": loss_curve, "validation": None, } ) result.to_csv(log_to_file, index=False, header=False) if self.params["ml_task"] != REGRESSION: self.classes_ = np.unique(y) class MLPAlgorithm(ClassifierMixin, NNFit): algorithm_name = "Neural Network" algorithm_short_name = "Neural Network" def __init__(self, params): super(MLPAlgorithm, self).__init__(params) logger.debug("MLPAlgorithm.__init__") self.max_iters = 1 self.library_version = sklearn.__version__ h1 = params.get("dense_1_size", 32) h2 = params.get("dense_2_size", 16) learning_rate = params.get("learning_rate", 0.05) max_iter = 500 self.model = MLPClassifier( hidden_layer_sizes=(h1, h2), activation="relu", solver="adam", learning_rate=params.get("learning_rate_type", "constant"), learning_rate_init=learning_rate, alpha=params.get("alpha", 0.0001), early_stopping=True, n_iter_no_change=50, max_iter=max_iter, random_state=params.get("seed", 123), ) def get_metric_name(self): return "logloss" class MLPRegressorAlgorithm(RegressorMixin, NNFit): algorithm_name = "Neural Network" algorithm_short_name = "Neural Network" def __init__(self, params): super(MLPRegressorAlgorithm, self).__init__(params) logger.debug("MLPRegressorAlgorithm.__init__") self.max_iters = 1 self.library_version = sklearn.__version__ h1 = params.get("dense_1_size", 32) h2 = params.get("dense_2_size", 16) learning_rate = params.get("learning_rate", 0.05) momentum = params.get("momentum", 0.9) early_stopping = True max_iter = 500 self.model = MLPRegressor( hidden_layer_sizes=(h1, h2), activation="relu", solver="adam", learning_rate="constant", learning_rate_init=learning_rate, momentum=momentum, early_stopping=early_stopping, max_iter=max_iter, ) def get_metric_name(self): return "mse" nn_params = { "dense_1_size": [16, 32, 64], "dense_2_size": [4, 8, 16, 32], "learning_rate": [0.01, 0.05, 0.08, 0.1], } default_nn_params = {"dense_1_size": 32, "dense_2_size": 16, "learning_rate": 0.05} additional = {"max_rows_limit": None, "max_cols_limit": None} required_preprocessing = [ "missing_values_inputation", "convert_categorical", "datetime_transform", "text_transform", "scale", "target_as_integer", ] AlgorithmsRegistry.add( BINARY_CLASSIFICATION, MLPAlgorithm, nn_params, required_preprocessing, additional, default_nn_params, ) AlgorithmsRegistry.add( MULTICLASS_CLASSIFICATION, MLPAlgorithm, nn_params, required_preprocessing, additional, default_nn_params, ) required_preprocessing = [ "missing_values_inputation", "convert_categorical", "datetime_transform", "text_transform", "scale", "target_scale", ] AlgorithmsRegistry.add( REGRESSION, MLPRegressorAlgorithm, nn_params, required_preprocessing, additional, default_nn_params, ) ``` 
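Note (editor's addition): the module above only registers the MLP wrappers with `AlgorithmsRegistry`; in normal use they are created and fitted by `AutoML`. A minimal sketch of driving `MLPAlgorithm` directly is shown below; the synthetic data is illustrative and the parameter values are simply those from `default_nn_params`.

```python
# Illustrative sketch only (editor's addition), not part of the repository.
from sklearn.datasets import make_classification

from supervised.algorithms.nn import MLPAlgorithm

X, y = make_classification(n_samples=200, n_features=5, random_state=1)
params = {
    "dense_1_size": 32,
    "dense_2_size": 16,
    "learning_rate": 0.05,
    "ml_task": "binary_classification",  # fit() reads this key
    "seed": 123,
}
model = MLPAlgorithm(params)
model.fit(X, y)
proba = model.predict_proba(X)  # probabilities from the wrapped MLPClassifier
print(proba.shape)
```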
-------------------------------------------------------------------------------- /supervised/utils/leaderboard_plots.py: -------------------------------------------------------------------------------- ```python import logging import os import numpy as np import pandas as pd logger = logging.getLogger(__name__) from supervised.utils.config import LOG_LEVEL from supervised.utils.metric import Metric logger.setLevel(LOG_LEVEL) import warnings import matplotlib.pyplot as plt warnings.filterwarnings("ignore", category=np.VisibleDeprecationWarning) markers = { "Baseline": {"color": "tab:cyan", "marker": "8"}, "Linear": {"color": "tab:pink", "marker": "s"}, "Decision Tree": {"color": "tab:gray", "marker": "^"}, "Random Forest": {"color": "tab:green", "marker": "o"}, "Extra Trees": {"color": "tab:brown", "marker": "v"}, "LightGBM": {"color": "tab:purple", "marker": "P"}, "Xgboost": {"color": "tab:blue", "marker": "*"}, "CatBoost": {"color": "tab:orange", "marker": "D"}, "Neural Network": {"color": "tab:red", "marker": "x"}, "Nearest Neighbors": {"color": "tab:olive", "marker": "+"}, "Ensemble": {"color": "black", "marker": "p"}, } class LeaderboardPlots: performance_fname = "ldb_performance.png" performance_boxplot_fname = "ldb_performance_boxplot.png" @staticmethod def compute(ldb, model_path, fout, fairness_threshold=None): if ldb.shape[0] < 2: return # Scatter plot plt.figure(figsize=(10, 7)) for model_type in ldb.model_type.unique(): ii = ldb.model_type == model_type plt.plot( ldb.metric_value[ii], markers[model_type]["marker"], markersize=12, alpha=0.75, color=markers[model_type]["color"], label=model_type, ) # plt.plot(ldb.metric_value, "*", markersize=12, alpha=0.75) plt.xlabel("#Iteration") plt.ylabel(ldb.metric_type.iloc[0]) plt.legend() plt.title("AutoML Performance") plt.tight_layout(pad=2.0) plot_path = os.path.join(model_path, LeaderboardPlots.performance_fname) plt.savefig(plot_path) plt.close("all") fout.write("\n\n### AutoML Performance\n") fout.write(f"") # Boxplot by = "model_type" column = "metric_value" df2 = pd.DataFrame({col: vals[column] for col, vals in ldb.groupby(by)}) ascending_sort = Metric.optimize_negative(ldb.metric_type.iloc[0]) mins = df2.min().sort_values(ascending=ascending_sort) plt.figure(figsize=(10, 7)) # plt.title("") plt.ylabel(ldb.metric_type.iloc[0]) df2[mins.index].boxplot(rot=90, fontsize=12) plt.tight_layout(pad=2.0) plot_path = os.path.join(model_path, LeaderboardPlots.performance_boxplot_fname) plt.savefig(plot_path) plt.close("all") fout.write("\n\n### AutoML Performance Boxplot\n") fout.write( f"" ) if fairness_threshold is not None: fairness_metrics = [ f for f in ldb.columns if "fairness_" in f and f != "fairness_metric" ] for fm in fairness_metrics: x_axis_name = ldb.metric_type.iloc[0] y_axis_name = ldb["fairness_metric"].iloc[0] # Scatter plot plt.figure(figsize=(10, 7)) for model_type in ldb.model_type.unique(): ii = ldb.model_type == model_type plt.plot( ldb.metric_value[ii], ldb[fm][ii], markers[model_type]["marker"], markersize=12, alpha=0.75, color=markers[model_type]["color"], label=model_type, ) plt.xlabel(x_axis_name) plt.ylabel(y_axis_name) plt.legend() plt.title(f"Performance vs {fm}") plt.tight_layout(pad=2.0) ymin = 0 ymax = max(1, ldb[fm].max() * 1.1) plt.ylim(0, ymax) if "ratio" in y_axis_name: plt.axhspan(fairness_threshold, ymax, color="green", alpha=0.05) plt.axhspan(ymin, fairness_threshold, color="red", alpha=0.05) else: # difference metric plt.axhspan(ymin, fairness_threshold, color="green", alpha=0.05) 
                    plt.axhspan(fairness_threshold, ymax, color="red", alpha=0.05)

                fname = f"performance_vs_{fm}.png"
                plot_path = os.path.join(model_path, fname)
                plt.savefig(plot_path)
                plt.close("all")

                fout.write(f"\n\n### Performance vs {fm}\n")
                fout.write(f"")
```

--------------------------------------------------------------------------------
/supervised/utils/learning_curves.py:
--------------------------------------------------------------------------------

```python
import logging
import os

import numpy as np
import pandas as pd

logger = logging.getLogger(__name__)

from supervised.utils.common import learner_name_to_fold_repeat
from supervised.utils.config import LOG_LEVEL
from supervised.utils.metric import Metric

logger.setLevel(LOG_LEVEL)

import matplotlib.colors as mcolors
import matplotlib.pyplot as plt

MY_COLORS = list(mcolors.TABLEAU_COLORS.values())


class LearningCurves:
    output_file_name = "learning_curves.png"

    @staticmethod
    def single_iteration(learner_names, model_path):
        for ln in learner_names:
            df = pd.read_csv(
                os.path.join(model_path, f"{ln}_training.log"),
                names=["iteration", "train", "test"],
            )
            if df.shape[0] > 1:
                return False
        return True

    @staticmethod
    def plot(learner_names, metric_name, model_path, trees_in_iteration=None):
        colors = MY_COLORS
        if len(learner_names) > len(colors):
            repeat_colors = int(np.ceil(len(learner_names) / len(colors)))
            colors = colors * repeat_colors
        if LearningCurves.single_iteration(learner_names, model_path):
            LearningCurves.plot_single_iter(
                learner_names, metric_name, model_path, colors
            )
        else:
            LearningCurves.plot_iterations(
                learner_names, metric_name, model_path, colors, trees_in_iteration
            )

    @staticmethod
    def plot_single_iter(learner_names, metric_name, model_path, colors):
        plt.figure(figsize=(10, 7))
        for ln in learner_names:
            df = pd.read_csv(
                os.path.join(model_path, f"{ln}_training.log"),
                names=["iteration", "train", "test"],
            )
            fold, repeat = learner_name_to_fold_repeat(ln)
            repeat_str = f" Repeat {repeat+1}," if repeat is not None else ""
            plt.bar(
                f"Fold {fold+1},{repeat_str} train",
                df.train[0],
                color="white",
                edgecolor=colors[fold],
            )
            plt.bar(f"Fold {fold+1},{repeat_str} test", df.test[0], color=colors[fold])

        plt.ylabel(metric_name)
        plt.xticks(rotation=90)
        plt.tight_layout(pad=2.0)
        plot_path = os.path.join(model_path, LearningCurves.output_file_name)
        plt.savefig(plot_path)
        plt.close("all")

    @staticmethod
    def plot_iterations(
        learner_names, metric_name, model_path, colors, trees_in_iteration=None
    ):
        plt.figure(figsize=(10, 7))
        for ln in learner_names:
            df = pd.read_csv(
                os.path.join(model_path, f"{ln}_training.log"),
                names=["iteration", "train", "test"],
            )
            fold, repeat = learner_name_to_fold_repeat(ln)
            repeat_str = f" Repeat {repeat+1}," if repeat is not None else ""
            # if trees_in_iteration is not None:
            #     df.iteration = df.iteration * trees_in_iteration
            any_none = np.sum(pd.isnull(df.train))
            if any_none == 0:
                plt.plot(
                    df.iteration,
                    df.train,
                    "--",
                    color=colors[fold],
                    label=f"Fold {fold+1},{repeat_str} train",
                )
            any_none = np.sum(pd.isnull(df.test))
            if any_none == 0:
                plt.plot(
                    df.iteration,
                    df.test,
                    color=colors[fold],
                    label=f"Fold {fold+1},{repeat_str} test",
                )
            if not df.test.isnull().values.any():
                best_iter = None
                if Metric.optimize_negative(metric_name):
                    best_iter = df.test.argmax()
                else:
                    best_iter = df.test.argmin()
                if best_iter is not None and best_iter != -1:
                    plt.axvline(best_iter, color=colors[fold], alpha=0.3)

        if trees_in_iteration is not None:
            plt.xlabel("#Trees")
        else:
            plt.xlabel("#Iteration")
        plt.ylabel(metric_name)
        # limit number of learners in
the legend # too many will raise warnings if len(learner_names) <= 15: plt.legend(loc="best") plt.tight_layout(pad=2.0) plot_path = os.path.join(model_path, LearningCurves.output_file_name) plt.savefig(plot_path) plt.close("all") @staticmethod def plot_for_ensemble(scores, metric_name, model_path): plt.figure(figsize=(10, 7)) plt.plot(range(1, len(scores) + 1), scores, label=f"Ensemble") plt.xlabel("#Iteration") plt.ylabel(metric_name) plt.legend(loc="best") plot_path = os.path.join(model_path, LearningCurves.output_file_name) plt.savefig(plot_path) plt.close("all") ``` -------------------------------------------------------------------------------- /supervised/fairness/report.py: -------------------------------------------------------------------------------- ```python import os class FairnessReport: """Saves information about fairness in the report.""" @staticmethod def save_classification(fairness_metrics, fout, model_path, is_multi=False): for k, v in fairness_metrics.items(): if k == "fairness_optimization": continue if is_multi: a = k.split("__", maxsplit=1) feature, class_name = a if is_multi: fout.write( f"\n\n## Fairness metrics for {feature} feature and {class_name} class\n\n" ) else: fout.write(f"\n\n## Fairness metrics for {k} feature\n\n") fout.write(v["metrics"].to_markdown()) fout.write("\n\n") fout.write(v["stats"].to_markdown()) fout.write("\n\n") if is_multi: fout.write( f"\n\n## Is model fair for {feature} feature and {class_name} class?\n" ) else: fout.write(f"\n\n## Is model fair for {k} feature?\n") fair_str = "fair" if v["is_fair"] else "unfair" fairness_threshold = fairness_metrics.get("fairness_optimization", {}).get( "fairness_threshold" ) fairness_threshold_str = "" if fairness_threshold is not None: if "ratio" in v["fairness_metric_name"].lower(): fairness_threshold_str = ( f"It should be higher than {fairness_threshold}." ) else: fairness_threshold_str = ( f"It should be lower than {fairness_threshold}." ) if is_multi: fout.write( f"Model is {fair_str} for {feature} feature and {class_name} class.\n" ) else: fout.write(f"Model is {fair_str} for {k} feature.\n") fout.write( f'The {v["fairness_metric_name"]} is {v["fairness_metric_value"]}. 
{fairness_threshold_str}\n' ) if not v["is_fair"]: # display information about privileged and underprivileged groups # for unfair models if v.get("underprivileged_value") is not None: fout.write( f'Underprivileged value is {v["underprivileged_value"]}.\n' ) if v.get("privileged_value") is not None: fout.write(f'Privileged value is {v["privileged_value"]}.\n') for figure in v["figures"]: fout.write(f"\n\n### {figure['title']}\n\n") figure["figure"].savefig(os.path.join(model_path, figure["fname"])) fout.write(f"\n\n\n") @staticmethod def regression(fairness_metrics, fout, model_path): for k, v in fairness_metrics.items(): if k == "fairness_optimization": continue fout.write(f"\n\n## Fairness metrics for {k} feature\n\n") fout.write(v["metrics"].to_markdown()) fout.write("\n\n") fout.write(f'Privileged value: {v["privileged_value"]}\n\n') fout.write(f'Underprivileged value: {v["underprivileged_value"]}\n\n\n') fout.write(f'Fairness metric: {v["fairness_metric_name"]}\n\n') fout.write(f'{v["metric_name"]} Difference: {v["diff"]}\n\n') fout.write(f'{v["metric_name"]} Ratio: {v["ratio"]}\n\n') # add sentence about model fairness if v["is_fair"]: fout.write(f"Model is fair for {k} feature.\n") if "ratio" in v["fairness_metric_name"].lower(): fout.write( f"The {v['fairness_metric_name']} value is above threshold {v['fairness_threshold']}.\n\n" ) else: fout.write( f"The {v['fairness_metric_name']} value is below threshold {v['fairness_threshold']}.\n\n" ) else: # model is not fair fout.write(f"Model is unfair for {k} feature.\n") if "ratio" in v["fairness_metric_name"].lower(): fout.write( f"The {v['fairness_metric_name']} value is below threshold {v['fairness_threshold']}.\n\n" ) else: fout.write( f"The {v['fairness_metric_name']} value is above threshold {v['fairness_threshold']}.\n\n" ) for figure in v["figures"]: fout.write(f"\n\n### {figure['title']}\n\n") figure["figure"].savefig(os.path.join(model_path, figure["fname"])) fout.write(f"\n\n\n") ``` -------------------------------------------------------------------------------- /tests/tests_algorithms/test_catboost.py: -------------------------------------------------------------------------------- ```python import os import tempfile import unittest import pandas as pd from numpy.testing import assert_almost_equal from sklearn import datasets from supervised.algorithms.catboost import CatBoostAlgorithm, additional from supervised.utils.metric import Metric additional["max_rounds"] = 1 class CatBoostRegressorAlgorithmTest(unittest.TestCase): @classmethod def setUpClass(cls): cls.X, cls.y = datasets.make_regression( n_samples=100, n_features=5, n_informative=4, shuffle=False, random_state=0 ) cls.X = pd.DataFrame(cls.X, columns=[f"f_{i}" for i in range(cls.X.shape[1])]) cls.params = { "learning_rate": 0.1, "depth": 4, "rsm": 0.5, "l2_leaf_reg": 1, "seed": 1, "ml_task": "regression", "loss_function": "RMSE", "eval_metric": "RMSE", } def test_reproduce_fit(self): metric = Metric({"name": "mse"}) prev_loss = None for _ in range(2): model = CatBoostAlgorithm(self.params) model.fit(self.X, self.y) y_predicted = model.predict(self.X) loss = metric(self.y, y_predicted) if prev_loss is not None: assert_almost_equal(prev_loss, loss, decimal=3) prev_loss = loss def test_get_metric_name(self): model = CatBoostAlgorithm(self.params) self.assertEqual(model.get_metric_name(), "rmse") class CatBoostAlgorithmTest(unittest.TestCase): @classmethod def setUpClass(cls): cls.X, cls.y = datasets.make_classification( n_samples=100, n_features=5, n_informative=4, 
n_redundant=1, n_classes=2, n_clusters_per_class=3, n_repeated=0, shuffle=False, random_state=0, ) cls.X = pd.DataFrame(cls.X, columns=[f"f_{i}" for i in range(cls.X.shape[1])]) cls.params = { "learning_rate": 0.1, "depth": 4, "rsm": 0.5, "l2_leaf_reg": 1, "seed": 1, "ml_task": "binary_classification", "loss_function": "Logloss", "eval_metric": "Logloss", } def test_reproduce_fit(self): metric = Metric({"name": "logloss"}) prev_loss = None for _ in range(2): model = CatBoostAlgorithm(self.params) model.fit(self.X, self.y) y_predicted = model.predict(self.X) loss = metric(self.y, y_predicted) if prev_loss is not None: assert_almost_equal(prev_loss, loss, decimal=3) prev_loss = loss def test_fit_predict(self): metric = Metric({"name": "logloss"}) loss_prev = None for _ in range(2): cat = CatBoostAlgorithm(self.params) cat.fit(self.X, self.y) y_predicted = cat.predict(self.X) loss = metric(self.y, y_predicted) if loss_prev is not None: assert_almost_equal(loss, loss_prev, decimal=3) loss_prev = loss def test_copy(self): # train model #1 metric = Metric({"name": "logloss"}) cat = CatBoostAlgorithm(self.params) cat.fit(self.X, self.y) y_predicted = cat.predict(self.X) loss = metric(self.y, y_predicted) # create model #2 cat2 = CatBoostAlgorithm(self.params) # model #2 is initialized in constructor self.assertTrue(cat2.model is not None) # do a copy and use it for predictions cat2 = cat.copy() self.assertEqual(type(cat), type(cat2)) y_predicted = cat2.predict(self.X) loss2 = metric(self.y, y_predicted) self.assertEqual(loss, loss2) def test_save_and_load(self): metric = Metric({"name": "logloss"}) cat = CatBoostAlgorithm(self.params) cat.fit(self.X, self.y) y_predicted = cat.predict(self.X) loss = metric(self.y, y_predicted) filename = os.path.join(tempfile.gettempdir(), os.urandom(12).hex()) cat.save(filename) cat2 = CatBoostAlgorithm(self.params) self.assertTrue(cat.uid != cat2.uid) self.assertTrue(cat2.model is not None) cat2.load(filename) # Finished with the file, delete it os.remove(filename) y_predicted = cat2.predict(self.X) loss2 = metric(self.y, y_predicted) assert_almost_equal(loss, loss2, decimal=3) def test_get_metric_name(self): model = CatBoostAlgorithm(self.params) self.assertEqual(model.get_metric_name(), "logloss") params = dict(self.params) params["loss_function"] = "MultiClass" params["eval_metric"] = "MultiClass" model = CatBoostAlgorithm(params) self.assertEqual(model.get_metric_name(), "logloss") def test_is_fitted(self): cat = CatBoostAlgorithm(self.params) self.assertFalse(cat.is_fitted()) cat.fit(self.X, self.y) self.assertTrue(cat.is_fitted()) ``` -------------------------------------------------------------------------------- /supervised/algorithms/extra_trees.py: -------------------------------------------------------------------------------- ```python import logging import sklearn from sklearn.base import ClassifierMixin, RegressorMixin from sklearn.ensemble import ExtraTreesClassifier, ExtraTreesRegressor from supervised.algorithms.registry import ( BINARY_CLASSIFICATION, MULTICLASS_CLASSIFICATION, REGRESSION, AlgorithmsRegistry, ) from supervised.algorithms.sklearn import ( SklearnTreesEnsembleClassifierAlgorithm, SklearnTreesEnsembleRegressorAlgorithm, ) from supervised.utils.config import LOG_LEVEL logger = logging.getLogger(__name__) logger.setLevel(LOG_LEVEL) class ExtraTreesAlgorithm(ClassifierMixin, SklearnTreesEnsembleClassifierAlgorithm): algorithm_name = "Extra Trees Classifier" algorithm_short_name = "Extra Trees" def __init__(self, params): 
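        # editor note: warm_start=True together with trees_in_step lets the base
        # SklearnTreesEnsembleClassifierAlgorithm grow the forest in increments and
        # stop early (early_stopping_rounds) once the validation score stops improving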
super(ExtraTreesAlgorithm, self).__init__(params) logger.debug("ExtraTreesAlgorithm.__init__") self.library_version = sklearn.__version__ self.trees_in_step = additional.get("trees_in_step", 100) self.max_steps = additional.get("max_steps", 50) self.early_stopping_rounds = additional.get("early_stopping_rounds", 50) self.model = ExtraTreesClassifier( n_estimators=self.trees_in_step, criterion=params.get("criterion", "gini"), max_features=params.get("max_features", 0.8), max_depth=params.get("max_depth", 6), min_samples_split=params.get("min_samples_split", 4), min_samples_leaf=params.get("min_samples_leaf", 1), warm_start=True, n_jobs=params.get("n_jobs", -1), random_state=params.get("seed", 1), ) self.max_steps = self.params.get("max_steps", self.max_steps) def file_extension(self): return "extra_trees" class ExtraTreesRegressorAlgorithm( RegressorMixin, SklearnTreesEnsembleRegressorAlgorithm ): algorithm_name = "Extra Trees Regressor" algorithm_short_name = "Extra Trees" def __init__(self, params): super(ExtraTreesRegressorAlgorithm, self).__init__(params) logger.debug("ExtraTreesRegressorAlgorithm.__init__") self.library_version = sklearn.__version__ self.trees_in_step = regression_additional.get("trees_in_step", 100) self.max_steps = regression_additional.get("max_steps", 50) self.early_stopping_rounds = regression_additional.get( "early_stopping_rounds", 50 ) self.model = ExtraTreesRegressor( n_estimators=self.trees_in_step, criterion=params.get("criterion", "squared_error"), max_features=params.get("max_features", 0.6), max_depth=params.get("max_depth", 6), min_samples_split=params.get("min_samples_split", 30), min_samples_leaf=params.get("min_samples_leaf", 1), warm_start=True, n_jobs=params.get("n_jobs", -1), random_state=params.get("seed", 1), ) self.max_steps = self.params.get("max_steps", self.max_steps) def file_extension(self): return "extra_trees" # For binary classification target should be 0, 1. There should be no NaNs in target. 
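# editor note: e.g. a string target ["yes", "no", "yes"] must first be label-encoded
# to [1, 0, 1]; inside AutoML the "target_as_integer" preprocessing step listed in
# required_preprocessing below takes care of this.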
et_params = { "criterion": ["gini", "entropy"], "max_features": [0.5, 0.6, 0.7, 0.8, 0.9, 1.0], "min_samples_split": [10, 20, 30, 40, 50], "max_depth": [3, 4, 5, 6, 7], } classification_default_params = { "criterion": "gini", "max_features": 0.9, "min_samples_split": 30, "max_depth": 4, } additional = { "trees_in_step": 100, "max_steps": 50, "early_stopping_rounds": 50, "max_rows_limit": None, "max_cols_limit": None, } required_preprocessing = [ "missing_values_inputation", "convert_categorical", "datetime_transform", "text_transform", "target_as_integer", ] AlgorithmsRegistry.add( BINARY_CLASSIFICATION, ExtraTreesAlgorithm, et_params, required_preprocessing, additional, classification_default_params, ) AlgorithmsRegistry.add( MULTICLASS_CLASSIFICATION, ExtraTreesAlgorithm, et_params, required_preprocessing, additional, classification_default_params, ) # # REGRESSION # regression_et_params = { "criterion": [ "squared_error" ], # remove "mae" because it slows down a lot https://github.com/scikit-learn/scikit-learn/issues/9626 "max_features": [0.5, 0.6, 0.7, 0.8, 0.9, 1.0], "min_samples_split": [10, 20, 30, 40, 50], "max_depth": [3, 4, 5, 6, 7], } regression_default_params = { "criterion": "squared_error", "max_features": 0.9, "min_samples_split": 30, "max_depth": 4, } regression_additional = { "trees_in_step": 100, "max_steps": 50, "early_stopping_rounds": 50, "max_rows_limit": None, "max_cols_limit": None, } regression_required_preprocessing = [ "missing_values_inputation", "convert_categorical", "datetime_transform", "text_transform", "target_scale", ] AlgorithmsRegistry.add( REGRESSION, ExtraTreesRegressorAlgorithm, regression_et_params, regression_required_preprocessing, regression_additional, regression_default_params, ) ``` -------------------------------------------------------------------------------- /supervised/algorithms/random_forest.py: -------------------------------------------------------------------------------- ```python import logging import sklearn from sklearn.base import ClassifierMixin, RegressorMixin from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor from supervised.algorithms.registry import ( BINARY_CLASSIFICATION, MULTICLASS_CLASSIFICATION, REGRESSION, AlgorithmsRegistry, ) from supervised.algorithms.sklearn import ( SklearnTreesEnsembleClassifierAlgorithm, SklearnTreesEnsembleRegressorAlgorithm, ) from supervised.utils.config import LOG_LEVEL logger = logging.getLogger(__name__) logger.setLevel(LOG_LEVEL) class RandomForestAlgorithm(ClassifierMixin, SklearnTreesEnsembleClassifierAlgorithm): algorithm_name = "Random Forest" algorithm_short_name = "Random Forest" def __init__(self, params): super(RandomForestAlgorithm, self).__init__(params) logger.debug("RandomForestAlgorithm.__init__") self.library_version = sklearn.__version__ self.trees_in_step = additional.get("trees_in_step", 5) self.max_steps = additional.get("max_steps", 3) self.early_stopping_rounds = additional.get("early_stopping_rounds", 50) self.model = RandomForestClassifier( n_estimators=self.trees_in_step, criterion=params.get("criterion", "gini"), max_features=params.get("max_features", 0.8), max_depth=params.get("max_depth", 6), min_samples_split=params.get("min_samples_split", 4), min_samples_leaf=params.get("min_samples_leaf", 1), warm_start=True, n_jobs=params.get("n_jobs", -1), random_state=params.get("seed", 1), ) self.max_steps = self.params.get("max_steps", self.max_steps) def file_extension(self): return "random_forest" class RandomForestRegressorAlgorithm( 
RegressorMixin, SklearnTreesEnsembleRegressorAlgorithm ): algorithm_name = "Random Forest" algorithm_short_name = "Random Forest" def __init__(self, params): super(RandomForestRegressorAlgorithm, self).__init__(params) logger.debug("RandomForestRegressorAlgorithm.__init__") self.library_version = sklearn.__version__ self.trees_in_step = regression_additional.get("trees_in_step", 5) self.max_steps = regression_additional.get("max_steps", 3) self.early_stopping_rounds = regression_additional.get( "early_stopping_rounds", 50 ) self.model = RandomForestRegressor( n_estimators=self.trees_in_step, criterion=params.get("criterion", "squared_error"), max_features=params.get("max_features", 0.8), max_depth=params.get("max_depth", 6), min_samples_split=params.get("min_samples_split", 4), min_samples_leaf=params.get("min_samples_leaf", 1), warm_start=True, n_jobs=params.get("n_jobs", -1), random_state=params.get("seed", 1), ) self.max_steps = self.params.get("max_steps", self.max_steps) def file_extension(self): return "random_forest" # For binary classification target should be 0, 1. There should be no NaNs in target. rf_params = { "criterion": ["gini", "entropy"], "max_features": [0.5, 0.6, 0.7, 0.8, 0.9, 1.0], "min_samples_split": [10, 20, 30, 40, 50], "max_depth": [3, 4, 5, 6, 7], } classification_default_params = { "criterion": "gini", "max_features": 0.9, "min_samples_split": 30, "max_depth": 4, } additional = { "trees_in_step": 100, "train_cant_improve_limit": 1, "min_steps": 1, "max_steps": 50, "early_stopping_rounds": 50, "max_rows_limit": None, "max_cols_limit": None, } required_preprocessing = [ "missing_values_inputation", "convert_categorical", "datetime_transform", "text_transform", "target_as_integer", ] AlgorithmsRegistry.add( BINARY_CLASSIFICATION, RandomForestAlgorithm, rf_params, required_preprocessing, additional, classification_default_params, ) AlgorithmsRegistry.add( MULTICLASS_CLASSIFICATION, RandomForestAlgorithm, rf_params, required_preprocessing, additional, classification_default_params, ) # # REGRESSION # regression_rf_params = { "criterion": [ "squared_error" ], # remove "mae" because it slows down a lot https://github.com/scikit-learn/scikit-learn/issues/9626 "max_features": [0.5, 0.6, 0.7, 0.8, 0.9, 1.0], "min_samples_split": [10, 20, 30, 40, 50], "max_depth": [3, 4, 5, 6, 7], } regression_default_params = { "criterion": "squared_error", "max_features": 0.9, "min_samples_split": 30, "max_depth": 4, } regression_additional = { "trees_in_step": 100, "train_cant_improve_limit": 1, "min_steps": 1, "max_steps": 50, "early_stopping_rounds": 50, "max_rows_limit": None, "max_cols_limit": None, } regression_required_preprocessing = [ "missing_values_inputation", "convert_categorical", "datetime_transform", "text_transform", "target_scale", ] AlgorithmsRegistry.add( REGRESSION, RandomForestRegressorAlgorithm, regression_rf_params, regression_required_preprocessing, regression_additional, regression_default_params, ) ``` -------------------------------------------------------------------------------- /tests/tests_algorithms/test_xgboost.py: -------------------------------------------------------------------------------- ```python import os import tempfile import unittest import numpy as np import pandas as pd from numpy.testing import assert_almost_equal from sklearn import datasets from supervised.algorithms.xgboost import XgbAlgorithm, additional from supervised.utils.constants import BINARY_CLASSIFICATION from supervised.utils.metric import Metric additional["max_rounds"] = 1 
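# editor note: capping max_rounds at a single boosting round keeps the unit tests
# below fast; in normal training the library uses many more rounds.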
class XgboostAlgorithmTest(unittest.TestCase): @classmethod def setUpClass(cls): cls.X, cls.y = datasets.make_classification( n_samples=100, n_features=5, n_informative=4, n_redundant=1, n_classes=2, n_clusters_per_class=3, n_repeated=0, shuffle=False, random_state=0, ) def test_reproduce_fit(self): metric = Metric({"name": "logloss"}) params = { "objective": "binary:logistic", "eval_metric": "logloss", "seed": 1, "ml_task": BINARY_CLASSIFICATION, } prev_loss = None for _ in range(3): xgb = XgbAlgorithm(params) xgb.fit(self.X, self.y) y_predicted = xgb.predict(self.X) loss = metric(self.y, y_predicted) if prev_loss is not None: assert_almost_equal(prev_loss, loss) prev_loss = loss def test_copy(self): metric = Metric({"name": "logloss"}) params = { "objective": "binary:logistic", "eval_metric": "logloss", "ml_task": BINARY_CLASSIFICATION, } xgb = XgbAlgorithm(params) xgb.fit(self.X, self.y) y_predicted = xgb.predict(self.X) loss = metric(self.y, y_predicted) xgb2 = XgbAlgorithm(params) self.assertTrue(xgb2.model is None) # model is set to None, while initialized xgb2 = xgb.copy() self.assertEqual(type(xgb), type(xgb2)) y_predicted = xgb2.predict(self.X) loss2 = metric(self.y, y_predicted) self.assertEqual(loss, loss2) self.assertNotEqual(id(xgb), id(xgb2)) def test_save_and_load(self): metric = Metric({"name": "logloss"}) params = { "objective": "binary:logistic", "eval_metric": "logloss", "ml_task": BINARY_CLASSIFICATION, } xgb = XgbAlgorithm(params) xgb.fit(self.X, self.y) y_predicted = xgb.predict(self.X) loss = metric(self.y, y_predicted) filename = os.path.join(tempfile.gettempdir(), os.urandom(12).hex()) xgb.save(filename) xgb2 = XgbAlgorithm(params) self.assertTrue(xgb2.model is None) xgb2.load(filename) # Finished with the file, delete it os.remove(filename) y_predicted = xgb2.predict(self.X) loss2 = metric(self.y, y_predicted) assert_almost_equal(loss, loss2) def test_save_and_load_with_early_stopping(self): metric = Metric({"name": "logloss"}) params = { "objective": "binary:logistic", "eval_metric": "logloss", "ml_task": BINARY_CLASSIFICATION, } xgb = XgbAlgorithm(params) xgb.fit(self.X, self.y, X_validation=self.X, y_validation=self.y) y_predicted = xgb.predict(self.X) loss = metric(self.y, y_predicted) filename = os.path.join(tempfile.gettempdir(), os.urandom(12).hex()) prev_best_iteration = xgb.model.best_iteration xgb.save(filename) xgb2 = XgbAlgorithm(params) self.assertTrue(xgb2.model is None) xgb2.load(filename) # Finished with the file, delete it os.remove(filename) y_predicted = xgb2.predict(self.X) loss2 = metric(self.y, y_predicted) assert_almost_equal(loss, loss2) self.assertEqual(prev_best_iteration, xgb2.model.best_iteration) def test_restricted_characters_in_feature_name(self): df = pd.DataFrame( { "y": np.random.randint(0, 2, size=100), "[test1]": np.random.uniform(0, 1, size=100), "test2 < 1": np.random.uniform(0, 1, size=100), } ) y = df.iloc[:, 0] X = df.iloc[:, 1:] metric = Metric({"name": "logloss"}) params = { "objective": "binary:logistic", "eval_metric": "logloss", "ml_task": BINARY_CLASSIFICATION, } xgb = XgbAlgorithm(params) xgb.fit(X, y) xgb.predict(X) def test_get_metric_name(self): params = { "objective": "binary:logistic", "eval_metric": "logloss", "ml_task": BINARY_CLASSIFICATION, } model = XgbAlgorithm(params) self.assertEqual(model.get_metric_name(), "logloss") params = {"eval_metric": "rmse"} model = XgbAlgorithm(params) self.assertEqual(model.get_metric_name(), "rmse") def test_is_fitted(self): params = { "objective": "binary:logistic", 
"eval_metric": "logloss", "ml_task": BINARY_CLASSIFICATION, } model = XgbAlgorithm(params) self.assertFalse(model.is_fitted()) model.fit(self.X, self.y) self.assertTrue(model.is_fitted()) ``` -------------------------------------------------------------------------------- /tests/tests_preprocessing/test_goldenfeatures_transformer.py: -------------------------------------------------------------------------------- ```python import shutil import tempfile import unittest import numpy as np import pandas as pd from sklearn import datasets from supervised.algorithms.registry import ( BINARY_CLASSIFICATION, MULTICLASS_CLASSIFICATION, REGRESSION, ) from supervised.preprocessing.goldenfeatures_transformer import ( GoldenFeaturesTransformer, ) class GoldenFeaturesTransformerTest(unittest.TestCase): automl_dir = "automl_testing" def tearDown(self): shutil.rmtree(self.automl_dir, ignore_errors=True) def test_transformer(self): X, y = datasets.make_classification( n_samples=100, n_features=10, n_informative=6, n_redundant=1, n_classes=2, n_clusters_per_class=3, n_repeated=0, shuffle=False, random_state=0, ) df = pd.DataFrame(X, columns=[f"f{i}" for i in range(X.shape[1])]) with tempfile.TemporaryDirectory() as tmpdir: gft = GoldenFeaturesTransformer(tmpdir, "binary_classification") gft.fit(df, y) df = gft.transform(df) gft3 = GoldenFeaturesTransformer(tmpdir, "binary_classification") gft3.from_json(gft.to_json(), tmpdir) def test_subsample_regression_10k(self): rows = 10000 X = np.random.rand(rows, 3) X = pd.DataFrame(X, columns=[f"f{i}" for i in range(3)]) y = pd.Series(np.random.rand(rows), name="target") gft3 = GoldenFeaturesTransformer(self.automl_dir, REGRESSION) X_train, X_test, y_train, y_test = gft3._subsample(X, y) self.assertTrue(X_train.shape[0], 2500) self.assertTrue(X_test.shape[0], 2500) self.assertTrue(y_train.shape[0], 2500) self.assertTrue(y_test.shape[0], 2500) def test_subsample_regression_4k(self): rows = 4000 X = np.random.rand(rows, 3) X = pd.DataFrame(X, columns=[f"f{i}" for i in range(3)]) y = pd.Series(np.random.rand(rows), name="target") gft3 = GoldenFeaturesTransformer(self.automl_dir, REGRESSION) X_train, X_test, y_train, y_test = gft3._subsample(X, y) self.assertTrue(X_train.shape[0], 2000) self.assertTrue(X_test.shape[0], 2000) self.assertTrue(y_train.shape[0], 2000) self.assertTrue(y_test.shape[0], 2000) def test_subsample_multiclass_10k(self): rows = 10000 X = np.random.rand(rows, 3) X = pd.DataFrame(X, columns=[f"f{i}" for i in range(3)]) y = pd.Series(np.random.randint(0, 4, rows), name="target") gft3 = GoldenFeaturesTransformer(self.automl_dir, MULTICLASS_CLASSIFICATION) X_train, X_test, y_train, y_test = gft3._subsample(X, y) self.assertTrue(X_train.shape[0], 2500) self.assertTrue(X_test.shape[0], 2500) self.assertTrue(y_train.shape[0], 2500) self.assertTrue(y_test.shape[0], 2500) for uni in [np.unique(y_train), np.unique(y_test)]: for i in range(4): self.assertTrue(i in uni) def test_subsample_multiclass_4k(self): rows = 4000 X = np.random.rand(rows, 3) X = pd.DataFrame(X, columns=[f"f{i}" for i in range(3)]) y = pd.Series(np.random.randint(0, 4, rows), name="target") gft3 = GoldenFeaturesTransformer(self.automl_dir, MULTICLASS_CLASSIFICATION) X_train, X_test, y_train, y_test = gft3._subsample(X, y) self.assertTrue(X_train.shape[0], 2000) self.assertTrue(X_test.shape[0], 2000) self.assertTrue(y_train.shape[0], 2000) self.assertTrue(y_test.shape[0], 2000) for uni in [np.unique(y_train), np.unique(y_test)]: for i in range(4): self.assertTrue(i in uni) def 
test_subsample_binclass_4k(self): rows = 4000 X = np.random.rand(rows, 3) X = pd.DataFrame(X, columns=[f"f{i}" for i in range(3)]) y = pd.Series(np.random.randint(0, 2, rows), name="target") gft3 = GoldenFeaturesTransformer(self.automl_dir, BINARY_CLASSIFICATION) X_train, X_test, y_train, y_test = gft3._subsample(X, y) self.assertTrue(X_train.shape[0], 2000) self.assertTrue(X_test.shape[0], 2000) self.assertTrue(y_train.shape[0], 2000) self.assertTrue(y_test.shape[0], 2000) for uni in [np.unique(y_train), np.unique(y_test)]: for i in range(2): self.assertTrue(i in uni) def test_features_count(self): N_COLS = 10 X, y = datasets.make_classification( n_samples=100, n_features=N_COLS, n_informative=6, n_redundant=1, n_classes=2, n_clusters_per_class=3, n_repeated=0, shuffle=False, random_state=0, ) df = pd.DataFrame(X, columns=[f"f{i}" for i in range(X.shape[1])]) with tempfile.TemporaryDirectory() as tmpdir: FEATURES_COUNT = 42 gft = GoldenFeaturesTransformer( tmpdir, "binary_classification", features_count=FEATURES_COUNT ) gft.fit(df, y) self.assertEqual(len(gft._new_features), FEATURES_COUNT) gft3 = GoldenFeaturesTransformer(tmpdir, "binary_classification") gft3.from_json(gft.to_json(), tmpdir) df = gft3.transform(df) self.assertEqual(df.shape[1], N_COLS + FEATURES_COUNT) ``` -------------------------------------------------------------------------------- /supervised/tuner/optuna/catboost.py: -------------------------------------------------------------------------------- ```python import optuna from catboost import CatBoostClassifier, CatBoostRegressor, Pool from supervised.algorithms.catboost import catboost_eval_metric, catboost_objective from supervised.algorithms.registry import ( BINARY_CLASSIFICATION, MULTICLASS_CLASSIFICATION, REGRESSION, ) from supervised.utils.metric import ( CatBoostEvalMetricAveragePrecision, CatBoostEvalMetricMSE, CatBoostEvalMetricPearson, CatBoostEvalMetricSpearman, CatBoostEvalMetricUserDefined, Metric, ) EPS = 1e-8 class CatBoostObjective: def __init__( self, ml_task, X_train, y_train, sample_weight, X_validation, y_validation, sample_weight_validation, eval_metric, cat_features_indices, n_jobs, random_state, ): self.ml_task = ml_task self.X_train = X_train self.y_train = y_train self.sample_weight = sample_weight self.X_validation = X_validation self.y_validation = y_validation self.eval_metric = eval_metric self.cat_features = cat_features_indices self.eval_set = Pool( data=X_validation, label=y_validation, cat_features=self.cat_features, weight=sample_weight_validation, ) self.n_jobs = n_jobs self.rounds = 1000 self.learning_rate = 0.0125 self.early_stopping_rounds = 50 self.seed = random_state self.objective = catboost_objective(ml_task, self.eval_metric.name) self.eval_metric_name = catboost_eval_metric(ml_task, self.eval_metric.name) self.custom_eval_metric = None if self.eval_metric_name == "spearman": self.custom_eval_metric = CatBoostEvalMetricSpearman() elif self.eval_metric_name == "pearson": self.custom_eval_metric = CatBoostEvalMetricPearson() elif self.eval_metric_name == "average_precision": self.custom_eval_metric = CatBoostEvalMetricAveragePrecision() elif self.eval_metric_name == "mse": self.custom_eval_metric = CatBoostEvalMetricMSE() elif self.eval_metric_name == "user_defined_metric": self.custom_eval_metric = CatBoostEvalMetricUserDefined() def __call__(self, trial): try: params = { "iterations": self.rounds, "learning_rate": trial.suggest_categorical( "learning_rate", [0.05, 0.1, 0.2] ), "depth": trial.suggest_int("depth", 2, 9), 
"l2_leaf_reg": trial.suggest_float( "l2_leaf_reg", 0.0001, 10.0, log=False ), "random_strength": trial.suggest_float( "random_strength", EPS, 10.0, log=False ), "rsm": trial.suggest_float("rsm", 0.1, 1), # colsample_bylevel=rsm "loss_function": self.objective, "eval_metric": self.eval_metric_name, "verbose": False, "allow_writing_files": False, "thread_count": self.n_jobs, "random_seed": self.seed, # "border_count": trial.suggest_int("border_count", 16, 2048), "min_data_in_leaf": trial.suggest_int("min_data_in_leaf", 1, 100), # "bootstrap_type": "Bernoulli" # trial.suggest_categorical( # "bootstrap_type", ["Bayesian", "Bernoulli", "MVS"] # ), } # if params["bootstrap_type"] == "Bayesian": # params["bagging_temperature"] = trial.suggest_float( # "bagging_temperature", 0, 10 # ) # elif params["bootstrap_type"] in ["Bernoulli", "MVS"]: # params["subsample"] = trial.suggest_float("subsample", 0.1, 1) Algorithm = ( CatBoostRegressor if self.ml_task == REGRESSION else CatBoostClassifier ) if self.custom_eval_metric is not None: params["eval_metric"] = self.custom_eval_metric model = Algorithm(**params) model.fit( self.X_train, self.y_train, sample_weight=self.sample_weight, early_stopping_rounds=self.early_stopping_rounds, eval_set=self.eval_set, verbose_eval=False, cat_features=self.cat_features, ) if self.ml_task == BINARY_CLASSIFICATION: preds = model.predict_proba( self.X_validation, ntree_end=model.best_iteration_ + 1 )[:, 1] elif self.ml_task == MULTICLASS_CLASSIFICATION: preds = model.predict_proba( self.X_validation, ntree_end=model.best_iteration_ + 1 ) else: # REGRESSION preds = model.predict( self.X_validation, ntree_end=model.best_iteration_ + 1 ) score = self.eval_metric(self.y_validation, preds) if Metric.optimize_negative(self.eval_metric.name): score *= -1.0 except optuna.exceptions.TrialPruned as e: raise e except Exception as e: print("Exception in CatBoostObjective", str(e)) # import traceback # print(traceback.format_exc()) return None return score ``` -------------------------------------------------------------------------------- /supervised/validation/validator_kfold.py: -------------------------------------------------------------------------------- ```python import gc import logging import os import warnings import numpy as np log = logging.getLogger(__name__) from sklearn.model_selection import KFold, StratifiedKFold from supervised.exceptions import AutoMLException from supervised.utils.utils import load_data from supervised.validation.validator_base import BaseValidator class KFoldValidator(BaseValidator): def __init__(self, params): BaseValidator.__init__(self, params) self.k_folds = self.params.get("k_folds", 5) self.shuffle = self.params.get("shuffle", True) self.stratify = self.params.get("stratify", False) self.random_seed = self.params.get("random_seed", 1906) self.repeats = self.params.get("repeats", 1) if not self.shuffle and self.repeats > 1: warnings.warn( "Disable repeats in validation because shuffle is disabled", UserWarning ) self.repeats = 1 self.skf = [] for r in range(self.repeats): random_seed = self.random_seed + r if self.shuffle else None if self.stratify: if self.shuffle: self.skf += [ StratifiedKFold( n_splits=self.k_folds, shuffle=self.shuffle, random_state=random_seed, ) ] else: self.skf += [ StratifiedKFold( n_splits=self.k_folds, shuffle=self.shuffle, random_state=random_seed, ) ] else: self.skf += [ KFold( n_splits=self.k_folds, shuffle=self.shuffle, random_state=random_seed, ) ] self._results_path = self.params.get("results_path") 
self._X_path = self.params.get("X_path") self._y_path = self.params.get("y_path") self._sample_weight_path = self.params.get("sample_weight_path") self._sensitive_features_path = self.params.get("sensitive_features_path") if self._X_path is None or self._y_path is None: raise AutoMLException("No data path set in KFoldValidator params") folds_path = os.path.join(self._results_path, "folds") if not os.path.exists(folds_path): os.mkdir(folds_path) X = load_data(self._X_path) y = load_data(self._y_path) y = y["target"] if isinstance(y[0], bytes): # see https://github.com/scikit-learn/scikit-learn/issues/16980 y = y.astype(str) for repeat_cnt, skf in enumerate(self.skf): for fold_cnt, (train_index, validation_index) in enumerate( skf.split(X, y) ): repeat_str = f"_repeat_{repeat_cnt}" if len(self.skf) > 1 else "" train_index_file = os.path.join( self._results_path, "folds", f"fold_{fold_cnt}{repeat_str}_train_indices.npy", ) validation_index_file = os.path.join( self._results_path, "folds", f"fold_{fold_cnt}{repeat_str}_validation_indices.npy", ) np.save(train_index_file, train_index) np.save(validation_index_file, validation_index) del X del y gc.collect() else: log.debug("Folds split already done, reuse it") def get_split(self, k, repeat=0): repeat_str = f"_repeat_{repeat}" if self.repeats > 1 else "" train_index_file = os.path.join( self._results_path, "folds", f"fold_{k}{repeat_str}_train_indices.npy" ) validation_index_file = os.path.join( self._results_path, "folds", f"fold_{k}{repeat_str}_validation_indices.npy" ) train_index = np.load(train_index_file) validation_index = np.load(validation_index_file) X = load_data(self._X_path) y = load_data(self._y_path) y = y["target"] sample_weight = None if self._sample_weight_path is not None: sample_weight = load_data(self._sample_weight_path) sample_weight = sample_weight["sample_weight"] sensitive_features = None if self._sensitive_features_path is not None: sensitive_features = load_data(self._sensitive_features_path) train_data = {"X": X.loc[train_index], "y": y.loc[train_index]} validation_data = {"X": X.loc[validation_index], "y": y.loc[validation_index]} if sample_weight is not None: train_data["sample_weight"] = sample_weight.loc[train_index] validation_data["sample_weight"] = sample_weight.loc[validation_index] if sensitive_features is not None: train_data["sensitive_features"] = sensitive_features.loc[train_index] validation_data["sensitive_features"] = sensitive_features.loc[ validation_index ] return (train_data, validation_data) def get_n_splits(self): return self.k_folds def get_repeats(self): return self.repeats ``` -------------------------------------------------------------------------------- /tests/tests_preprocessing/disable_eda.py: -------------------------------------------------------------------------------- ```python import os import shutil import unittest import numpy as np import pandas as pd from sklearn import datasets from supervised import AutoML from supervised.preprocessing.eda import EDA class EDATest(unittest.TestCase): automl_dir = "automl_tests" def tearDown(self): shutil.rmtree(self.automl_dir, ignore_errors=True) def test_explain_default(self): a = AutoML( results_path=self.automl_dir, total_time_limit=5, algorithms=["Baseline"], train_ensemble=False, explain_level=2, ) X, y = datasets.make_classification(n_samples=100, n_features=5) X = pd.DataFrame(X, columns=[f"f_{i}" for i in range(X.shape[1])]) y = pd.Series(y, name="class") a.fit(X, y) result_files = os.listdir(os.path.join(a._results_path, "EDA")) for 
col in X.columns: self.assertTrue(f"{col}.png" in result_files) self.assertTrue("target.png" in result_files) self.assertTrue("README.md" in result_files) def test_column_name_to_filename(self): """Valid feature name should be untouched""" col = "feature_1" self.assertEqual(EDA.prepare(col), col) self.tearDown() def test_extensive_eda(self): """ Test for extensive_eda feature """ X, y = datasets.make_regression(n_samples=100, n_features=5) X = pd.DataFrame(X, columns=[f"f_{i}" for i in range(X.shape[1])]) y = pd.Series(y, name="class") results_path = self.automl_dir EDA.extensive_eda(X, y, results_path) result_files = os.listdir(results_path) for col in X.columns: self.assertTrue(f"{col}_target.png" in result_files) self.assertTrue("heatmap.png" in result_files) self.assertTrue("Extensive_EDA.md" in result_files) X, y = datasets.make_classification(n_samples=100, n_features=5) X = pd.DataFrame(X, columns=[f"f_{i}" for i in range(X.shape[1])]) y = pd.Series(y, name="class") results_path = self.automl_dir EDA.extensive_eda(X, y, results_path) result_files = os.listdir(results_path) for col in X.columns: self.assertTrue(f"{col}_target.png" in result_files) self.assertTrue("heatmap.png" in result_files) self.assertTrue("Extensive_EDA.md" in result_files) self.tearDown() def test_extensive_eda_missing(self): """ Test for dataframe with missing values """ X, y = datasets.make_regression(n_samples=100, n_features=5) X = pd.DataFrame(X, columns=[f"f_{i}" for i in range(X.shape[1])]) y = pd.Series(y, name="class") ##add some nan values X.loc[np.random.randint(0, 100, 20), "f_0"] = np.nan results_path = self.automl_dir EDA.extensive_eda(X, y, results_path) result_files = os.listdir(results_path) for col in X.columns: self.assertTrue(f"{col}_target.png" in result_files) self.assertTrue("heatmap.png" in result_files) self.assertTrue("Extensive_EDA.md" in result_files) X, y = datasets.make_regression(n_samples=100, n_features=5) X = pd.DataFrame(X, columns=[f"f_{i}" for i in range(X.shape[1])]) y = pd.Series(y, name="class") ##add some nan values X.loc[np.random.randint(0, 100, 20), "f_0"] = np.nan results_path = self.automl_dir EDA.extensive_eda(X, y, results_path) result_files = os.listdir(results_path) for col in X.columns: self.assertTrue(f"{col}_target.png" in result_files) self.assertTrue("heatmap.png" in result_files) self.assertTrue("Extensive_EDA.md" in result_files) self.tearDown() def test_symbol_feature(self): """ Test for columns with forbidden filenames """ X, y = datasets.make_regression(n_samples=100, n_features=5) X = pd.DataFrame(X, columns=[f"f_{i}" for i in range(X.shape[1])]) X.rename({"f_0": "ff*", "f_1": "fg/"}, axis=1, inplace=True) y = pd.Series(y, name="class") results_path = self.automl_dir EDA.extensive_eda(X, y, results_path) result_files = os.listdir(results_path) for col in X.columns: self.assertTrue(EDA.plot_fname(f"{col}_target") in result_files) self.assertTrue("heatmap.png" in result_files) self.assertTrue("Extensive_EDA.md" in result_files) self.tearDown() def test_naughty_column_name_to_filename(self): """Test with naughty strings. 
String from https://github.com/minimaxir/big-list-of-naughty-strings""" os.mkdir(self.automl_dir) naughty_columns = [ "feature_1", "*", "😍", "¯\_(ツ)_/¯", "表", "𠜎𠜱𠝹𠱓", "عاملة بولندا", "Ṱ̺̺̕o͞ ̷" "🇸🇦🇫🇦🇲", "⁰⁴⁵", "∆˚¬…æ", "!@#$%^&*()`~", "onfocus=JaVaSCript:alert(123) autofocus", "`\"'><img src=xxx:x \x20onerror=javascript:alert(1)>", 'System("ls -al /")', 'Kernel.exec("ls -al /")', "لُلُصّبُلُل" "{% print 'x' * 64 * 1024**3 %}", '{{ "".__class__.__mro__[2].__subclasses__()[40]("/etc/passwd").read() }}', "ÜBER Über German Umlaut", "影師嗎", "C'est déjà l'été." "Nín hǎo. Wǒ shì zhōng guó rén", "Компьютер", "jaja---lol-méméméoo--a", ] for col in naughty_columns: fname = EDA.plot_path(self.automl_dir, col) with open(fname, "w") as fout: fout.write("ok") self.tearDown() ``` -------------------------------------------------------------------------------- /supervised/algorithms/linear.py: -------------------------------------------------------------------------------- ```python import logging import os import numpy as np import pandas as pd import sklearn from sklearn.base import ClassifierMixin, RegressorMixin from sklearn.linear_model import LinearRegression, LogisticRegression from supervised.algorithms.registry import ( BINARY_CLASSIFICATION, MULTICLASS_CLASSIFICATION, REGRESSION, AlgorithmsRegistry, ) from supervised.algorithms.sklearn import SklearnAlgorithm from supervised.utils.config import LOG_LEVEL logger = logging.getLogger(__name__) logger.setLevel(LOG_LEVEL) class LinearAlgorithm(ClassifierMixin, SklearnAlgorithm): algorithm_name = "Logistic Regression" algorithm_short_name = "Linear" def __init__(self, params): super(LinearAlgorithm, self).__init__(params) logger.debug("LinearAlgorithm.__init__") self.max_iters = 1 self.library_version = sklearn.__version__ self.model = LogisticRegression( max_iter=500, tol=5e-4, n_jobs=self.params.get("n_jobs", -1) ) def is_fitted(self): return ( hasattr(self.model, "coef_") and self.model.coef_ is not None and self.model.coef_.shape[0] > 0 ) def file_extension(self): return "linear" def interpret( self, X_train, y_train, X_validation, y_validation, model_file_path, learner_name, target_name=None, class_names=None, metric_name=None, ml_task=None, explain_level=2, ): super(LinearAlgorithm, self).interpret( X_train, y_train, X_validation, y_validation, model_file_path, learner_name, target_name, class_names, metric_name, ml_task, explain_level, ) if explain_level == 0: return if X_train.shape[1] > 100: # if too many columns, skip this step return coefs = self.model.coef_ intercept = self.model.intercept_ if self.params["ml_task"] == BINARY_CLASSIFICATION: df = pd.DataFrame( { "feature": ["intercept"] + X_train.columns.tolist(), "weight": [intercept[0]] + list(coefs[0, :]), } ) df.to_csv( os.path.join(model_file_path, f"{learner_name}_coefs.csv"), index=False ) elif self.params["ml_task"] == MULTICLASS_CLASSIFICATION: classes = list(class_names) if isinstance(class_names, dict): classes = class_names.values() if len(classes) > 20: # if there are too many classes, skip this step return df = pd.DataFrame( np.transpose(np.column_stack((intercept, coefs))), index=["intercept"] + X_train.columns.tolist(), columns=classes, ) df.to_csv( os.path.join(model_file_path, f"{learner_name}_coefs.csv"), index=True ) class LinearRegressorAlgorithm(RegressorMixin, SklearnAlgorithm): algorithm_name = "Linear Regression" algorithm_short_name = "Linear" def __init__(self, params): super(LinearRegressorAlgorithm, self).__init__(params) 
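# ordinary least squares is fitted in closed form, so a single pass (max_iters = 1) is enough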
logger.debug("LinearRegressorAlgorithm.__init__") self.max_iters = 1 self.library_version = sklearn.__version__ self.model = LinearRegression(n_jobs=self.params.get("n_jobs", -1)) def is_fitted(self): return ( hasattr(self.model, "coef_") and self.model.coef_ is not None and self.model.coef_.shape[0] > 0 ) def file_extension(self): return "linear" def interpret( self, X_train, y_train, X_validation, y_validation, model_file_path, learner_name, target_name=None, class_names=None, metric_name=None, ml_task=None, explain_level=2, ): super(LinearRegressorAlgorithm, self).interpret( X_train, y_train, X_validation, y_validation, model_file_path, learner_name, target_name, class_names, metric_name, ml_task, explain_level, ) if explain_level == 0: return if X_train.shape[1] > 100: # if too many columns, skip this step return coefs = self.model.coef_ intercept = self.model.intercept_ df = pd.DataFrame( { "feature": ["intercept"] + X_train.columns.tolist(), "weight": [intercept] + list(coefs), } ) df.to_csv( os.path.join(model_file_path, f"{learner_name}_coefs.csv"), index=False ) additional = {"max_steps": 1, "max_rows_limit": None, "max_cols_limit": None} required_preprocessing = [ "missing_values_inputation", "convert_categorical", "datetime_transform", "text_transform", "scale", "target_as_integer", ] AlgorithmsRegistry.add( BINARY_CLASSIFICATION, LinearAlgorithm, {}, required_preprocessing, additional, {} ) AlgorithmsRegistry.add( MULTICLASS_CLASSIFICATION, LinearAlgorithm, {}, required_preprocessing, additional, {}, ) regression_required_preprocessing = [ "missing_values_inputation", "convert_categorical", "datetime_transform", "text_transform", "scale", "target_scale", ] AlgorithmsRegistry.add( REGRESSION, LinearRegressorAlgorithm, {}, regression_required_preprocessing, additional, {}, ) ``` -------------------------------------------------------------------------------- /supervised/tuner/optuna/lightgbm.py: -------------------------------------------------------------------------------- ```python import lightgbm as lgb import numpy as np import optuna import optuna_integration import pandas as pd from supervised.algorithms.lightgbm import lightgbm_eval_metric, lightgbm_objective from supervised.algorithms.registry import ( MULTICLASS_CLASSIFICATION, ) from supervised.utils.metric import ( Metric, lightgbm_eval_metric_accuracy, lightgbm_eval_metric_average_precision, lightgbm_eval_metric_f1, lightgbm_eval_metric_pearson, lightgbm_eval_metric_r2, lightgbm_eval_metric_spearman, lightgbm_eval_metric_user_defined, ) EPS = 1e-8 class LightgbmObjective: def __init__( self, ml_task, X_train, y_train, sample_weight, X_validation, y_validation, sample_weight_validation, eval_metric, cat_features_indices, n_jobs, random_state, ): self.X_train = X_train self.y_train = y_train self.sample_weight = sample_weight self.X_validation = X_validation self.y_validation = y_validation self.sample_weight_validation = sample_weight_validation self.dtrain = lgb.Dataset( self.X_train.to_numpy() if isinstance(self.X_train, pd.DataFrame) else self.X_train, label=self.y_train, weight=self.sample_weight, ) self.dvalid = lgb.Dataset( self.X_validation.to_numpy() if isinstance(self.X_validation, pd.DataFrame) else self.X_validation, label=self.y_validation, weight=self.sample_weight_validation, ) self.cat_features_indices = cat_features_indices self.eval_metric = eval_metric self.learning_rate = 0.025 self.rounds = 1000 self.early_stopping_rounds = 50 self.seed = random_state self.n_jobs = n_jobs if n_jobs == -1: 
self.n_jobs = 0 self.objective = "" self.eval_metric_name = "" self.eval_metric_name, self.custom_eval_metric_name = lightgbm_eval_metric( ml_task, eval_metric.name ) self.custom_eval_metric = None if self.eval_metric.name == "r2": self.custom_eval_metric = lightgbm_eval_metric_r2 elif self.eval_metric.name == "spearman": self.custom_eval_metric = lightgbm_eval_metric_spearman elif self.eval_metric.name == "pearson": self.custom_eval_metric = lightgbm_eval_metric_pearson elif self.eval_metric.name == "f1": self.custom_eval_metric = lightgbm_eval_metric_f1 elif self.eval_metric.name == "average_precision": self.custom_eval_metric = lightgbm_eval_metric_average_precision elif self.eval_metric.name == "accuracy": self.custom_eval_metric = lightgbm_eval_metric_accuracy elif self.eval_metric.name == "user_defined_metric": self.custom_eval_metric = lightgbm_eval_metric_user_defined self.num_class = ( len(np.unique(y_train)) if ml_task == MULTICLASS_CLASSIFICATION else None ) self.objective = lightgbm_objective(ml_task, eval_metric.name) def __call__(self, trial): param = { "objective": self.objective, "metric": self.eval_metric_name, "verbosity": -1, "boosting_type": "gbdt", "learning_rate": trial.suggest_categorical( "learning_rate", [0.0125, 0.025, 0.05, 0.1] ), "num_leaves": trial.suggest_int("num_leaves", 2, 2048), "lambda_l1": trial.suggest_float("lambda_l1", 1e-8, 10.0, log=True), "lambda_l2": trial.suggest_float("lambda_l2", 1e-8, 10.0, log=True), "feature_fraction": min( trial.suggest_float("feature_fraction", 0.3, 1.0 + EPS), 1.0 ), "bagging_fraction": min( trial.suggest_float("bagging_fraction", 0.3, 1.0 + EPS), 1.0 ), "bagging_freq": trial.suggest_int("bagging_freq", 1, 7), "min_data_in_leaf": trial.suggest_int("min_data_in_leaf", 1, 100), "feature_pre_filter": False, "seed": self.seed, "num_threads": self.n_jobs, "extra_trees": trial.suggest_categorical("extra_trees", [True, False]), } if self.cat_features_indices: param["cat_feature"] = self.cat_features_indices param["cat_l2"] = trial.suggest_float("cat_l2", EPS, 100.0) param["cat_smooth"] = trial.suggest_float("cat_smooth", EPS, 100.0) if self.num_class is not None: param["num_class"] = self.num_class try: metric_name = self.eval_metric_name if metric_name == "custom": metric_name = self.custom_eval_metric_name pruning_callback = optuna_integration.LightGBMPruningCallback( trial, metric_name, "validation" ) early_stopping_callback = lgb.early_stopping( self.early_stopping_rounds, verbose=False ) gbm = lgb.train( param, self.dtrain, valid_sets=[self.dvalid], valid_names=["validation"], callbacks=[pruning_callback, early_stopping_callback], num_boost_round=self.rounds, feval=self.custom_eval_metric, ) preds = gbm.predict(self.X_validation) score = self.eval_metric(self.y_validation, preds) if Metric.optimize_negative(self.eval_metric.name): score *= -1.0 except optuna.exceptions.TrialPruned as e: raise e except Exception as e: print("Exception in LightgbmObjective", str(e)) return None return score ``` -------------------------------------------------------------------------------- /supervised/tuner/preprocessing_tuner.py: -------------------------------------------------------------------------------- ```python from supervised.algorithms.registry import ( BINARY_CLASSIFICATION, MULTICLASS_CLASSIFICATION, REGRESSION, ) from supervised.preprocessing.preprocessing_categorical import PreprocessingCategorical from supervised.preprocessing.preprocessing_missing import PreprocessingMissingValues from supervised.preprocessing.scale import 
Scale class PreprocessingTuner: """ This class prepare configuration for data preprocessing """ CATEGORICALS_MIX = "categorical_mix" # mix int and one-hot CATEGORICALS_ALL_INT = "categoricals_all_integers" @staticmethod def get( required_preprocessing, data_info, machinelearning_task, categorical_strategy=CATEGORICALS_ALL_INT, ): columns_preprocessing = {} columns_info = data_info["columns_info"] for col, preprocessing_needed in columns_info.items(): preprocessing_to_apply = [] # remove empty columns and columns with only one variable if ( "empty_column" in preprocessing_needed or "constant_column" in preprocessing_needed ): preprocessing_to_apply += ["remove_column"] columns_preprocessing[col] = preprocessing_to_apply continue # always check for missing values if ( "missing_values_inputation" in required_preprocessing and "missing_values" in preprocessing_needed ): preprocessing_to_apply += [PreprocessingMissingValues.FILL_NA_MEDIAN] # convert to categorical only for categorical types convert_to_integer_will_be_applied = False if ( "convert_categorical" in required_preprocessing # the algorithm needs converted categoricals and "categorical" in preprocessing_needed # the feature is categorical ): if categorical_strategy == PreprocessingTuner.CATEGORICALS_MIX: if PreprocessingCategorical.MANY_CATEGORIES in preprocessing_needed: preprocessing_to_apply += [ PreprocessingCategorical.CONVERT_INTEGER ] convert_to_integer_will_be_applied = True # maybe scale needed else: preprocessing_to_apply += [ PreprocessingCategorical.CONVERT_ONE_HOT ] else: # all integers preprocessing_to_apply += [PreprocessingCategorical.CONVERT_INTEGER] convert_to_integer_will_be_applied = True # maybe scale needed """ if PreprocessingCategorical.CONVERT_ONE_HOT in preprocessing_needed: preprocessing_to_apply += [PreprocessingCategorical.CONVERT_ONE_HOT] elif PreprocessingCategorical.CONVERT_LOO in preprocessing_needed: preprocessing_to_apply += [PreprocessingCategorical.CONVERT_LOO] convert_to_integer_will_be_applied = True # maybe scale needed else: preprocessing_to_apply += [PreprocessingCategorical.CONVERT_INTEGER] convert_to_integer_will_be_applied = True # maybe scale needed """ if ( "datetime_transform" in required_preprocessing and "datetime_transform" in preprocessing_needed ): preprocessing_to_apply += ["datetime_transform"] if ( "text_transform" in required_preprocessing and "text_transform" in preprocessing_needed ): preprocessing_to_apply += ["text_transform"] if "scale" in required_preprocessing: if ( convert_to_integer_will_be_applied or "scale" in preprocessing_needed ): preprocessing_to_apply += [Scale.SCALE_NORMAL] # remeber which preprocessing we need to apply if preprocessing_to_apply: columns_preprocessing[col] = preprocessing_to_apply target_info = data_info["target_info"] target_preprocessing = [] # always remove missing values from target, # target with missing values might be in the train and in the validation datasets target_preprocessing += [PreprocessingMissingValues.NA_EXCLUDE] if "target_as_integer" in required_preprocessing: if machinelearning_task == BINARY_CLASSIFICATION: if "convert_0_1" in target_info: target_preprocessing += [PreprocessingCategorical.CONVERT_INTEGER] if machinelearning_task == MULTICLASS_CLASSIFICATION: # if PreprocessingUtils.is_categorical(y): # always convert to integer, there can be many situations that can break # for example, classes starting from 1, ... 
# or classes not for every number, for example 0,2,3,4 # just always convert target_preprocessing += [PreprocessingCategorical.CONVERT_INTEGER] elif "target_as_one_hot" in required_preprocessing: target_preprocessing += [PreprocessingCategorical.CONVERT_ONE_HOT] if ( machinelearning_task == REGRESSION and "target_scale" in required_preprocessing ): if "scale_log" in target_info: target_preprocessing += [Scale.SCALE_LOG_AND_NORMAL] elif "scale" in target_info: target_preprocessing += [Scale.SCALE_NORMAL] return { "columns_preprocessing": columns_preprocessing, "target_preprocessing": target_preprocessing, "ml_task": machinelearning_task, } ``` -------------------------------------------------------------------------------- /supervised/algorithms/sklearn.py: -------------------------------------------------------------------------------- ```python import copy import logging import time import warnings import joblib import numpy as np import pandas as pd from supervised.algorithms.algorithm import BaseAlgorithm from supervised.algorithms.registry import ( BINARY_CLASSIFICATION, MULTICLASS_CLASSIFICATION, REGRESSION, ) from supervised.utils.config import LOG_LEVEL logger = logging.getLogger(__name__) logger.setLevel(LOG_LEVEL) class SklearnAlgorithm(BaseAlgorithm): def __init__(self, params): super(SklearnAlgorithm, self).__init__(params) def fit( self, X, y, sample_weight=None, X_validation=None, y_validation=None, sample_weight_validation=None, log_to_file=None, max_time=None, ): with warnings.catch_warnings(): warnings.simplefilter(action="ignore") self.model.fit(X, y, sample_weight=sample_weight) if self.params["ml_task"] != REGRESSION: self.classes_ = np.unique(y) def copy(self): return copy.deepcopy(self) def save(self, model_file_path): logger.debug("SklearnAlgorithm save to {0}".format(model_file_path)) joblib.dump(self.model, model_file_path, compress=True) self.model_file_path = model_file_path def load(self, model_file_path): logger.debug("SklearnAlgorithm loading model from {0}".format(model_file_path)) self.model = joblib.load(model_file_path) self.model_file_path = model_file_path def is_fitted(self): return ( hasattr(self.model, "n_features_in_") and self.model.n_features_in_ is not None and self.model.n_features_in_ > 0 ) def predict(self, X): self.reload() if self.params["ml_task"] == BINARY_CLASSIFICATION: return self.model.predict_proba(X)[:, 1] elif self.params["ml_task"] == MULTICLASS_CLASSIFICATION: return self.model.predict_proba(X) return self.model.predict(X) from supervised.utils.metric import Metric def predict_proba_function_binary(estimator, X): return estimator.predict_proba(X)[:, 1] def predict_proba_function_multiclass(estimator, X): return estimator.predict_proba(X) class SklearnTreesEnsembleClassifierAlgorithm(SklearnAlgorithm): def __init__(self, params): super(SklearnTreesEnsembleClassifierAlgorithm, self).__init__(params) self.log_metric = Metric( {"name": self.params.get("eval_metric_name", "logloss")} ) self.max_iters = ( 1 # max iters is used by model_framework, max_steps is used internally ) if params.get("ml_task") == BINARY_CLASSIFICATION: self.predict_function = predict_proba_function_binary else: self.predict_function = predict_proba_function_multiclass def fit( self, X, y, sample_weight=None, X_validation=None, y_validation=None, sample_weight_validation=None, log_to_file=None, max_time=None, ): max_steps = self.max_steps n_estimators = 0 min_val = 10e12 min_e = 0 p_tr, p_vd = None, None result = {"iteration": [], "train": [], "validation": []} 
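# grow the ensemble in chunks of trees_in_step estimators, scoring train and validation after every added tree, so the loop can stop early once the validation score stops improving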
start_time = time.time() with warnings.catch_warnings(): warnings.simplefilter(action="ignore") for i in range(max_steps): self.model.fit(X, np.ravel(y), sample_weight=sample_weight) self.model.n_estimators += self.trees_in_step if X_validation is None or y_validation is None: continue estimators = self.model.estimators_ stop = False for e in range(n_estimators, len(estimators)): p = self.predict_function(estimators[e], X) if p_tr is None: p_tr = p else: p_tr += p p = self.predict_function(estimators[e], X_validation) if p_vd is None: p_vd = p else: p_vd += p tr = self.log_metric( y, p_tr / float(e + 1), sample_weight=sample_weight ) vd = self.log_metric( y_validation, p_vd / float(e + 1), sample_weight=sample_weight_validation, ) if vd < min_val: # optimize direction min_val = vd min_e = e if e - min_e >= self.early_stopping_rounds: stop = True break result["iteration"] += [e] result["train"] += [tr] result["validation"] += [vd] # disable for now ... # if max_time is not None and time.time()-start_time > max_time: # stop = True if stop: self.model.estimators_ = estimators[: (min_e + 1)] break n_estimators = len(estimators) if log_to_file is not None: df_result = pd.DataFrame(result) if self.log_metric.is_negative(): df_result["train"] *= -1.0 df_result["validation"] *= -1.0 df_result.to_csv(log_to_file, index=False, header=False) self.classes_ = np.unique(y) def get_metric_name(self): return self.params.get("eval_metric_name", "logloss") def predict_function(estimator, X): return estimator.predict(X) class SklearnTreesEnsembleRegressorAlgorithm(SklearnTreesEnsembleClassifierAlgorithm): def __init__(self, params): super(SklearnTreesEnsembleRegressorAlgorithm, self).__init__(params) self.log_metric = Metric({"name": self.params.get("eval_metric_name", "rmse")}) self.predict_function = predict_function def get_metric_name(self): return self.params.get("eval_metric_name", "rmse") ``` -------------------------------------------------------------------------------- /tests/tests_preprocessing/test_preprocessing_missing.py: -------------------------------------------------------------------------------- ```python import unittest import numpy as np import pandas as pd from supervised.preprocessing.preprocessing_missing import PreprocessingMissingValues class PreprocessingMissingValuesTest(unittest.TestCase): def test_preprocessing_constructor(self): """ Check if PreprocessingMissingValues object is properly initialized """ preprocess_missing = PreprocessingMissingValues( PreprocessingMissingValues.FILL_NA_MEDIAN ) self.assertEqual( preprocess_missing._na_fill_method, PreprocessingMissingValues.FILL_NA_MEDIAN, ) self.assertEqual(preprocess_missing._na_fill_params, {}) def test_get_fill_value(self): """ Check if correct value is returned for filling in case of different column type and fill method """ d = {"col1": [1, 2, 3, np.nan, np.nan], "col2": ["a", "a", np.nan, "b", "c"]} df = pd.DataFrame(data=d) # fill with median preprocess_missing = PreprocessingMissingValues( df.columns, PreprocessingMissingValues.FILL_NA_MEDIAN ) self.assertEqual(preprocess_missing._get_fill_value(df["col1"]), 2) self.assertEqual(preprocess_missing._get_fill_value(df["col2"]), "a") # fill with mean preprocess_missing = PreprocessingMissingValues( df.columns, PreprocessingMissingValues.FILL_NA_MEDIAN ) self.assertEqual(preprocess_missing._get_fill_value(df["col1"]), 2) self.assertEqual(preprocess_missing._get_fill_value(df["col2"]), "a") # fill with min preprocess_missing = PreprocessingMissingValues( df.columns, 
PreprocessingMissingValues.FILL_NA_MIN ) self.assertEqual(preprocess_missing._get_fill_value(df["col1"]), 0) self.assertEqual( preprocess_missing._get_fill_value(df["col2"]), "_missing_value_" ) # added new value def test_fit_na_fill(self): """ Check fit private method """ d = { "col1": [1, 2, 3, np.nan, np.nan], "col2": ["a", "a", np.nan, "b", "c"], "col3": ["a", "a", "d", "b", "c"], } df = pd.DataFrame(data=d) # fill with median preprocess_missing = PreprocessingMissingValues( df.columns, PreprocessingMissingValues.FILL_NA_MEDIAN ) preprocess_missing._fit_na_fill(df) self.assertTrue("col1" in preprocess_missing._na_fill_params) self.assertTrue("col2" in preprocess_missing._na_fill_params) self.assertTrue("col3" not in preprocess_missing._na_fill_params) self.assertEqual(2, preprocess_missing._na_fill_params["col1"]) self.assertEqual("a", preprocess_missing._na_fill_params["col2"]) # fill with mean preprocess_missing = PreprocessingMissingValues( df.columns, PreprocessingMissingValues.FILL_NA_MEAN ) preprocess_missing._fit_na_fill(df) self.assertTrue("col1" in preprocess_missing._na_fill_params) self.assertTrue("col2" in preprocess_missing._na_fill_params) self.assertTrue("col3" not in preprocess_missing._na_fill_params) self.assertEqual(2, preprocess_missing._na_fill_params["col1"]) self.assertEqual("a", preprocess_missing._na_fill_params["col2"]) # fill with min preprocess_missing = PreprocessingMissingValues( df.columns, PreprocessingMissingValues.FILL_NA_MIN ) preprocess_missing._fit_na_fill(df) self.assertTrue("col1" in preprocess_missing._na_fill_params) self.assertTrue("col2" in preprocess_missing._na_fill_params) self.assertTrue("col3" not in preprocess_missing._na_fill_params) self.assertEqual(0, preprocess_missing._na_fill_params["col1"]) self.assertEqual("_missing_value_", preprocess_missing._na_fill_params["col2"]) def test_transform(self): """ Check transform """ # training data d = { "col1": [1, 2, 3, np.nan, np.nan], "col2": ["a", "a", np.nan, "a", "c"], "col3": [1, 1, 3, 1, 1], "col4": ["a", "a", "a", "c", "a"], } df = pd.DataFrame(data=d) # test data d_test = { "col1": [1, 2, 3, np.nan, np.nan], "col2": ["b", "b", np.nan, "b", "c"], "col3": [1, 2, 2, np.nan, 2], "col4": ["b", "b", np.nan, "b", "c"], } df_test = pd.DataFrame(data=d_test) # fill with median preprocess_missing = PreprocessingMissingValues( df.columns, PreprocessingMissingValues.FILL_NA_MEDIAN ) preprocess_missing.fit(df) self.assertEqual( 2, len(preprocess_missing._na_fill_params) ) # there should be only two columns df_transformed = preprocess_missing.transform(df_test) self.assertTrue( np.isnan(df.loc[3, "col1"]) ) # training data frame is not filled self.assertEqual( 2, df_test.loc[3, "col1"] ) # data frame is filled after transform self.assertEqual("a", df_test.loc[2, "col2"]) # it is disabled, should be treated separately at the end of preprocessing # columns without missing values in training set are also filled # but they are filled based on their own values # self.assertEqual(2, df_test.loc[3, "col3"]) # self.assertEqual("b", df_test.loc[3, "col4"]) def test_transform_on_new_data(self): # training data d = { "col1": [1, 1, np.nan, 3], "col2": ["a", "a", np.nan, "a"], "col3": [1, 1, 1, 3], "col4": ["a", "a", "b", "c"], "y": [0, 1, 1, 1], } df = pd.DataFrame(data=d) X_train = df.loc[:, ["col1", "col2", "col3", "col4"]] y_train = df.loc[:, "y"] d_test = { "col1": [1, 1, np.nan, 3], "col2": ["a", "a", np.nan, "a"], "col3": [1, 1, 1, 3], "col4": ["a", "a", "b", "c"], "y": [np.nan, 1, np.nan, 1], } df_test 
= pd.DataFrame(data=d_test) X_test = df_test.loc[:, ["col1", "col2", "col3", "col4"]] y_test = df_test.loc[:, "y"] pm = PreprocessingMissingValues( X_train.columns, PreprocessingMissingValues.FILL_NA_MEDIAN ) pm.fit(X_train) X_train = pm.transform(X_train) X_test = pm.transform(X_test) self.assertEqual(1, X_test.loc[2, "col1"]) self.assertEqual("a", X_test.loc[2, "col2"]) if __name__ == "__main__": unittest.main() ``` -------------------------------------------------------------------------------- /tests/tests_preprocessing/test_categorical_integers.py: -------------------------------------------------------------------------------- ```python import unittest import pandas as pd from supervised.preprocessing.preprocessing_categorical import PreprocessingCategorical import warnings class CategoricalIntegersTest(unittest.TestCase): def test_constructor_preprocessing_categorical(self): """ Check if PreprocessingCategorical object is properly initialized """ categorical = PreprocessingCategorical( [], PreprocessingCategorical.CONVERT_INTEGER ) self.assertEqual( categorical._convert_method, PreprocessingCategorical.CONVERT_INTEGER ) self.assertEqual(categorical._convert_params, {}) def test_fit_integers(self): # training data d = { "col1": [1, 2, 3], "col2": ["a", "a", "c"], "col3": [1, 1, 3], "col4": ["a", "b", "c"], } df = pd.DataFrame(data=d) categorical = PreprocessingCategorical( df.columns, PreprocessingCategorical.CONVERT_INTEGER ) categorical.fit(df) self.assertTrue("col2" in categorical._convert_params) self.assertTrue("col4" in categorical._convert_params) self.assertTrue("a" in categorical._convert_params["col2"]) self.assertTrue("c" in categorical._convert_params["col2"]) self.assertTrue("b" not in categorical._convert_params["col2"]) self.assertTrue("a" in categorical._convert_params["col4"]) self.assertTrue("b" in categorical._convert_params["col4"]) self.assertTrue("c" in categorical._convert_params["col4"]) def test_fit_transform_integers(self): # training data d = { "col1": [1, 2, 3], "col2": ["a", "a", "c"], "col3": [1, 1, 3], "col4": ["a", "b", "c"], } df = pd.DataFrame(data=d) categorical = PreprocessingCategorical( df.columns, PreprocessingCategorical.CONVERT_INTEGER ) categorical.fit(df) df = categorical.transform(df) for col in ["col1", "col2", "col3", "col4"]: self.assertTrue(col in df.columns) self.assertEqual(df["col2"][0], 0) self.assertEqual(df["col2"][1], 0) self.assertEqual(df["col2"][2], 1) self.assertEqual(df["col4"][0], 0) self.assertEqual(df["col4"][1], 1) self.assertEqual(df["col4"][2], 2) def test_future_warning_pandas_transform(self): with warnings.catch_warnings(): warnings.simplefilter("error") # training data d = { "col1": [False, True, True], "col2": [False, False, True], "col3": [True, False, True], } df = pd.DataFrame(data=d) categorical = PreprocessingCategorical( df.columns, PreprocessingCategorical.CONVERT_INTEGER ) categorical.fit(df) df = categorical.transform(df).astype(int) def test_future_warning_pandas_inverse_transform(self): with warnings.catch_warnings(): warnings.simplefilter("error") # training data d = { "col1": [False, True, True], "col2": [False, False, True], "col3": [True, False, True], } df = pd.DataFrame(data=d) categorical = PreprocessingCategorical( df.columns, PreprocessingCategorical.CONVERT_INTEGER ) categorical.fit(df) df = categorical.transform(df).astype(int) df = categorical.inverse_transform(df) def test_fit_transform_inverse_transform_integers(self): # training data d = { "col1": [1, 2, 3], "col2": ["a", "a", "c"], 
"col3": [1, 1, 3], "col4": ["a", "b", "c"], } df = pd.DataFrame(data=d) categorical = PreprocessingCategorical( df.columns, PreprocessingCategorical.CONVERT_INTEGER ) categorical.fit(df) df_transform = categorical.transform(df).astype(int) df_inverse = categorical.inverse_transform(df_transform) for col in ["col1", "col2", "col3", "col4"]: self.assertTrue(col in df_inverse.columns) self.assertEqual(d["col2"][0], df_inverse["col2"][0]) self.assertEqual(d["col2"][1], df_inverse["col2"][1]) self.assertEqual(d["col2"][2], df_inverse["col2"][2]) self.assertEqual(d["col4"][0], df_inverse["col4"][0]) self.assertEqual(d["col4"][1], df_inverse["col4"][1]) self.assertEqual(d["col4"][2], df_inverse["col4"][2]) def test_fit_transform_integers_with_new_values(self): # training data d_train = { "col1": [1, 2, 3], "col2": ["a", "a", "c"], "col3": [1, 1, 3], "col4": ["a", "b", "c"], } df_train = pd.DataFrame(data=d_train) categorical = PreprocessingCategorical( df_train.columns, PreprocessingCategorical.CONVERT_INTEGER ) categorical.fit(df_train) # testing data d = { "col1": [1, 2, 3], "col2": ["a", "d", "f"], "col3": [1, 1, 3], "col4": ["e", "b", "z"], } df = pd.DataFrame(data=d) df = categorical.transform(df) for col in ["col1", "col2", "col3", "col4"]: self.assertTrue(col in df.columns) self.assertEqual(df["col2"][0], 0) self.assertEqual(df["col2"][1], 2) # new values get higher indexes self.assertEqual(df["col2"][2], 3) # new values get higher indexes self.assertEqual(df["col4"][0], 3) # new values get higher indexes self.assertEqual(df["col4"][1], 1) self.assertEqual(df["col4"][2], 4) # new values get higher indexes def test_to_and_from_json_convert_integers(self): # training data d = { "col1": [1, 2, 3], "col2": ["a", "a", "c"], "col3": [1, 1, 3], "col4": ["a", "b", "c"], } df = pd.DataFrame(data=d) cat1 = PreprocessingCategorical( df.columns, PreprocessingCategorical.CONVERT_INTEGER ) cat1.fit(df) cat2 = PreprocessingCategorical( df.columns, PreprocessingCategorical.CONVERT_INTEGER ) cat2.from_json(cat1.to_json()) df = cat2.transform(df) for col in ["col1", "col2", "col3", "col4"]: self.assertTrue(col in df.columns) self.assertEqual(df["col2"][0], 0) self.assertEqual(df["col2"][1], 0) self.assertEqual(df["col2"][2], 1) self.assertEqual(df["col4"][0], 0) self.assertEqual(df["col4"][1], 1) self.assertEqual(df["col4"][2], 2) if __name__ == "__main__": unittest.main() ``` -------------------------------------------------------------------------------- /tests/tests_validation/test_validator_kfold.py: -------------------------------------------------------------------------------- ```python import os import tempfile import unittest import pytest import numpy as np import pandas as pd from supervised.utils.utils import dump_data from supervised.validation.validator_kfold import KFoldValidator class KFoldValidatorTest(unittest.TestCase): def test_create(self): with tempfile.TemporaryDirectory() as results_path: data = { "X": pd.DataFrame( np.array([[0, 0], [0, 1], [1, 0], [1, 1]]), columns=["a", "b"] ), "y": pd.DataFrame(np.array([0, 0, 1, 1]), columns=["target"]), } X_path = os.path.join(results_path, "X.data") y_path = os.path.join(results_path, "y.data") dump_data(X_path, data["X"]) dump_data(y_path, data["y"]) params = { "shuffle": False, "stratify": True, "k_folds": 2, "results_path": results_path, "X_path": X_path, "y_path": y_path, } vl = KFoldValidator(params) self.assertEqual(params["k_folds"], vl.get_n_splits()) # for train, validation in vl.split(): for k_fold in range(vl.get_n_splits()): train, 
validation = vl.get_split(k_fold) X_train, y_train = train.get("X"), train.get("y") X_validation, y_validation = validation.get("X"), validation.get("y") self.assertEqual(X_train.shape[0], 2) self.assertEqual(y_train.shape[0], 2) self.assertEqual(X_validation.shape[0], 2) self.assertEqual(y_validation.shape[0], 2) def test_missing_target_values(self): with tempfile.TemporaryDirectory() as results_path: data = { "X": pd.DataFrame( np.array([[1, 0], [2, 1], [3, 0], [4, 1], [5, 1], [6, 1]]), columns=["a", "b"], ), "y": pd.DataFrame( np.array(["a", "b", "a", "b", np.nan, np.nan]), columns=["target"] ), } X_path = os.path.join(results_path, "X.data") y_path = os.path.join(results_path, "y.data") dump_data(X_path, data["X"]) dump_data(y_path, data["y"]) params = { "shuffle": False, "stratify": True, "k_folds": 2, "results_path": results_path, "X_path": X_path, "y_path": y_path, } vl = KFoldValidator(params) self.assertEqual(params["k_folds"], vl.get_n_splits()) for k_fold in range(vl.get_n_splits()): train, validation = vl.get_split(k_fold) X_train, y_train = train.get("X"), train.get("y") X_validation, y_validation = validation.get("X"), validation.get("y") self.assertEqual(X_train.shape[0], 3) self.assertEqual(y_train.shape[0], 3) self.assertEqual(X_validation.shape[0], 3) self.assertEqual(y_validation.shape[0], 3) def test_create_with_target_as_labels(self): with tempfile.TemporaryDirectory() as results_path: data = { "X": pd.DataFrame( np.array([[0, 0], [0, 1], [1, 0], [1, 1]]), columns=["a", "b"] ), "y": pd.DataFrame(np.array(["a", "b", "a", "b"]), columns=["target"]), } X_path = os.path.join(results_path, "X.data") y_path = os.path.join(results_path, "y.data") dump_data(X_path, data["X"]) dump_data(y_path, data["y"]) params = { "shuffle": True, "stratify": True, "k_folds": 2, "results_path": results_path, "X_path": X_path, "y_path": y_path, } vl = KFoldValidator(params) self.assertEqual(params["k_folds"], vl.get_n_splits()) for k_fold in range(vl.get_n_splits()): train, validation = vl.get_split(k_fold) X_train, y_train = train.get("X"), train.get("y") X_validation, y_validation = validation.get("X"), validation.get("y") self.assertEqual(X_train.shape[0], 2) self.assertEqual(y_train.shape[0], 2) self.assertEqual(X_validation.shape[0], 2) self.assertEqual(y_validation.shape[0], 2) def test_repeats(self): with tempfile.TemporaryDirectory() as results_path: data = { "X": pd.DataFrame( np.array([[0, 0], [0, 1], [1, 0], [1, 1]]), columns=["a", "b"] ), "y": pd.DataFrame(np.array([0, 0, 1, 1]), columns=["target"]), } X_path = os.path.join(results_path, "X.data") y_path = os.path.join(results_path, "y.data") dump_data(X_path, data["X"]) dump_data(y_path, data["y"]) params = { "shuffle": True, "stratify": False, "k_folds": 2, "repeats": 10, "results_path": results_path, "X_path": X_path, "y_path": y_path, "random_seed": 1, } vl = KFoldValidator(params) self.assertEqual(params["k_folds"], vl.get_n_splits()) self.assertEqual(params["repeats"], vl.get_repeats()) for repeat in range(vl.get_repeats()): for k_fold in range(vl.get_n_splits()): train, validation = vl.get_split(k_fold, repeat) X_train, y_train = train.get("X"), train.get("y") X_validation, y_validation = validation.get("X"), validation.get( "y" ) self.assertEqual(X_train.shape[0], 2) self.assertEqual(y_train.shape[0], 2) self.assertEqual(X_validation.shape[0], 2) self.assertEqual(y_validation.shape[0], 2) def test_disable_repeats_when_disabled_shuffle(self): with tempfile.TemporaryDirectory() as results_path: data = { "X": pd.DataFrame( 
np.array([[0, 0], [0, 1], [1, 0], [1, 1]]), columns=["a", "b"] ), "y": pd.DataFrame(np.array([0, 0, 1, 1]), columns=["target"]), } X_path = os.path.join(results_path, "X.data") y_path = os.path.join(results_path, "y.data") dump_data(X_path, data["X"]) dump_data(y_path, data["y"]) params = { "shuffle": False, "stratify": False, "k_folds": 2, "repeats": 10, "results_path": results_path, "X_path": X_path, "y_path": y_path, "random_seed": 1, } with pytest.warns( expected_warning=UserWarning, match="Disable repeats in validation because shuffle is disabled", ) as record: vl = KFoldValidator(params) # check that only one warning was raised self.assertEqual(len(record), 1) self.assertEqual(params["k_folds"], vl.get_n_splits()) self.assertEqual(1, vl.get_repeats()) ``` -------------------------------------------------------------------------------- /tests/tests_validation/test_validator_split.py: -------------------------------------------------------------------------------- ```python import os import tempfile import unittest import pytest import numpy as np import pandas as pd from supervised.utils.utils import dump_data from supervised.validation.validator_split import SplitValidator class SplitValidatorTest(unittest.TestCase): def test_create(self): with tempfile.TemporaryDirectory() as results_path: data = { "X": pd.DataFrame( np.array( [[0, 0], [0, 1], [1, 0], [0, 1], [1, 0], [0, 1], [1, 0], [1, 1]] ), columns=["a", "b"], ), "y": pd.DataFrame( np.array([0, 0, 1, 0, 1, 0, 1, 1]), columns=["target"] ), } X_path = os.path.join(results_path, "X.data") y_path = os.path.join(results_path, "y.data") dump_data(X_path, data["X"]) dump_data(y_path, data["y"]) params = { "shuffle": False, "stratify": False, "train_ratio": 0.5, "results_path": results_path, "X_path": X_path, "y_path": y_path, } vl = SplitValidator(params) self.assertEqual(1, vl.get_n_splits()) # for train, validation in vl.split(): for k_fold in range(vl.get_n_splits()): train, validation = vl.get_split(k_fold) X_train, y_train = train.get("X"), train.get("y") X_validation, y_validation = validation.get("X"), validation.get("y") self.assertEqual(X_train.shape[0], 4) self.assertEqual(y_train.shape[0], 4) self.assertEqual(X_validation.shape[0], 4) self.assertEqual(y_validation.shape[0], 4) def test_missing_target_values(self): with tempfile.TemporaryDirectory() as results_path: data = { "X": pd.DataFrame( np.array([[1, 0], [2, 1], [3, 0], [4, 1], [5, 1], [6, 1]]), columns=["a", "b"], ), "y": pd.DataFrame( np.array(["a", "b", np.nan, "a", "b", np.nan]), columns=["target"] ), } X_path = os.path.join(results_path, "X.data") y_path = os.path.join(results_path, "y.data") dump_data(X_path, data["X"]) dump_data(y_path, data["y"]) params = { "shuffle": False, "stratify": False, "train_ratio": 0.5, "results_path": results_path, "X_path": X_path, "y_path": y_path, } vl = SplitValidator(params) self.assertEqual(1, vl.get_n_splits()) for k_fold in range(vl.get_n_splits()): train, validation = vl.get_split(k_fold) X_train, y_train = train.get("X"), train.get("y") X_validation, y_validation = validation.get("X"), validation.get("y") self.assertEqual(X_train.shape[0], 3) self.assertEqual(y_train.shape[0], 3) self.assertEqual(X_validation.shape[0], 3) self.assertEqual(y_validation.shape[0], 3) def test_create_with_target_as_labels(self): with tempfile.TemporaryDirectory() as results_path: data = { "X": pd.DataFrame( np.array([[0, 0], [0, 1], [1, 0], [1, 1]]), columns=["a", "b"] ), "y": pd.DataFrame(np.array(["a", "b", "a", "b"]), columns=["target"]), } 
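# the validator only receives file paths, so the frames are dumped to disk first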
X_path = os.path.join(results_path, "X.data") y_path = os.path.join(results_path, "y.data") dump_data(X_path, data["X"]) dump_data(y_path, data["y"]) params = { "shuffle": True, "stratify": True, "train_ratio": 0.5, "results_path": results_path, "X_path": X_path, "y_path": y_path, } vl = SplitValidator(params) self.assertEqual(1, vl.get_n_splits()) for k_fold in range(vl.get_n_splits()): train, validation = vl.get_split(k_fold) X_train, y_train = train.get("X"), train.get("y") X_validation, y_validation = validation.get("X"), validation.get("y") self.assertEqual(X_train.shape[0], 2) self.assertEqual(y_train.shape[0], 2) self.assertEqual(X_validation.shape[0], 2) self.assertEqual(y_validation.shape[0], 2) def test_repeats(self): with tempfile.TemporaryDirectory() as results_path: data = { "X": pd.DataFrame( np.array( [[0, 0], [0, 1], [1, 0], [0, 1], [1, 0], [0, 1], [1, 0], [1, 1]] ), columns=["a", "b"], ), "y": pd.DataFrame( np.array([0, 0, 1, 0, 1, 0, 1, 1]), columns=["target"] ), } X_path = os.path.join(results_path, "X.data") y_path = os.path.join(results_path, "y.data") dump_data(X_path, data["X"]) dump_data(y_path, data["y"]) params = { "shuffle": True, "stratify": False, "train_ratio": 0.5, "results_path": results_path, "X_path": X_path, "y_path": y_path, "repeats": 3, } vl = SplitValidator(params) self.assertEqual(1, vl.get_n_splits()) self.assertEqual(3, vl.get_repeats()) cnt = 0 for repeat in range(vl.get_repeats()): for k_fold in range(vl.get_n_splits()): train, validation = vl.get_split(k_fold, repeat) X_train, y_train = train.get("X"), train.get("y") X_validation, y_validation = validation.get("X"), validation.get( "y" ) self.assertEqual(X_train.shape[0], 4) self.assertEqual(y_train.shape[0], 4) self.assertEqual(X_validation.shape[0], 4) self.assertEqual(y_validation.shape[0], 4) cnt += 1 self.assertEqual(cnt, 3) def test_disable_repeats_when_disabled_shuffle(self): with tempfile.TemporaryDirectory() as results_path: data = { "X": pd.DataFrame( np.array( [[0, 0], [0, 1], [1, 0], [0, 1], [1, 0], [0, 1], [1, 0], [1, 1]] ), columns=["a", "b"], ), "y": pd.DataFrame( np.array([0, 0, 1, 0, 1, 0, 1, 1]), columns=["target"] ), } X_path = os.path.join(results_path, "X.data") y_path = os.path.join(results_path, "y.data") dump_data(X_path, data["X"]) dump_data(y_path, data["y"]) params = { "shuffle": False, "stratify": False, "train_ratio": 0.5, "results_path": results_path, "X_path": X_path, "y_path": y_path, "repeats": 3, } with pytest.warns( expected_warning=UserWarning, match="Disable repeats in validation because shuffle is disabled", ) as record: vl = SplitValidator(params) # check that only one warning was raised self.assertEqual(len(record), 1) self.assertEqual(1, vl.get_n_splits()) self.assertEqual(1, vl.get_repeats()) ```
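The validator tests above all follow the same pattern used by the AutoML pipeline: the frames are written to disk with `dump_data` and the validator is configured with file paths plus split options. A minimal usage sketch of that flow, based only on the parameters exercised in these tests (the toy data below is made up for illustration):

```python
import os
import tempfile

import numpy as np
import pandas as pd

from supervised.utils.utils import dump_data
from supervised.validation.validator_split import SplitValidator

with tempfile.TemporaryDirectory() as results_path:
    # toy data; the tests always name the target column "target"
    X = pd.DataFrame(np.random.rand(10, 2), columns=["a", "b"])
    y = pd.DataFrame(np.array([0, 1] * 5), columns=["target"])

    # the validator reads data from disk, so persist X and y first
    X_path = os.path.join(results_path, "X.data")
    y_path = os.path.join(results_path, "y.data")
    dump_data(X_path, X)
    dump_data(y_path, y)

    validator = SplitValidator(
        {
            "shuffle": True,
            "stratify": True,
            "train_ratio": 0.5,
            "results_path": results_path,
            "X_path": X_path,
            "y_path": y_path,
        }
    )

    # a single train/validation split; each part is a dict with "X" and "y"
    train, validation = validator.get_split(0)
    print(train["X"].shape, validation["X"].shape)
```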