diff --git a/nbs/timegpt.ipynb b/nbs/timegpt.ipynb
index b4411b9f..713a1c63 100644
--- a/nbs/timegpt.ipynb
+++ b/nbs/timegpt.ipynb
@@ -398,11 +398,19 @@
     "        y = Y_df.to_dict(**to_dict_args)\n",
     "        x = X_df.to_dict(**to_dict_args) if X_df is not None else None\n",
     "        return y, x\n",
+    "\n",
+    "    @staticmethod\n",
+    "    def _call_api(method, kwargs):\n",
+    "        response = method(**kwargs)\n",
+    "        if 'data' in response:\n",
+    "            response = response['data']\n",
+    "        return response\n",
     "    \n",
     "    def set_model_params(self):\n",
-    "        model_params = self.client.timegpt_model_params(request=SingleSeriesForecast(freq=self.freq))\n",
-    "        if 'data' in model_params:\n",
-    "            model_params = model_params['data']\n",
+    "        model_params = self._call_api(\n",
+    "            self.client.timegpt_model_params,\n",
+    "            {'request': SingleSeriesForecast(freq=self.freq)},\n",
+    "        )\n",
     "        model_params = model_params['detail']\n",
     "        self.input_size, self.model_horizon = model_params['input_size'], model_params['horizon']\n",
     "    \n",
@@ -449,17 +457,18 @@
     "        self.validate_input_size(Y_df=Y_df)\n",
     "        y, x = self.dataframes_to_dict(Y_df, X_df)\n",
     "        main_logger.info('Calling Forecast Endpoint...')\n",
-    "        response_timegpt = self.client.timegpt_multi_series(\n",
-    "            y=y,\n",
-    "            x=x,\n",
-    "            fh=self.h,\n",
-    "            freq=self.freq,\n",
-    "            level=self.level,\n",
-    "            finetune_steps=self.finetune_steps,\n",
-    "            clean_ex_first=self.clean_ex_first,\n",
+    "        response_timegpt = self._call_api(\n",
+    "            self.client.timegpt_multi_series,\n",
+    "            dict(\n",
+    "                y=y,\n",
+    "                x=x,\n",
+    "                fh=self.h,\n",
+    "                freq=self.freq,\n",
+    "                level=self.level,\n",
+    "                finetune_steps=self.finetune_steps,\n",
+    "                clean_ex_first=self.clean_ex_first,\n",
+    "            ),\n",
     "        )\n",
-    "        if 'data' in response_timegpt:\n",
-    "            response_timegpt = response_timegpt['data']\n",
     "        if 'weights_x' in response_timegpt:\n",
     "            self.weights_x = pd.DataFrame({\n",
     "                'features': self.x_cols,\n",
@@ -469,16 +478,17 @@
     "        if add_history:\n",
     "            main_logger.info('Calling Historical Forecast Endpoint...')\n",
     "            self.validate_input_size(Y_df=Y_df)\n",
-    "            if 'data' in response_timegpt:\n",
-    "                response_timegpt = response_timegpt['data']\n",
-    "            response_timegpt = self.client.timegpt_multi_series_historic(\n",
-    "                y=y,\n",
-    "                x=x,\n",
-    "                freq=self.freq,\n",
-    "                level=self.level,\n",
-    "                clean_ex_first=self.clean_ex_first,\n",
+    "            response_timegpt = self._call_api(\n",
+    "                self.client.timegpt_multi_series_historic,\n",
+    "                dict(\n",
+    "                    y=y,\n",
+    "                    x=x,\n",
+    "                    freq=self.freq,\n",
+    "                    level=self.level,\n",
+    "                    clean_ex_first=self.clean_ex_first,\n",
+    "                ),\n",
     "            )\n",
-    "            fitted_df = pd.DataFrame(**response_timegpt['data']['forecast'])\n",
+    "            fitted_df = pd.DataFrame(**response_timegpt['forecast'])\n",
     "            fitted_df = fitted_df.drop(columns='y')\n",
     "            fcst_df = pd.concat([fitted_df, fcst_df]).sort_values(['unique_id', 'ds'])\n",
     "        fcst_df = self.transform_outputs(fcst_df)\n",
@@ -497,15 +507,16 @@
     "        Y_df, X_df = self.preprocess_dataframes(df=df, X_df=None)\n",
     "        main_logger.info('Calling Anomaly Detector Endpoint...')\n",
     "        y, x = self.dataframes_to_dict(Y_df, X_df)\n",
-    "        response_timegpt = self.client.timegpt_multi_series_anomalies(\n",
-    "            y=y,\n",
-    "            x=x,\n",
-    "            freq=self.freq,\n",
-    "            level=[self.level] if (isinstance(self.level, int) or isinstance(self.level, float)) else [self.level[0]],\n",
-    "            clean_ex_first=self.clean_ex_first,\n",
+    "        response_timegpt = self._call_api(\n",
+    "            self.client.timegpt_multi_series_anomalies,\n",
+    "            dict(\n",
+    "                y=y,\n",
+    "                x=x,\n",
+    "                freq=self.freq,\n",
+    "                level=[self.level] if (isinstance(self.level, int) or isinstance(self.level, float)) else [self.level[0]],\n",
+    "                clean_ex_first=self.clean_ex_first,\n",
+    "            ),\n",
     "        )\n",
-    "        if 'data' in response_timegpt:\n",
-    "            response_timegpt = response_timegpt['data']\n",
     "        if 'weights_x' in response_timegpt:\n",
     "            self.weights_x = pd.DataFrame({\n",
     "                'features': self.x_cols,\n",
@@ -1214,6 +1225,48 @@
     "show_doc(TimeGPT.forecast, title_level=2)"
    ]
   },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "#| hide\n",
+    "#test same results custom url\n",
+    "timegpt_custom = TimeGPT(\n",
+    "    token=os.environ['TIMEGPT_CUSTOM_URL_TOKEN'],\n",
+    "    environment=os.environ['TIMEGPT_CUSTOM_URL'],\n",
+    ")\n",
+    "# forecast method\n",
+    "fcst_kwargs = dict(\n",
+    "    df=df, \n",
+    "    h=12, \n",
+    "    level=[90, 95], \n",
+    "    add_history=True, \n",
+    "    time_col='timestamp', \n",
+    "    target_col='value',\n",
+    ")\n",
+    "fcst_df = timegpt.forecast(**fcst_kwargs)\n",
+    "fcst_df_custom = timegpt_custom.forecast(**fcst_kwargs)\n",
+    "pd.testing.assert_frame_equal(\n",
+    "    fcst_df,\n",
+    "    fcst_df_custom,\n",
+    ")\n",
+    "# anomalies method\n",
+    "anomalies_kwargs = dict(\n",
+    "    df=df, \n",
+    "    level=99,\n",
+    "    time_col='timestamp', \n",
+    "    target_col='value',\n",
+    ")\n",
+    "anomalies_df = timegpt.detect_anomalies(**anomalies_kwargs)\n",
+    "anomalies_df_custom = timegpt_custom.detect_anomalies(**anomalies_kwargs)\n",
+    "pd.testing.assert_frame_equal(\n",
+    "    anomalies_df,\n",
+    "    anomalies_df_custom,\n",
+    ")"
+   ]
+  },
   {
    "cell_type": "code",
    "execution_count": null,
diff --git a/nixtlats/_modidx.py b/nixtlats/_modidx.py
index 6c4e3477..f8f2c28c 100644
--- a/nixtlats/_modidx.py
+++ b/nixtlats/_modidx.py
@@ -68,6 +68,8 @@
                                                            'nixtlats/timegpt.py'),
                 'nixtlats.timegpt._TimeGPTModel': ('timegpt.html#_timegptmodel', 'nixtlats/timegpt.py'),
                 'nixtlats.timegpt._TimeGPTModel.__init__': ('timegpt.html#_timegptmodel.__init__', 'nixtlats/timegpt.py'),
+                'nixtlats.timegpt._TimeGPTModel._call_api': ( 'timegpt.html#_timegptmodel._call_api',
+                                                              'nixtlats/timegpt.py'),
                 'nixtlats.timegpt._TimeGPTModel.add_date_features': ( 'timegpt.html#_timegptmodel.add_date_features',
                                                                       'nixtlats/timegpt.py'),
                 'nixtlats.timegpt._TimeGPTModel.compute_date_feature': ( 'timegpt.html#_timegptmodel.compute_date_feature',
diff --git a/nixtlats/timegpt.py b/nixtlats/timegpt.py
index bb009286..1ab46dd1 100644
--- a/nixtlats/timegpt.py
+++ b/nixtlats/timegpt.py
@@ -345,12 +345,18 @@ def dataframes_to_dict(self, Y_df: pd.DataFrame, X_df: pd.DataFrame):
         x = X_df.to_dict(**to_dict_args) if X_df is not None else None
         return y, x
 
+    @staticmethod
+    def _call_api(method, kwargs):
+        response = method(**kwargs)
+        if "data" in response:
+            response = response["data"]
+        return response
+
     def set_model_params(self):
-        model_params = self.client.timegpt_model_params(
-            request=SingleSeriesForecast(freq=self.freq)
+        model_params = self._call_api(
+            self.client.timegpt_model_params,
+            {"request": SingleSeriesForecast(freq=self.freq)},
         )
-        if "data" in model_params:
-            model_params = model_params["data"]
         model_params = model_params["detail"]
         self.input_size, self.model_horizon = (
             model_params["input_size"],
@@ -406,17 +412,18 @@ def forecast(
         self.validate_input_size(Y_df=Y_df)
         y, x = self.dataframes_to_dict(Y_df, X_df)
         main_logger.info("Calling Forecast Endpoint...")
-        response_timegpt = self.client.timegpt_multi_series(
-            y=y,
-            x=x,
-            fh=self.h,
-            freq=self.freq,
-            level=self.level,
-            finetune_steps=self.finetune_steps,
-            clean_ex_first=self.clean_ex_first,
+        response_timegpt = self._call_api(
+            self.client.timegpt_multi_series,
+            dict(
+                y=y,
+                x=x,
+                fh=self.h,
+                freq=self.freq,
+                level=self.level,
+                finetune_steps=self.finetune_steps,
+                clean_ex_first=self.clean_ex_first,
+            ),
         )
-        if "data" in response_timegpt:
-            response_timegpt = response_timegpt["data"]
         if "weights_x" in response_timegpt:
             self.weights_x = pd.DataFrame(
                 {
@@ -428,16 +435,17 @@ def forecast(
         if add_history:
             main_logger.info("Calling Historical Forecast Endpoint...")
             self.validate_input_size(Y_df=Y_df)
-            if "data" in response_timegpt:
-                response_timegpt = response_timegpt["data"]
-            response_timegpt = self.client.timegpt_multi_series_historic(
-                y=y,
-                x=x,
-                freq=self.freq,
-                level=self.level,
-                clean_ex_first=self.clean_ex_first,
+            response_timegpt = self._call_api(
+                self.client.timegpt_multi_series_historic,
+                dict(
+                    y=y,
+                    x=x,
+                    freq=self.freq,
+                    level=self.level,
+                    clean_ex_first=self.clean_ex_first,
+                ),
             )
-            fitted_df = pd.DataFrame(**response_timegpt["data"]["forecast"])
+            fitted_df = pd.DataFrame(**response_timegpt["forecast"])
             fitted_df = fitted_df.drop(columns="y")
             fcst_df = pd.concat([fitted_df, fcst_df]).sort_values(["unique_id", "ds"])
         fcst_df = self.transform_outputs(fcst_df)
@@ -456,17 +464,18 @@ def detect_anomalies(self, df: pd.DataFrame):
         Y_df, X_df = self.preprocess_dataframes(df=df, X_df=None)
         main_logger.info("Calling Anomaly Detector Endpoint...")
         y, x = self.dataframes_to_dict(Y_df, X_df)
-        response_timegpt = self.client.timegpt_multi_series_anomalies(
-            y=y,
-            x=x,
-            freq=self.freq,
-            level=[self.level]
-            if (isinstance(self.level, int) or isinstance(self.level, float))
-            else [self.level[0]],
-            clean_ex_first=self.clean_ex_first,
+        response_timegpt = self._call_api(
+            self.client.timegpt_multi_series_anomalies,
+            dict(
+                y=y,
+                x=x,
+                freq=self.freq,
+                level=[self.level]
+                if (isinstance(self.level, int) or isinstance(self.level, float))
+                else [self.level[0]],
+                clean_ex_first=self.clean_ex_first,
+            ),
         )
-        if "data" in response_timegpt:
-            response_timegpt = response_timegpt["data"]
         if "weights_x" in response_timegpt:
             self.weights_x = pd.DataFrame(
                 {
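
Note on the refactor in this patch: every endpoint call is now routed through `_call_api`, which strips the optional 'data' envelope so downstream code sees the same payload shape whether or not the server wraps its response. Below is a minimal standalone sketch of that behavior; the fake_* endpoints are hypothetical stand-ins for the generated client methods, and only the unwrapping logic mirrors the helper added above.

# Sketch of the _call_api normalization introduced in this patch.
# The fake_* endpoints are hypothetical; only the optional 'data'
# unwrapping mirrors the actual helper.
def _call_api(method, kwargs):
    response = method(**kwargs)
    if 'data' in response:
        response = response['data']
    return response

def fake_wrapped_endpoint(**kwargs):
    # response shape with the payload nested under 'data'
    return {'data': {'forecast': {'ds': [], 'y': []}}}

def fake_plain_endpoint(**kwargs):
    # response shape with the payload at the top level
    return {'forecast': {'ds': [], 'y': []}}

# Both shapes normalize to the same payload.
assert _call_api(fake_wrapped_endpoint, {}) == _call_api(fake_plain_endpoint, {})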