diff --git a/ads/opctl/operator/lowcode/forecast/const.py b/ads/opctl/operator/lowcode/forecast/const.py
index 4686ca86f..00b44c453 100644
--- a/ads/opctl/operator/lowcode/forecast/const.py
+++ b/ads/opctl/operator/lowcode/forecast/const.py
@@ -27,10 +27,12 @@ class SpeedAccuracyMode(str, metaclass=ExtendedEnumMeta):
     HIGH_ACCURACY = "HIGH_ACCURACY"
     BALANCED = "BALANCED"
     FAST_APPROXIMATE = "FAST_APPROXIMATE"
+    AUTOMLX = "AUTOMLX"
     ratio = {}
     ratio[HIGH_ACCURACY] = 1  # 100 % data used for generating explanations
     ratio[BALANCED] = 0.5  # 50 % data used for generating explanations
     ratio[FAST_APPROXIMATE] = 0  # constant
+    ratio[AUTOMLX] = 0  # constant
 
 
 class SupportedMetrics(str, metaclass=ExtendedEnumMeta):
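
For context on how these ratios are consumed downstream: a non-zero value is the fraction of each series sampled when building explanations, while 0 signals a constant-size sample. A minimal sketch of that contract, assuming a hypothetical helper `explain_sample_size` and floor `CONSTANT_SAMPLE_SIZE` (only the `ratio` mapping comes from the change above):

```python
from ads.opctl.operator.lowcode.forecast.const import SpeedAccuracyMode

CONSTANT_SAMPLE_SIZE = 100  # hypothetical floor; the real constant lives in base_model.py


def explain_sample_size(n_rows: int, mode: str) -> int:
    """Illustrative only: map an accuracy mode to an explanation sample size."""
    ratio = SpeedAccuracyMode.ratio[mode]
    if ratio == 0:
        # FAST_APPROXIMATE and the new AUTOMLX mode both map to 0, i.e. a
        # constant-size sample rather than a fraction of the dataset. For
        # AUTOMLX the sample is moot anyway: explanations are delegated to
        # the AutoMLx library instead of the SHAP path.
        return min(n_rows, CONSTANT_SAMPLE_SIZE)
    return int(n_rows * ratio)


print(explain_sample_size(1000, SpeedAccuracyMode.HIGH_ACCURACY))  # 1000
print(explain_sample_size(1000, SpeedAccuracyMode.BALANCED))       # 500
print(explain_sample_size(1000, SpeedAccuracyMode.AUTOMLX))        # 100
```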
diff --git a/ads/opctl/operator/lowcode/forecast/model/automlx.py b/ads/opctl/operator/lowcode/forecast/model/automlx.py
index d91a3cd83..1998b2f24 100644
--- a/ads/opctl/operator/lowcode/forecast/model/automlx.py
+++ b/ads/opctl/operator/lowcode/forecast/model/automlx.py
@@ -17,6 +17,7 @@
 from ads.opctl.operator.lowcode.forecast.const import (
     AUTOMLX_METRIC_MAP,
     ForecastOutputColumns,
+    SpeedAccuracyMode,
     SupportedModels,
 )
 from ads.opctl.operator.lowcode.forecast.utils import _label_encode_dataframe
@@ -241,18 +242,18 @@ def _generate_report(self):
                 # If the key is present, call the "explain_model" method
                 self.explain_model()
 
-                # Convert the global explanation data to a DataFrame
-                global_explanation_df = pd.DataFrame(self.global_explanation)
+                global_explanation_section = None
+                if self.spec.explanations_accuracy_mode != SpeedAccuracyMode.AUTOMLX:
+                    # Convert the global explanation data to a DataFrame
+                    global_explanation_df = pd.DataFrame(self.global_explanation)
 
-                self.formatted_global_explanation = (
-                    global_explanation_df / global_explanation_df.sum(axis=0) * 100
-                )
-                self.formatted_global_explanation = (
-                    self.formatted_global_explanation.rename(
+                    self.formatted_global_explanation = (
+                        global_explanation_df / global_explanation_df.sum(axis=0) * 100
+                    )
+                    self.formatted_global_explanation = self.formatted_global_explanation.rename(
                         {self.spec.datetime_column.name: ForecastOutputColumns.DATE},
                         axis=1,
                     )
-                )
 
                 aggregate_local_explanations = pd.DataFrame()
                 for s_id, local_ex_df in self.local_explanation.items():
@@ -293,8 +294,11 @@ def _generate_report(self):
                 )
 
                 # Append the global explanation text and section to the "other_sections" list
+                if global_explanation_section:
+                    other_sections.append(global_explanation_section)
+
+                # Append the local explanation text and section to the "other_sections" list
                 other_sections = other_sections + [
-                    global_explanation_section,
                     local_explanation_section,
                 ]
             except Exception as e:
@@ -375,3 +379,79 @@ def _custom_predict_automlx(self, data):
         return self.models.get(self.series_id).forecast(
             X=data_temp, periods=data_temp.shape[0]
         )[self.series_id]
+
+    @runtime_dependency(
+        module="automlx",
+        err_msg=(
+            "Please run `python3 -m pip install automlx` to install the required dependencies for model explanation."
+        ),
+    )
+    def explain_model(self):
+        """
+        Generates explanations for the model using the AutoMLx library.
+
+        Parameters
+        ----------
+        None
+
+        Returns
+        -------
+        None
+
+        Notes
+        -----
+        This method generates local explanations for each series in the dataset, using the
+        ``MLExplainer`` class from the AutoMLx library to compute per-series feature
+        attributions, which are stored in the ``self.local_explanation`` dictionary.
+
+        If the accuracy mode is set to AUTOMLX, explanations are generated natively by AutoMLx;
+        otherwise, this method falls back to the default SHAP-based explainer of the base class.
+        """
+        import automlx
+
+        # Loop through each series in the dataset
+        for s_id, data_i in self.datasets.get_data_by_series(
+            include_horizon=False
+        ).items():
+            try:
+                if self.spec.explanations_accuracy_mode == SpeedAccuracyMode.AUTOMLX:
+                    # Use the MLExplainer class from AutoMLx to generate explanations
+                    explainer = automlx.MLExplainer(
+                        self.models[s_id],
+                        self.datasets.additional_data.get_data_for_series(series_id=s_id)
+                        .drop(self.spec.datetime_column.name, axis=1)
+                        .head(-self.spec.horizon)
+                        if self.spec.additional_data
+                        else None,
+                        pd.DataFrame(data_i[self.spec.target_column]),
+                        task="forecasting",
+                    )
+
+                    # Generate explanations for the forecast horizon
+                    explanations = explainer.explain_prediction(
+                        X=self.datasets.additional_data.get_data_for_series(series_id=s_id)
+                        .drop(self.spec.datetime_column.name, axis=1)
+                        .tail(self.spec.horizon)
+                        if self.spec.additional_data
+                        else None,
+                        forecast_timepoints=list(range(self.spec.horizon + 1)),
+                    )
+
+                    # Convert the explanations to a wide-format DataFrame: one row per timepoint, one column per feature
+                    explanations_df = pd.concat(
+                        [exp.to_dataframe() for exp in explanations]
+                    )
+                    explanations_df["row"] = explanations_df.groupby("Feature").cumcount()
+                    explanations_df = explanations_df.pivot(
+                        index="row", columns="Feature", values="Attribution"
+                    )
+                    explanations_df = explanations_df.reset_index(drop=True)
+
+                    # Store the explanations in the local_explanation dictionary
+                    self.local_explanation[s_id] = explanations_df
+                else:
+                    # Fall back to the default explanation generation method
+                    super().explain_model()
+            except Exception as e:
+                logger.warning(f"Failed to generate explanations for series {s_id} with error: {e}.")
+                logger.debug(f"Full Traceback: {traceback.format_exc()}")
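
The reshaping at the end of `explain_model` deserves a note: `explain_prediction` returns one explanation per forecast timepoint, and concatenating their DataFrames yields long-format `(Feature, Attribution)` rows. `groupby("Feature").cumcount()` then numbers each feature's occurrences, recovering the timepoint index so that `pivot` can emit one row per forecast step and one column per feature. A self-contained pandas sketch with made-up attribution values:

```python
import pandas as pd

# Long format, as produced by concatenating per-timepoint explanation frames:
# two forecast steps, two features each (values are synthetic).
explanations_df = pd.DataFrame(
    {
        "Feature": ["temp", "promo", "temp", "promo"],
        "Attribution": [0.12, -0.05, 0.20, 0.01],
    }
)

# cumcount() numbers each feature's occurrences 0, 1, ... -- the timepoint index.
explanations_df["row"] = explanations_df.groupby("Feature").cumcount()

# Wide format: one row per forecast timepoint, one column per feature.
wide = explanations_df.pivot(index="row", columns="Feature", values="Attribution")
print(wide.reset_index(drop=True))
# Feature  promo  temp
# 0        -0.05  0.12
# 1         0.01  0.20
```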
diff --git a/ads/opctl/operator/lowcode/forecast/model/base_model.py b/ads/opctl/operator/lowcode/forecast/model/base_model.py
index 0aba580b1..c178bd02b 100644
--- a/ads/opctl/operator/lowcode/forecast/model/base_model.py
+++ b/ads/opctl/operator/lowcode/forecast/model/base_model.py
@@ -48,7 +48,7 @@
     SpeedAccuracyMode,
     SupportedMetrics,
     SupportedModels,
-    BACKTEST_REPORT_NAME
+    BACKTEST_REPORT_NAME,
 )
 from ..operator_config import ForecastOperatorConfig, ForecastOperatorSpec
 from .forecast_datasets import ForecastDatasets
@@ -266,7 +266,11 @@ def generate_report(self):
             output_dir = self.spec.output_directory.url
             file_path = f"{output_dir}/{BACKTEST_REPORT_NAME}"
             if self.spec.model == AUTO_SELECT:
-                backtest_sections.append(rc.Heading("Auto-Select Backtesting and Performance Metrics", level=2))
+                backtest_sections.append(
+                    rc.Heading(
+                        "Auto-Select Backtesting and Performance Metrics", level=2
+                    )
+                )
                 if not os.path.exists(file_path):
                     failure_msg = rc.Text(
                         "auto-select could not be executed. Please check the "
@@ -275,15 +279,23 @@
                     backtest_sections.append(failure_msg)
                 else:
                     backtest_stats = pd.read_csv(file_path)
-                    model_metric_map = backtest_stats.drop(columns=['metric', 'backtest'])
-                    average_dict = {k: round(v, 4) for k, v in model_metric_map.mean().to_dict().items()}
+                    model_metric_map = backtest_stats.drop(
+                        columns=["metric", "backtest"]
+                    )
+                    average_dict = {
+                        k: round(v, 4)
+                        for k, v in model_metric_map.mean().to_dict().items()
+                    }
                     best_model = min(average_dict, key=average_dict.get)
                     summary_text = rc.Text(
                         f"Overall, the average {self.spec.metric} scores for the models are {average_dict}, with"
-                        f" {best_model} being identified as the top-performing model during backtesting.")
+                        f" {best_model} being identified as the top-performing model during backtesting."
+                    )
                     backtest_table = rc.DataTable(backtest_stats, index=True)
                     liner_plot = get_auto_select_plot(backtest_stats)
-                    backtest_sections.extend([backtest_table, summary_text, liner_plot])
+                    backtest_sections.extend(
+                        [backtest_table, summary_text, liner_plot]
+                    )
 
         forecast_plots = []
         if len(self.forecast_output.list_series_ids()) > 0:
@@ -646,6 +658,13 @@ def _save_model(self, output_dir, storage_options):
             storage_options=storage_options,
         )
 
+    def _validate_automlx_explanation_mode(self):
+        if self.spec.model != SupportedModels.AutoMLX and self.spec.explanations_accuracy_mode == SpeedAccuracyMode.AUTOMLX:
+            raise ValueError(
+                "AUTOMLX explanation accuracy mode is only supported for AutoMLX models. "
+                "Please select a mode other than AUTOMLX from the available explanations_accuracy_mode options."
+            )
+
     @runtime_dependency(
         module="shap",
         err_msg=(
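
The guard's intended behavior, sketched against a bare stand-in for the operator spec (the `SimpleNamespace` and the string literals standing in for the `SupportedModels` / `SpeedAccuracyMode` values are illustrative; only the two attributes the validator reads are populated):

```python
from types import SimpleNamespace

# Hypothetical spec: a non-AutoMLX model combined with the AUTOMLX mode.
spec = SimpleNamespace(model="prophet", explanations_accuracy_mode="AUTOMLX")


def validate(spec):
    # Mirrors _validate_automlx_explanation_mode above.
    if spec.model != "automlx" and spec.explanations_accuracy_mode == "AUTOMLX":
        raise ValueError(
            "AUTOMLX explanation accuracy mode is only supported for AutoMLX models."
        )


try:
    validate(spec)
except ValueError as e:
    print(e)  # the prophet + AUTOMLX combination is rejected
```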
@@ -674,6 +693,9 @@ def explain_model(self):
         )
         ratio = SpeedAccuracyMode.ratio[self.spec.explanations_accuracy_mode]
 
+        # Validate that the AUTOMLX accuracy mode is only used with the AutoMLX model
+        self._validate_automlx_explanation_mode()
+
         for s_id, data_i in self.datasets.get_data_by_series(
             include_horizon=False
         ).items():
@@ -708,6 +730,14 @@ def explain_model(self):
                     logger.warn(
                         "No explanations generated. Ensure that additional data has been provided."
                     )
+                elif (
+                    self.spec.model == SupportedModels.AutoMLX
+                    and self.spec.explanations_accuracy_mode
+                    == SpeedAccuracyMode.AUTOMLX
+                ):
+                    logger.warning(
+                        "Global explanations are not available for AutoMLX models with inherent explainability."
+                    )
                 else:
                     self.global_explanation[s_id] = dict(
                         zip(
diff --git a/ads/opctl/operator/lowcode/forecast/schema.yaml b/ads/opctl/operator/lowcode/forecast/schema.yaml
index e0c722ae4..3394a6c30 100644
--- a/ads/opctl/operator/lowcode/forecast/schema.yaml
+++ b/ads/opctl/operator/lowcode/forecast/schema.yaml
@@ -332,6 +332,7 @@ spec:
         - HIGH_ACCURACY
         - BALANCED
         - FAST_APPROXIMATE
+        - AUTOMLX
 
     generate_report:
       type: boolean
diff --git a/tests/operators/forecast/test_errors.py b/tests/operators/forecast/test_errors.py
index 4e4337d3d..7f535cac7 100644
--- a/tests/operators/forecast/test_errors.py
+++ b/tests/operators/forecast/test_errors.py
@@ -591,6 +591,8 @@ def test_all_series_failure(model):
     yaml_i["spec"]["preprocessing"] = {"enabled": True, "steps": preprocessing_steps}
     if yaml_i["spec"].get("additional_data") is not None and model != "autots":
         yaml_i["spec"]["generate_explanations"] = True
+    else:
+        yaml_i["spec"]["generate_explanations"] = False
     if model == "autots":
         yaml_i["spec"]["model_kwargs"] = {"model_list": "superfast"}
     if model == "automlx":
@@ -672,6 +674,7 @@ def test_arima_automlx_errors(operator_setup, model):
         yaml_i["spec"]["model_kwargs"] = {"model_list": "superfast"}
     if model == "automlx":
         yaml_i["spec"]["model_kwargs"] = {"time_budget": 1}
+        yaml_i["spec"]["explanations_accuracy_mode"] = "AUTOMLX"
 
     run_yaml(
         tmpdirname=tmpdirname,
@@ -699,21 +702,15 @@
         in error_content["13"]["error"]
     ), "Error message mismatch"
 
-    if model not in ["autots", "automlx"]:  # , "lgbforecast"
-        global_fn = f"{tmpdirname}/results/global_explanation.csv"
-        assert os.path.exists(
-            global_fn
-        ), f"Global explanation file not found at {report_path}"
+    if model not in ["autots"]:  # , "lgbforecast"
+        if yaml_i["spec"].get("explanations_accuracy_mode") != "AUTOMLX":
+            global_fn = f"{tmpdirname}/results/global_explanation.csv"
+            assert os.path.exists(global_fn), f"Global explanation file not found at {global_fn}"
+            assert not pd.read_csv(global_fn, index_col=0).empty
 
         local_fn = f"{tmpdirname}/results/local_explanation.csv"
-        assert os.path.exists(
-            local_fn
-        ), f"Local explanation file not found at {report_path}"
-
-        glb_expl = pd.read_csv(global_fn, index_col=0)
-        loc_expl = pd.read_csv(local_fn)
-        assert not glb_expl.empty
-        assert not loc_expl.empty
+        assert os.path.exists(local_fn), f"Local explanation file not found at {local_fn}"
+        assert not pd.read_csv(local_fn).empty
 
 
 def test_smape_error():
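
End to end, the new mode is opted into from the operator spec. A minimal YAML sketch — the file path and column names are placeholders, and the surrounding structure is assumed from the forecast operator's usual spec layout; only `model: automlx` combined with `explanations_accuracy_mode: AUTOMLX` exercises the new code path, while any other model paired with `AUTOMLX` now fails validation:

```yaml
kind: operator
type: forecast
version: v1
spec:
  historical_data:
    url: data/train.csv        # placeholder input
  datetime_column:
    name: ds                   # placeholder column
  target_column: y             # placeholder column
  horizon: 14
  model: automlx               # AUTOMLX mode requires the AutoMLX model
  generate_explanations: true
  explanations_accuracy_mode: AUTOMLX
```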