diff --git a/h1st/model/xgboost/xgbclassifier.py b/h1st/model/ml/xgboost/classifier.py similarity index 59% rename from h1st/model/xgboost/xgbclassifier.py rename to h1st/model/ml/xgboost/classifier.py index afdcff07..16b50283 100644 --- a/h1st/model/xgboost/xgbclassifier.py +++ b/h1st/model/ml/xgboost/classifier.py @@ -1,4 +1,5 @@ -from h1st.model.xgboost.model import XGBRegressionModel +from h1st.model.ml.xgboost.regression import XGBRegressionModel, XGBRegressionModeler + class XGBClassifierModel(XGBRegressionModel): name = 'XGBClassifierModel' @@ -14,3 +15,10 @@ def apply_threshold(self, x): def set_threshold(self, threshold: float): self.stats['threshold'] = threshold + +class XGBClassifierModeler(XGBRegressionModeler): + model_class = XGBClassifierModel + + def __init__(self, threshold=0.5, **kwargs): + super().__init__(**kwargs) + self.stats['threshold'] = float(threshold) diff --git a/h1st/model/ml/xgboost/regression.py b/h1st/model/ml/xgboost/regression.py new file mode 100644 index 00000000..92695e3d --- /dev/null +++ b/h1st/model/ml/xgboost/regression.py @@ -0,0 +1,401 @@ +import pandas as pd +import numpy as np +import pytz +from loguru import logger +from typing import Any, Dict + +from datetime import datetime +from sklearn.preprocessing import StandardScaler +from xgboost import XGBRegressor + +from h1st.model.model import Model +from h1st.model.ml.xgboost.utils import extratree_rank_features, evaluate_regression_base_model + + +class XGBRegressionModel(Model): + + input_key = 'X' + output_key = 'predictions' + name = 'XGBRegressionModel' + + def __init__( + self, + result_key: str = 'result', + max_features: int = 50, + eta: float = 0.001, + n_estimators: int = 5, + max_depth: int = 3, + debug: bool = False, + ) -> None: + + super().__init__() + self.stats = { + 'result_key': result_key, + 'max_features': int(max_features), + 'eta': eta, + 'n_estimators': int(n_estimators), + 'max_depth': int(max_depth), + 'debug': debug, + } + + def predict(self, input_data: dict) -> dict: + X = input_data[self.input_key] + output_col = self.stats['result_key'] + results = {} + # Saving prediction time + now = pytz.UTC.localize(datetime.utcnow()) + results['prediction_time'] = now.isoformat() + # Scaling the input data + scaler = self.stats['scaling_model'] + features = self.stats['scaled_features'] + selected_features = self.stats['selected_features'] + X_prime = pd.DataFrame(scaler.transform(X[features]), columns=features) + X_prime = X_prime[selected_features] + + # Model Prediction + pred = self.base_model.predict(X_prime) + results[self.output_key] = pd.DataFrame( + pred, columns=[output_col], index=X.index + ) + return results + + + # TRAINING MODEL + def prepare_data(self, prepared_data: dict): + result_key = self.stats['result_key'] + + # NaN/Inf should be handled in preprocessing but just in case + X_train = prepared_data['X_train'].dropna() + y_train = prepared_data['y_train'].loc[X_train.index] + if 'X_test' in prepared_data: + X_test = prepared_data['X_test'].dropna() + y_test = prepared_data['y_test'].loc[X_test.index] + else: + X_test = None + y_test = None + + if result_key is None: + result_key = y_train.columns[0] + self.stats['result_key'] = result_key + + if isinstance(y_train, pd.DataFrame) and result_key in y_train.columns: + y_train = prepared_data['y_train'][result_key] + if y_test is not None: + y_test = prepared_data['y_test'][result_key] + elif not isinstance(y_train, (pd.Series, list, np.ndarray)): + raise ValueError( + 'y_train and y_test must be a DataFrame with ' + 'relevant column specified via result_key or ' + '1-D Array-like' + ) + + fit_data = {'X_train': X_train, 'y_train': y_train} + if X_test is not None: + fit_data['X_test'] = X_test + fit_data['y_test'] = y_test + + return fit_data + + def train_model(self, input_data: dict): + """ + This function can be used to build and train XGBRegression model. + It also performs gridsearch which helps us to get optimal model + parameters based on Mean Absolute Error. + + prepared_data requires keys: X_train, y_train, X_test, y_test + """ + prepared_data = self.prepare_data(input_data) + X_train = prepared_data['X_train'] + y_train = prepared_data['y_train'] + if 'X_test' in prepared_data: + X_test = prepared_data['X_test'] + y_test = prepared_data['y_test'] + else: + X_test = None + y_test = None + + result_key = self.stats['result_key'] + max_features = self.stats['max_features'] + logger.info(f'Fitting model {self.name} for {result_key}') + + self.stats['scaled_features'] = X_train.columns + sc_scaler = StandardScaler() + X_train = pd.DataFrame( + sc_scaler.fit_transform(X_train), + columns=X_train.columns, + index=X_train.index, + ) + if X_test is not None: + X_test = pd.DataFrame( + sc_scaler.transform(X_test), + columns=X_test.columns, + index=X_test.index, + ) + + fit_data = { + 'X_train': X_train, + 'y_train': y_train, + } + + ranked_features, feature_importance = extratree_rank_features( + fit_data['X_train'], fit_data['y_train'].values + ) + + # Keep the top N features + features = ranked_features[:max_features] + + self.stats.update( + { + 'ranked_features': ranked_features, + 'feature_importance': feature_importance, + 'selected_features': features, + 'scaling_model': sc_scaler, + } + ) + + fit_data['X_train'] = fit_data['X_train'][features] + + max_depth = self.stats['max_depth'] + eta = self.stats['eta'] + n_estimators = self.stats['n_estimators'] + + # Model Initialization using the above best parameters + model = XGBRegressor( + max_depth=max_depth, + n_estimators=n_estimators, + eta=eta, + seed=42, + verbosity=0, + ) + # Model Training + model.fit(fit_data['X_train'], fit_data['y_train']) + + # Calculating Model stats + self.stats.update( + { + 'total_training_points': fit_data['X_train'].shape[0], + } + ) + self.stats['input_features'] = features + return model + + def evaluate_model(self, input_data, trained_model): + """Calculate metrics""" + fit_data = self.prepare_data(input_data) + return evaluate_regression_base_model( + fit_data, + trained_model, + features=trained_model.stats['selected_features'], + ) + + def train(self, data: Dict[str, Any] = None) -> Model: + """ + Implement logic to create the corresponding MLModel, including both training and evaluation. + """ + + ml_model = self.train_model(data) + # Pass stats to the model + if self.stats is not None: + ml_model.stats = self.stats.copy() + # Compute metrics and pass to the model + ml_model.metrics = self.evaluate_model(data, ml_model) + return ml_model + + +class XGBRegressionModeler: + model_class = XGBRegressionModel + + def __init__( + self, + result_key: str = 'result', + max_features: int = 50, + eta: float = 0.001, + n_estimators: int = 5, + max_depth: int = 3, + debug: bool = False, + ) -> None: + super().__init__() + self.stats = { + 'result_key': result_key, + 'max_features': int(max_features), + 'eta': eta, + 'n_estimators': int(n_estimators), + 'max_depth': int(max_depth), + 'debug': debug, + } + + def train_base_model(self, input_data: dict) -> XGBRegressor: + """ + This function can be used to build and train XGBRegression model. + It also performs gridsearch which helps us to get optimal model + parameters based on Mean Absolute Error. + + prepared_data requires keys: X_train, y_train, X_test, y_test + """ + prepared_data = self.prepare_data(input_data) + X_train = prepared_data['X_train'] + y_train = prepared_data['y_train'] + if 'X_test' in prepared_data: + X_test = prepared_data['X_test'] + y_test = prepared_data['y_test'] + else: + X_test = None + y_test = None + + result_key = self.stats['result_key'] + max_features = self.stats['max_features'] + logger.info(f'Fitting model {self.model_class.name} for {result_key}') + + self.stats['scaled_features'] = X_train.columns + sc_scaler = StandardScaler() + X_train = pd.DataFrame( + sc_scaler.fit_transform(X_train), + columns=X_train.columns, + index=X_train.index, + ) + if X_test is not None: + X_test = pd.DataFrame( + sc_scaler.transform(X_test), + columns=X_test.columns, + index=X_test.index, + ) + + fit_data = { + 'X_train': X_train, + 'y_train': y_train, + } + + ranked_features, feature_importance = extratree_rank_features( + fit_data['X_train'], fit_data['y_train'].values + ) + + # Keep the top N features + features = ranked_features[:max_features] + + self.stats.update( + { + 'ranked_features': ranked_features, + 'feature_importance': feature_importance, + 'selected_features': features, + 'scaling_model': sc_scaler, + } + ) + + fit_data['X_train'] = fit_data['X_train'][features] + + max_depth = self.stats['max_depth'] + eta = self.stats['eta'] + n_estimators = self.stats['n_estimators'] + # if X_test is not None: + # fit_data['X_test'] = X_test[features] + # fit_data['y_test'] = y_test + # logger.info( + # 'Found test data, grid searching to ' 'optimize hyperparameters.' + # ) + # hyperparams = xgb_grid_search( + # fit_data, + # debug=self.stats['debug'], + # max_depth=max_depth, + # n_estimators=n_estimators, + # eta=eta, + # ) + # max_depth, n_estimators, eta = hyperparams + # logger.info( + # f'Best hyperparmeters found:\n' + # f'n_estimators: {n_estimators}\n' + # f'max_depth: {max_depth}\n' + # f'eta: {eta}\n' + # f'Replacing passed hyperparameters.' + # ) + # self.stats.update( + # {'max_depth': max_depth, 'n_estimators': n_estimators, 'eta': eta} + # ) + + # Model Initialization using the above best parameters + model = XGBRegressor( + max_depth=max_depth, + n_estimators=n_estimators, + eta=eta, + seed=42, + verbosity=0, + ) + # Model Training + model.fit(fit_data['X_train'], fit_data['y_train']) + + # Calculating Model stats + self.stats.update( + { + 'total_training_points': fit_data['X_train'].shape[0], + } + ) + self.stats['input_features'] = features + return model + + def prepare_data(self, prepared_data: dict): + result_key = self.stats['result_key'] + + # NaN/Inf should be handled in preprocessing but just in case + X_train = prepared_data['X_train'].dropna() + y_train = prepared_data['y_train'].loc[X_train.index] + if 'X_test' in prepared_data: + X_test = prepared_data['X_test'].dropna() + y_test = prepared_data['y_test'].loc[X_test.index] + else: + X_test = None + y_test = None + + if result_key is None: + result_key = y_train.columns[0] + self.stats['result_key'] = result_key + + if isinstance(y_train, pd.DataFrame) and result_key in y_train.columns: + y_train = prepared_data['y_train'][result_key] + if y_test is not None: + y_test = prepared_data['y_test'][result_key] + elif not isinstance(y_train, (pd.Series, list, np.ndarray)): + raise ValueError( + 'y_train and y_test must be a DataFrame with ' + 'relevant column specified via result_key or ' + '1-D Array-like' + ) + + fit_data = {'X_train': X_train, 'y_train': y_train} + if X_test is not None: + fit_data['X_test'] = X_test + fit_data['y_test'] = y_test + + return fit_data + + def evaluate_model(self, input_data, trained_model): + """Calculate metrics""" + fit_data = self.prepare_data(input_data) + return evaluate_regression_base_model( + fit_data, + trained_model.base_model, + features=trained_model.stats['selected_features'], + ) + + def train_model(self, data: Dict[str, Any] = None): + """ + Implement logic to create the corresponding MLModel, including both training and evaluation. + """ + if self.model_class is None: + raise ValueError('Model class not provided') + + if not data: + data = self.load_data() + + base_model = self.train_base_model(data) + + ml_model = self.model_class() + ml_model.base_model = base_model + + # Pass stats to the model + if self.stats is not None: + ml_model.stats = self.stats.copy() + # Compute metrics and pass to the model + ml_model.metrics = self.evaluate_model(data, ml_model) + return ml_model + + def build_model(self, data: Dict[str, Any] = None): + return self.train_model(data) diff --git a/h1st/model/xgboost/utils.py b/h1st/model/ml/xgboost/utils.py similarity index 100% rename from h1st/model/xgboost/utils.py rename to h1st/model/ml/xgboost/utils.py diff --git a/h1st/model/model.py b/h1st/model/model.py index 8fe2d587..6ca5808b 100644 --- a/h1st/model/model.py +++ b/h1st/model/model.py @@ -42,12 +42,18 @@ class MyModel(h1st.model.Model): my_model_2 = MyModel() my_model_2.load('1st_version') """ + def __init__(self): + super().__init__() + self.stats = {} + self.metrics = {} + self.base_model = None + def persist(self, version=None) -> str: """ Persist this model's properties to the ModelRepository. Currently, only `stats`, `metrics`, `model` properties are supported. `model` property could be single model, list or dict of models - Currently, only sklearn are supported, but you can extend this method to support any framework. + Currently, only sklearn and tensorflow-keras are supported. :param version: model version, leave blank for autogeneration :returns: model version @@ -76,6 +82,7 @@ def train(self, data: Dict[str, Any] = None) -> None: data = self.load_data() base_model = self.train_base_model(data) + self.base_model = base_model ml_model = self.model_class() ml_model.base_model = base_model diff --git a/h1st/model/oracle/ensembler_modelers.py b/h1st/model/oracle/ensembler_modelers.py deleted file mode 100644 index 78a08b03..00000000 --- a/h1st/model/oracle/ensembler_modelers.py +++ /dev/null @@ -1,34 +0,0 @@ -from typing import Any, Dict -from pandas import Series -from sklearn import metrics -from sklearn.neural_network import MLPClassifier -from sklearn.preprocessing import StandardScaler - -from h1st.model.ml_model import MLModel -from h1st.model.ml_modeler import MLModeler -from h1st.model.oracle.ensembler_models import MLPEnsembleModel - - -class MLPEnsembleModeler(MLModeler): - def __init__(self, model_class=None): - self.stats = {} - self.model_class = model_class if model_class is not None else MLPEnsembleModel - - def _preprocess(self, data): - self.stats['scaler'] = StandardScaler() - return self.stats['scaler'].fit_transform(data) - - def train_base_model(self, prepared_data: Dict[str, Any]) -> Any: - x = self._preprocess(prepared_data['X_train']) - y = prepared_data['y_train'] - model = MLPClassifier( - hidden_layer_sizes=(100, 100), random_state=1, max_iter=2000 - ) - model.fit(x, y) - return model - - def evaluate_model(self, prepared_data: dict, model: MLModel) -> dict: - super().evaluate_model(prepared_data, model) - x, y_true = prepared_data['X_test'], prepared_data['y_test'] - y_pred = Series(model.predict({'X': x, 'y': y_true})['predictions']) - return {'r2_score': metrics.r2_score(y_true, y_pred)} diff --git a/h1st/model/oracle/ensembler_models.py b/h1st/model/oracle/ensembler_models.py deleted file mode 100644 index 9e007593..00000000 --- a/h1st/model/oracle/ensembler_models.py +++ /dev/null @@ -1,33 +0,0 @@ -from pandas import DataFrame - -from h1st.model.ml_model import MLModel -from h1st.model.predictive_model import PredictiveModel - - -class MajorityVotingEnsembleModel(PredictiveModel): - ''' - Ensemble Model in Oracle framework - ''' - - def predict(self, input_data: dict) -> dict: - ''' - Combine output of teacher and students using majority voting by default. In case - when majority vote cannot be applied, use teacher's output as the final output. - Inherit and override this method to use your custom combining approach. - :param input_data: dictionary with `X` key and input data - :returns: a dictionary with key `predictions` containing the predictions - ''' - predictions = input_data['X'].mode(axis='columns', numeric_only=True)[0] - return {'predictions': predictions} - - -class MLPEnsembleModel(MLModel): - def predict(self, input_data: dict) -> dict: - if isinstance(input_data['X'], DataFrame): - x = input_data['X'].values - else: - x = input_data['X'] - - x = self.stats['scaler'].transform(input_data['X']) - y = self.base_model.predict(x) - return {'predictions': y} diff --git a/h1st/model/oracle/student_modelers.py b/h1st/model/oracle/student_modelers.py deleted file mode 100644 index 03cee7e5..00000000 --- a/h1st/model/oracle/student_modelers.py +++ /dev/null @@ -1,54 +0,0 @@ -from typing import Any, Dict -from sklearn.ensemble import RandomForestClassifier -from sklearn.linear_model import LogisticRegression -from sklearn.preprocessing import StandardScaler - -from h1st.model.oracle.student_models import RandomForestModel, LogisticRegressionModel - - -class RandomForestModeler: - ''' - Knowledge Generalization Modeler backed by a RandomForest algorithm. - ''' - - def __init__(self, model_class=None, result_key=None): - self.stats = {} - self.model_class = model_class if model_class is not None else RandomForestModel - - def _preprocess(self, data): - self.stats["scaler"] = StandardScaler() - return self.stats["scaler"].fit_transform(data) - - def train_base_model(self, prepared_data: Dict[str, Any]) -> Any: - X = self._preprocess(prepared_data['X_train']) - y = prepared_data['y_train'] - model = RandomForestClassifier(max_depth=20, random_state=1) - model.fit(X, y) - self.stats['input_features'] = list(prepared_data['X_train'].columns) - self.stats['output_labels'] = list(prepared_data['y_train'].columns) - return model - - -class LogisticRegressionModeler: - ''' - Knowledge Generalization Modeler backed by a Logistic Regression algorithm - ''' - - def __init__(self, model_class=None, result_key=None): - self.stats = {} - self.model_class = ( - model_class if model_class is not None else LogisticRegressionModel - ) - - def _preprocess(self, data): - self.stats["scaler"] = StandardScaler() - return self.stats["scaler"].fit_transform(data) - - def train_base_model(self, prepared_data: Dict[str, Any]) -> Any: - X = self._preprocess(prepared_data['X_train']) - y = prepared_data['y_train'] - model = LogisticRegression() - model.fit(X, y) - self.stats['input_features'] = list(prepared_data['X_train'].columns) - self.stats['output_labels'] = list(prepared_data['y_train'].columns) - return model diff --git a/h1st/model/oracle/student_models.py b/h1st/model/oracle/student_models.py deleted file mode 100644 index a9a266fb..00000000 --- a/h1st/model/oracle/student_models.py +++ /dev/null @@ -1,111 +0,0 @@ -from typing import Any, Dict -from sklearn.ensemble import RandomForestClassifier -from sklearn.linear_model import LogisticRegression -from sklearn.preprocessing import StandardScaler -from pandas import DataFrame -from h1st.model.model import Model - - -class RandomForestModel(Model): - name = 'RandomForestModel' - ''' - Knowledge Generalization Model backed by a RandomForest algorithm - ''' - - def __init__(self, result_key=None): - self.stats = {} - - def predict(self, input_data: dict) -> dict: - ''' - Implement logic to generate prediction from data - :params input_data: an dictionary with key `X` containing the data to get predictions. - :returns: a dictionary with key `predictions` containing the predictions - ''' - if self.stats['scaler'] is not None: - x = self.stats['scaler'].transform(input_data['X']) - else: - x = input_data['X'] - - predict_df = DataFrame( - self.base_model.predict(x), columns=self.stats['output_labels'] - ) - return {'predictions': predict_df} - - def predict_proba(self, input_data: dict) -> dict: - if self.stats['scaler'] is not None: - x = self.stats['scaler'].transform(input_data['X']) - else: - x = input_data['X'] - return {'predictions': self.base_model.predict_proba(x)} - - def _preprocess(self, data): - self.stats["scaler"] = StandardScaler() - return self.stats["scaler"].fit_transform(data) - - def train(self, prepared_data: Dict[str, Any]) -> Any: - X = self._preprocess(prepared_data['X_train']) - y = prepared_data['y_train'] - model = RandomForestClassifier(max_depth=20, random_state=1) - model.fit(X, y) - self.stats['input_features'] = list(prepared_data['X_train'].columns) - self.stats['output_labels'] = list(prepared_data['y_train'].columns) - self.base_model = model - - - if self.stats is not None: - self.base_model.stats = self.stats.copy() - # Compute metrics and pass to the model - # model.metrics = self.evaluate_model(data, model) - return model - - - -class LogisticRegressionModel(Model): - name = 'LogisticRegressionModel' - ''' - Knowledge Generalization Model backed by a Logistic Regression algorithm - ''' - - def __init__(self, model_class=None, result_key=None): - self.stats = {} - - def predict(self, input_data: dict) -> dict: - ''' - Implement logic to generate prediction from data - :params input_data: an dictionary with key `X` containing the data to get predictions. - :returns: a dictionary with key `predictions` containing the predictions - ''' - if self.stats['scaler'] is not None: - x = self.stats['scaler'].transform(input_data['X']) - else: - x = input_data['X'] - - predict_df = DataFrame( - self.base_model.predict(x), columns=self.stats['output_labels'] - ) - return {'predictions': predict_df} - - def predict_proba(self, input_data: dict) -> dict: - if self.stats['scaler'] is not None: - x = self.stats['scaler'].transform(input_data['X']) - else: - x = input_data['X'] - return {'predictions': self.base_model.predict_proba(x)} - - def _preprocess(self, data): - self.stats["scaler"] = StandardScaler() - return self.stats["scaler"].fit_transform(data) - - def train(self, prepared_data: Dict[str, Any]) -> Any: - X = self._preprocess(prepared_data['X_train']) - y = prepared_data['y_train'] - model = LogisticRegression() - model.fit(X, y) - self.stats['input_features'] = list(prepared_data['X_train'].columns) - self.stats['output_labels'] = list(prepared_data['y_train'].columns) - - self.base_model = model - - if self.stats is not None: - self.base_model.stats = self.stats.copy() - return model \ No newline at end of file diff --git a/h1st/model/repository/model_repository.py b/h1st/model/repository/model_repository.py index b582e65a..56c79db5 100644 --- a/h1st/model/repository/model_repository.py +++ b/h1st/model/repository/model_repository.py @@ -109,8 +109,7 @@ def serialize(self, model, path): :param model: H1ST Model :param path: path to save models to """ - from h1st.model.ml_model import MLModel - from h1st.model.knowledge_model import RuleBasedModel + from h1st.model.model import Model meta_info = {} @@ -124,7 +123,7 @@ def serialize(self, model, path): meta_info["stats"] = self.STATS_PATH self._serialize_dict(model.stats, path, self.STATS_PATH) - if isinstance(model, MLModel): + if isinstance(model, Model): if model.base_model: logger.info("Saving model property...") if type(model.base_model) == list: @@ -157,73 +156,73 @@ def serialize(self, model, path): else: logger.error(".base_model was not assigned.") - elif isinstance(model, RuleBasedModel): - - if model.rule_details is not None: - logger.info("Saving rule_details property...") - meta_info["rule_details"] = self.RULE_DETAILS_PATH - self._serialize_dict(model.rule_details, path, self.RULE_DETAILS_PATH) - - if model.rule_engine is not None: - logger.info("Saving rule_engine property...") - if type(model.rule_engine) in self._get_supported_rule_engines(): - rule_engine_type, rules_path = self._serialize_rule_engine( - model.rule_engine, path - ) - meta_info["rule_engine"] = [ - {"rules_type": rule_engine_type, "rules_path": rules_path} - ] - elif ( - type(model.rule_engine) == list - and type(model.rule_engine[0]) in self._get_supported_rule_engines() - ): - meta_info["rule_engine"] = [] - for i, rules in enumerate(model.rule_engine): - rule_engine_type, rules_path = self._serialize_rule_engine( - rules, path, f"rules_{i}" - ) - meta_info["rule_engine"].append( - {"rules_type": rule_engine_type, "rules_path": rules_path} - ) - elif ( - type(model.rule_engine) == dict - and type(list(model.rule_engine.values())[0]) - in self._get_supported_rule_engines() - ): - meta_info["rule_engine"] = {} - for key, rules in model.rule_engine.items(): - rule_engine_type, rules_path = self._serialize_rule_engine( - rules, path, f"rules_{i}" - ) - meta_info["rule_engine"][key] = { - "rules_type": rule_engine_type, - "rules_path": rules_path, - } - elif self._is_builtin_class_instance(model.rule_engine): - self._serialize_basic_obj( - model.rule_engine, path, self.RULE_ENGINE_PATH - ) - meta_info["rule_engine"] = { - "rules_type": type(model.rule_engine), - "rule_path": self.RULE_ENGINE_PATH, - } - else: - logger.warning( - ( - "This rule engine is custom, so may not work well with " - "joblib which is the python package that we use to persist rules." - ) - ) - self._serialize_basic_obj( - model.rule_engine, path, self.RULE_ENGINE_PATH - ) - meta_info["rule_engine"] = { - "rules_type": type(model.rule_engine), - "rule_path": self.RULE_ENGINE_PATH, - } - - else: - logger.warning(".rule_engine was not assigned.") + # elif isinstance(model, RuleBasedModel): + + # if model.rule_details is not None: + # logger.info("Saving rule_details property...") + # meta_info["rule_details"] = self.RULE_DETAILS_PATH + # self._serialize_dict(model.rule_details, path, self.RULE_DETAILS_PATH) + + # if model.rule_engine is not None: + # logger.info("Saving rule_engine property...") + # if type(model.rule_engine) in self._get_supported_rule_engines(): + # rule_engine_type, rules_path = self._serialize_rule_engine( + # model.rule_engine, path + # ) + # meta_info["rule_engine"] = [ + # {"rules_type": rule_engine_type, "rules_path": rules_path} + # ] + # elif ( + # type(model.rule_engine) == list + # and type(model.rule_engine[0]) in self._get_supported_rule_engines() + # ): + # meta_info["rule_engine"] = [] + # for i, rules in enumerate(model.rule_engine): + # rule_engine_type, rules_path = self._serialize_rule_engine( + # rules, path, f"rules_{i}" + # ) + # meta_info["rule_engine"].append( + # {"rules_type": rule_engine_type, "rules_path": rules_path} + # ) + # elif ( + # type(model.rule_engine) == dict + # and type(list(model.rule_engine.values())[0]) + # in self._get_supported_rule_engines() + # ): + # meta_info["rule_engine"] = {} + # for key, rules in model.rule_engine.items(): + # rule_engine_type, rules_path = self._serialize_rule_engine( + # rules, path, f"rules_{i}" + # ) + # meta_info["rule_engine"][key] = { + # "rules_type": rule_engine_type, + # "rules_path": rules_path, + # } + # elif self._is_builtin_class_instance(model.rule_engine): + # self._serialize_basic_obj( + # model.rule_engine, path, self.RULE_ENGINE_PATH + # ) + # meta_info["rule_engine"] = { + # "rules_type": type(model.rule_engine), + # "rule_path": self.RULE_ENGINE_PATH, + # } + # else: + # logger.warning( + # ( + # "This rule engine is custom, so may not work well with " + # "joblib which is the python package that we use to persist rules." + # ) + # ) + # self._serialize_basic_obj( + # model.rule_engine, path, self.RULE_ENGINE_PATH + # ) + # meta_info["rule_engine"] = { + # "rules_type": type(model.rule_engine), + # "rule_path": self.RULE_ENGINE_PATH, + # } + + # else: + # logger.warning(".rule_engine was not assigned.") elif hasattr(model, "base_model"): logger.warning( diff --git a/h1st/model/xgboost/model.py b/h1st/model/xgboost/model.py deleted file mode 100644 index 2dbb31ce..00000000 --- a/h1st/model/xgboost/model.py +++ /dev/null @@ -1,202 +0,0 @@ -import pandas as pd -import numpy as np -import pytz -from loguru import logger -from typing import Any, Dict - -from datetime import datetime -from sklearn.preprocessing import StandardScaler -from xgboost import XGBRegressor - -from h1st.model.model import Model -from h1st.model.xgboost.utils import extratree_rank_features, evaluate_regression_base_model - - -class XGBRegressionModel(Model): - - data_key = 'X' - output_key = 'predictions' - name = 'XGBRegressionModel' - - def __init__( - self, - result_key: str = 'result', - max_features: int = 50, - eta: float = 0.001, - n_estimators: int = 5, - max_depth: int = 3, - debug: bool = False, - ) -> None: - - super().__init__() - self.stats = { - 'result_key': result_key, - 'max_features': int(max_features), - 'eta': eta, - 'n_estimators': int(n_estimators), - 'max_depth': int(max_depth), - 'debug': debug, - } - - def predict(self, input_data: dict) -> dict: - X = input_data[self.data_key] - output_col = self.stats['result_key'] - results = {} - # Saving prediction time - now = pytz.UTC.localize(datetime.utcnow()) - results['prediction_time'] = now.isoformat() - # Scaling the input data - scaler = self.stats['scaling_model'] - features = self.stats['scaled_features'] - selected_features = self.stats['selected_features'] - X_prime = pd.DataFrame(scaler.transform(X[features]), columns=features) - X_prime = X_prime[selected_features] - - # Model Prediction - pred = self.base_model.predict(X_prime) - results[self.output_key] = pd.DataFrame( - pred, columns=[output_col], index=X.index - ) - return results - - - # TRAINING MODEL - def prepare_data(self, prepared_data: dict): - result_key = self.stats['result_key'] - - # NaN/Inf should be handled in preprocessing but just in case - X_train = prepared_data['X_train'].dropna() - y_train = prepared_data['y_train'].loc[X_train.index] - if 'X_test' in prepared_data: - X_test = prepared_data['X_test'].dropna() - y_test = prepared_data['y_test'].loc[X_test.index] - else: - X_test = None - y_test = None - - if result_key is None: - result_key = y_train.columns[0] - self.stats['result_key'] = result_key - - if isinstance(y_train, pd.DataFrame) and result_key in y_train.columns: - y_train = prepared_data['y_train'][result_key] - if y_test is not None: - y_test = prepared_data['y_test'][result_key] - elif not isinstance(y_train, (pd.Series, list, np.ndarray)): - raise ValueError( - 'y_train and y_test must be a DataFrame with ' - 'relevant column specified via result_key or ' - '1-D Array-like' - ) - - fit_data = {'X_train': X_train, 'y_train': y_train} - if X_test is not None: - fit_data['X_test'] = X_test - fit_data['y_test'] = y_test - - return fit_data - - def train_model(self, input_data: dict): - """ - This function can be used to build and train XGBRegression model. - It also performs gridsearch which helps us to get optimal model - parameters based on Mean Absolute Error. - - prepared_data requires keys: X_train, y_train, X_test, y_test - """ - prepared_data = self.prepare_data(input_data) - X_train = prepared_data['X_train'] - y_train = prepared_data['y_train'] - if 'X_test' in prepared_data: - X_test = prepared_data['X_test'] - y_test = prepared_data['y_test'] - else: - X_test = None - y_test = None - - result_key = self.stats['result_key'] - max_features = self.stats['max_features'] - logger.info(f'Fitting model {self.name} for {result_key}') - - self.stats['scaled_features'] = X_train.columns - sc_scaler = StandardScaler() - X_train = pd.DataFrame( - sc_scaler.fit_transform(X_train), - columns=X_train.columns, - index=X_train.index, - ) - if X_test is not None: - X_test = pd.DataFrame( - sc_scaler.transform(X_test), - columns=X_test.columns, - index=X_test.index, - ) - - fit_data = { - 'X_train': X_train, - 'y_train': y_train, - } - - ranked_features, feature_importance = extratree_rank_features( - fit_data['X_train'], fit_data['y_train'].values - ) - - # Keep the top N features - features = ranked_features[:max_features] - - self.stats.update( - { - 'ranked_features': ranked_features, - 'feature_importance': feature_importance, - 'selected_features': features, - 'scaling_model': sc_scaler, - } - ) - - fit_data['X_train'] = fit_data['X_train'][features] - - max_depth = self.stats['max_depth'] - eta = self.stats['eta'] - n_estimators = self.stats['n_estimators'] - - # Model Initialization using the above best parameters - model = XGBRegressor( - max_depth=max_depth, - n_estimators=n_estimators, - eta=eta, - seed=42, - verbosity=0, - ) - # Model Training - model.fit(fit_data['X_train'], fit_data['y_train']) - - # Calculating Model stats - self.stats.update( - { - 'total_training_points': fit_data['X_train'].shape[0], - } - ) - self.stats['input_features'] = features - return model - - def evaluate_model(self, input_data, trained_model): - """Calculate metrics""" - fit_data = self.prepare_data(input_data) - return evaluate_regression_base_model( - fit_data, - trained_model, - features=trained_model.stats['selected_features'], - ) - - def train(self, data: Dict[str, Any] = None) -> Model: - """ - Implement logic to create the corresponding MLModel, including both training and evaluation. - """ - - ml_model = self.train_model(data) - # Pass stats to the model - if self.stats is not None: - ml_model.stats = self.stats.copy() - # Compute metrics and pass to the model - ml_model.metrics = self.evaluate_model(data, ml_model) - return ml_model \ No newline at end of file diff --git a/h1st/model/xgboost/modeler.py b/h1st/model/xgboost/modeler.py deleted file mode 100644 index 0e4706ea..00000000 --- a/h1st/model/xgboost/modeler.py +++ /dev/null @@ -1,187 +0,0 @@ -import pandas as pd -import numpy as np - -from loguru import logger -from xgboost import XGBRegressor -from sklearn.preprocessing import StandardScaler - -from h1st.model.xgboost.model import XGBRegressionModel -from h1st.model.xgboost.utils import ( - xgb_grid_search, - extratree_rank_features, - evaluate_regression_base_model, -) - - -class XGBRegressionModeler: - model_class = XGBRegressionModel - - def __init__( - self, - result_key: str = 'result', - max_features: int = 50, - eta: float = 0.001, - n_estimators: int = 5, - max_depth: int = 3, - debug: bool = False, - ) -> None: - super().__init__() - self.stats = { - 'result_key': result_key, - 'max_features': int(max_features), - 'eta': eta, - 'n_estimators': int(n_estimators), - 'max_depth': int(max_depth), - 'debug': debug, - } - - def train_base_model(self, input_data: dict) -> XGBRegressor: - """ - This function can be used to build and train XGBRegression model. - It also performs gridsearch which helps us to get optimal model - parameters based on Mean Absolute Error. - - prepared_data requires keys: X_train, y_train, X_test, y_test - """ - prepared_data = self.prepare_data(input_data) - X_train = prepared_data['X_train'] - y_train = prepared_data['y_train'] - if 'X_test' in prepared_data: - X_test = prepared_data['X_test'] - y_test = prepared_data['y_test'] - else: - X_test = None - y_test = None - - result_key = self.stats['result_key'] - max_features = self.stats['max_features'] - logger.info(f'Fitting model {self.model_class.name} for {result_key}') - - self.stats['scaled_features'] = X_train.columns - sc_scaler = StandardScaler() - X_train = pd.DataFrame( - sc_scaler.fit_transform(X_train), - columns=X_train.columns, - index=X_train.index, - ) - if X_test is not None: - X_test = pd.DataFrame( - sc_scaler.transform(X_test), - columns=X_test.columns, - index=X_test.index, - ) - - fit_data = { - 'X_train': X_train, - 'y_train': y_train, - } - - ranked_features, feature_importance = extratree_rank_features( - fit_data['X_train'], fit_data['y_train'].values - ) - - # Keep the top N features - features = ranked_features[:max_features] - - self.stats.update( - { - 'ranked_features': ranked_features, - 'feature_importance': feature_importance, - 'selected_features': features, - 'scaling_model': sc_scaler, - } - ) - - fit_data['X_train'] = fit_data['X_train'][features] - - max_depth = self.stats['max_depth'] - eta = self.stats['eta'] - n_estimators = self.stats['n_estimators'] - # if X_test is not None: - # fit_data['X_test'] = X_test[features] - # fit_data['y_test'] = y_test - # logger.info( - # 'Found test data, grid searching to ' 'optimize hyperparameters.' - # ) - # hyperparams = xgb_grid_search( - # fit_data, - # debug=self.stats['debug'], - # max_depth=max_depth, - # n_estimators=n_estimators, - # eta=eta, - # ) - # max_depth, n_estimators, eta = hyperparams - # logger.info( - # f'Best hyperparmeters found:\n' - # f'n_estimators: {n_estimators}\n' - # f'max_depth: {max_depth}\n' - # f'eta: {eta}\n' - # f'Replacing passed hyperparameters.' - # ) - # self.stats.update( - # {'max_depth': max_depth, 'n_estimators': n_estimators, 'eta': eta} - # ) - - # Model Initialization using the above best parameters - model = XGBRegressor( - max_depth=max_depth, - n_estimators=n_estimators, - eta=eta, - seed=42, - verbosity=0, - ) - # Model Training - model.fit(fit_data['X_train'], fit_data['y_train']) - - # Calculating Model stats - self.stats.update( - { - 'total_training_points': fit_data['X_train'].shape[0], - } - ) - self.stats['input_features'] = features - return model - - def prepare_data(self, prepared_data: dict): - result_key = self.stats['result_key'] - - # NaN/Inf should be handled in preprocessing but just in case - X_train = prepared_data['X_train'].dropna() - y_train = prepared_data['y_train'].loc[X_train.index] - if 'X_test' in prepared_data: - X_test = prepared_data['X_test'].dropna() - y_test = prepared_data['y_test'].loc[X_test.index] - else: - X_test = None - y_test = None - - if result_key is None: - result_key = y_train.columns[0] - self.stats['result_key'] = result_key - - if isinstance(y_train, pd.DataFrame) and result_key in y_train.columns: - y_train = prepared_data['y_train'][result_key] - if y_test is not None: - y_test = prepared_data['y_test'][result_key] - elif not isinstance(y_train, (pd.Series, list, np.ndarray)): - raise ValueError( - 'y_train and y_test must be a DataFrame with ' - 'relevant column specified via result_key or ' - '1-D Array-like' - ) - - fit_data = {'X_train': X_train, 'y_train': y_train} - if X_test is not None: - fit_data['X_test'] = X_test - fit_data['y_test'] = y_test - - return fit_data - - def evaluate_model(self, input_data, trained_model): - """Calculate metrics""" - fit_data = self.prepare_data(input_data) - return evaluate_regression_base_model( - fit_data, - trained_model.base_model, - features=trained_model.stats['selected_features'], - ) diff --git a/h1st/model/xgboost/xgbclassifier_modeler.py b/h1st/model/xgboost/xgbclassifier_modeler.py deleted file mode 100644 index dd5de4b9..00000000 --- a/h1st/model/xgboost/xgbclassifier_modeler.py +++ /dev/null @@ -1,9 +0,0 @@ -from h1st.model.xgboost.xgbclassifier import XGBClassifierModel -from h1st.model.xgboost.modeler import XGBRegressionModeler - -class XGBClassifierModeler(XGBRegressionModeler): - model_class = XGBClassifierModel - - def __init__(self, threshold=0.5, **kwargs): - super().__init__(**kwargs) - self.stats['threshold'] = float(threshold) diff --git a/pyproject.toml b/pyproject.toml index bf099c2e..9fc952f8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,6 +19,7 @@ pyarrow = ">=9.0.0" # Machine Learning / Deep Learning scikit-learn = ">=1.1.2" +xgboost = ">=1.7.2" # Trustworthy AI graphviz = ">=0.20.1"