diff --git a/nbs/src/core/core.ipynb b/nbs/src/core/core.ipynb index 3509ef98d..087934fda 100644 --- a/nbs/src/core/core.ipynb +++ b/nbs/src/core/core.ipynb @@ -1908,8 +1908,8 @@ " )\n", " assert df is not None\n", " engine = make_execution_engine(infer_by=[df])\n", - " backend = make_backend(engine)\n", - " return backend.forecast(\n", + " self._backend = make_backend(engine)\n", + " return self._backend.forecast(\n", " models=self.models,\n", " fallback_model=self.fallback_model, \n", " freq=self.freq, \n", @@ -1923,6 +1923,13 @@ " time_col=time_col,\n", " target_col=target_col,\n", " )\n", + "\n", + " def forecast_fitted_values(self):\n", + " if hasattr(self, '_backend'):\n", + " res = self._backend.forecast_fitted_values()\n", + " else:\n", + " res = super().forecast_vitted_values()\n", + " return res\n", " \n", " def cross_validation(\n", " self,\n", diff --git a/nbs/src/core/distributed.fugue.ipynb b/nbs/src/core/distributed.fugue.ipynb index 9588cd026..9182521d0 100644 --- a/nbs/src/core/distributed.fugue.ipynb +++ b/nbs/src/core/distributed.fugue.ipynb @@ -57,8 +57,9 @@ "source": [ "#| export\n", "import inspect\n", - "from typing import Any, Dict, List, Optional, Union\n", + "from typing import Any, Dict, Iterable, List, Optional, Tuple, Union\n", "\n", + "import cloudpickle\n", "import fugue.api as fa\n", "import numpy as np\n", "import pandas as pd\n", @@ -150,6 +151,209 @@ " def __getstate__(self) -> Dict[str, Any]:\n", " return {}\n", "\n", + " def _forecast(\n", + " self,\n", + " *,\n", + " df: pd.DataFrame,\n", + " X_df: Optional[pd.DataFrame],\n", + " models,\n", + " fallback_model, \n", + " freq,\n", + " h,\n", + " level,\n", + " prediction_intervals,\n", + " id_col,\n", + " time_col,\n", + " target_col,\n", + " fitted,\n", + " ) -> Tuple[_StatsForecast, pd.DataFrame]:\n", + " model = _StatsForecast(\n", + " models=models,\n", + " freq=freq, \n", + " fallback_model=fallback_model,\n", + " n_jobs=1,\n", + " )\n", + " result = model.forecast(\n", + " df=df,\n", + " h=h,\n", + " X_df=X_df,\n", + " level=level,\n", + " fitted=fitted,\n", + " prediction_intervals=prediction_intervals,\n", + " id_col=id_col,\n", + " time_col=time_col,\n", + " target_col=target_col,\n", + " )\n", + " if _id_as_idx():\n", + " result = result.reset_index()\n", + " return model, result\n", + "\n", + " def _forecast_series(\n", + " self,\n", + " df: pd.DataFrame,\n", + " *,\n", + " models,\n", + " fallback_model, \n", + " freq,\n", + " h,\n", + " level,\n", + " prediction_intervals,\n", + " id_col,\n", + " time_col,\n", + " target_col,\n", + " ) -> pd.DataFrame:\n", + " _, result = self._forecast(\n", + " df=df,\n", + " X_df=None,\n", + " models=models,\n", + " fallback_model=fallback_model,\n", + " freq=freq,\n", + " h=h,\n", + " level=level,\n", + " fitted=False,\n", + " prediction_intervals=prediction_intervals,\n", + " id_col=id_col,\n", + " time_col=time_col,\n", + " target_col=target_col,\n", + " )\n", + " return result\n", + "\n", + " def _forecast_series_fitted(\n", + " self,\n", + " df: pd.DataFrame,\n", + " *,\n", + " models,\n", + " fallback_model, \n", + " freq,\n", + " h,\n", + " level,\n", + " prediction_intervals,\n", + " id_col,\n", + " time_col,\n", + " target_col,\n", + " ) -> List[List[Any]]:\n", + " model, result = self._forecast(\n", + " df=df,\n", + " X_df=None,\n", + " models=models,\n", + " fallback_model=fallback_model,\n", + " freq=freq,\n", + " h=h,\n", + " level=level,\n", + " fitted=True, \n", + " prediction_intervals=prediction_intervals,\n", + " id_col=id_col,\n", + " time_col=time_col,\n", + " target_col=target_col,\n", + " )\n", + " fitted_vals = model.forecast_fitted_values()\n", + " return [[cloudpickle.dumps(result), cloudpickle.dumps(fitted_vals)]]\n", + "\n", + " def _forecast_series_X(\n", + " self,\n", + " df: pd.DataFrame,\n", + " X_df: pd.DataFrame,\n", + " *,\n", + " models,\n", + " fallback_model, \n", + " freq,\n", + " h,\n", + " level,\n", + " prediction_intervals,\n", + " id_col,\n", + " time_col,\n", + " target_col,\n", + " ) -> pd.DataFrame:\n", + " _, result = self._forecast(\n", + " df=df,\n", + " X_df=X_df,\n", + " models=models,\n", + " fallback_model=fallback_model,\n", + " freq=freq,\n", + " h=h,\n", + " level=level,\n", + " fitted=False,\n", + " prediction_intervals=prediction_intervals,\n", + " id_col=id_col,\n", + " time_col=time_col,\n", + " target_col=target_col,\n", + " )\n", + " return result\n", + "\n", + " def _forecast_series_X_fitted(\n", + " self,\n", + " df: pd.DataFrame,\n", + " X_df: pd.DataFrame,\n", + " *,\n", + " models,\n", + " fallback_model, \n", + " freq,\n", + " h,\n", + " level,\n", + " prediction_intervals,\n", + " id_col,\n", + " time_col,\n", + " target_col,\n", + " ) -> List[List[Any]]:\n", + " model = _StatsForecast(\n", + " models=models,\n", + " freq=freq, \n", + " fallback_model=fallback_model,\n", + " n_jobs=1,\n", + " )\n", + " result = model.forecast(\n", + " df=df,\n", + " X_df=X_df,\n", + " h=h,\n", + " level=level,\n", + " fitted=True,\n", + " prediction_intervals=prediction_intervals,\n", + " id_col=id_col,\n", + " time_col=time_col,\n", + " target_col=target_col, \n", + " )\n", + " if _id_as_idx():\n", + " result = result.reset_index()\n", + " return result \n", + "\n", + " def _get_output_schema(\n", + " self,\n", + " *,\n", + " df,\n", + " models,\n", + " level,\n", + " mode,\n", + " id_col,\n", + " time_col,\n", + " target_col,\n", + " ) -> Schema:\n", + " keep_schema = fa.get_schema(df).extract([id_col, time_col])\n", + " cols: List[Any] = []\n", + " if level is None:\n", + " level = []\n", + " for model in models:\n", + " has_levels = (\n", + " \"level\" in inspect.signature(getattr(model, \"forecast\")).parameters\n", + " and len(level) > 0\n", + " )\n", + " cols.append((repr(model), np.float32))\n", + " if has_levels:\n", + " cols.extend([(f\"{repr(model)}-lo-{l}\", np.float32) for l in reversed(level)])\n", + " cols.extend([(f\"{repr(model)}-hi-{l}\", np.float32) for l in level])\n", + " if mode == \"cv\":\n", + " cols = [(\"cutoff\", keep_schema[time_col].type), (target_col, np.float32)] + cols\n", + " return keep_schema + Schema(cols)\n", + "\n", + " @staticmethod\n", + " def _retrieve_forecast_df(items: List[List[Any]]) -> Iterable[pd.DataFrame]:\n", + " for serialized_fcst_df, _ in items:\n", + " yield cloudpickle.loads(serialized_fcst_df)\n", + "\n", + " @staticmethod\n", + " def _retrieve_fitted_df(items: List[List[Any]]) -> Iterable[pd.DataFrame]:\n", + " for _, serialized_fitted_df in items:\n", + " yield cloudpickle.loads(serialized_fitted_df) \n", + "\n", " def forecast(\n", " self,\n", " *,\n", @@ -200,7 +404,7 @@ " method documentation.\n", " Or the list of available [StatsForecast's models](https://nixtla.github.io/statsforecast/src/core/models.html).\n", " \"\"\"\n", - " schema = self._get_output_schema(\n", + " self._fcst_schema = self._get_output_schema(\n", " df=df,\n", " models=models,\n", " level=level,\n", @@ -209,43 +413,104 @@ " time_col=time_col,\n", " target_col=target_col,\n", " )\n", + " tfm_schema = 'a:binary, b:binary' if fitted else self._fcst_schema\n", " params = dict(\n", " models=models,\n", " freq=freq,\n", " fallback_model=fallback_model,\n", " h=h,\n", " level=level,\n", - " fitted=fitted,\n", " prediction_intervals=prediction_intervals,\n", " id_col=id_col,\n", " time_col=time_col,\n", " target_col=target_col,\n", " )\n", - " if X_df is None:\n", - " res = transform(\n", - " df,\n", - " self._forecast_series,\n", - " params=params,\n", - " schema=schema,\n", - " partition={\"by\": id_col},\n", - " engine=self._engine,\n", - " engine_conf=self._conf,\n", - " )\n", + " tfm_kwargs = dict(\n", + " params=params,\n", + " schema=tfm_schema,\n", + " partition={\"by\": id_col},\n", + " engine=self._engine,\n", + " engine_conf=self._conf, \n", + " )\n", + " if not fitted:\n", + " if X_df is None:\n", + " res = transform(df, self._forecast_series, **tfm_kwargs)\n", + " else:\n", + " res = _cotransform(df, X_df, self._forecast_series_X, **tfm_kwargs)\n", " else:\n", - " res = _cotransform(\n", - " df,\n", - " X_df,\n", - " self._forecast_series_X,\n", - " params=params,\n", - " schema=schema,\n", - " partition={\"by\": id_col},\n", + " if X_df is None:\n", + " res_with_fitted = transform(df, _forecast_series_fitted, **tfm_kwargs)\n", + " else:\n", + " res_with_fitted = _cotransform(\n", + " df, X_df, self._forecast_series_X_fitted, **tfm_kwargs\n", + " )\n", + " self._results = res_with_fitted\n", + " res = transform(\n", + " self._results,\n", + " FugueBackend._retrieve_forecast_df,\n", + " schema=self._fcst_schema,\n", " engine=self._engine,\n", - " engine_conf=self._conf,\n", " )\n", " return res\n", "\n", " forecast.__doc__ = forecast.__doc__.format(**_param_descriptions) # type: ignore[union-attr]\n", "\n", + " def forecast_fitted_values(self):\n", + " \"\"\"Retrieve in-sample predictions\"\"\"\n", + " if not hasattr(self, '_results'):\n", + " raise ValueError('You must first call forecast with `fitted=True`.')\n", + " return transform(\n", + " self._results,\n", + " FugueBackend._retrieve_fitted_df,\n", + " schema=self._fcst_schema,\n", + " engine=self._engine,\n", + " )\n", + "\n", + " def _cv(\n", + " self,\n", + " df: pd.DataFrame,\n", + " *,\n", + " models,\n", + " freq,\n", + " fallback_model,\n", + " h,\n", + " n_windows,\n", + " step_size,\n", + " test_size,\n", + " input_size,\n", + " level,\n", + " refit,\n", + " fitted,\n", + " prediction_intervals,\n", + " id_col,\n", + " time_col,\n", + " target_col,\n", + " ) -> pd.DataFrame:\n", + " model = _StatsForecast(\n", + " models=models,\n", + " freq=freq, \n", + " fallback_model=fallback_model,\n", + " n_jobs=1,\n", + " )\n", + " result = model.cross_validation(\n", + " df=df,\n", + " h=h,\n", + " n_windows=n_windows,\n", + " step_size=step_size,\n", + " test_size=test_size,\n", + " input_size=input_size,\n", + " level=level,\n", + " fitted=fitted,\n", + " refit=refit,\n", + " prediction_intervals=prediction_intervals,\n", + " id_col=id_col,\n", + " time_col=time_col,\n", + " target_col=target_col,\n", + " )\n", + " if _id_as_idx():\n", + " result = result.reset_index()\n", + " return result \n", + "\n", " def cross_validation(\n", " self,\n", " *,\n", @@ -345,152 +610,6 @@ "\n", " cross_validation.__doc__ = cross_validation.__doc__.format(**_param_descriptions) # type: ignore[union-attr]\n", "\n", - " def _forecast_series(\n", - " self,\n", - " df: pd.DataFrame,\n", - " *,\n", - " models,\n", - " fallback_model, \n", - " freq,\n", - " h,\n", - " level,\n", - " fitted,\n", - " prediction_intervals,\n", - " id_col,\n", - " time_col,\n", - " target_col,\n", - " ) -> pd.DataFrame:\n", - " model = _StatsForecast(\n", - " models=models,\n", - " freq=freq, \n", - " fallback_model=fallback_model,\n", - " n_jobs=1,\n", - " )\n", - " result = model.forecast(\n", - " df=df,\n", - " h=h,\n", - " X_df=None,\n", - " level=level,\n", - " fitted=fitted,\n", - " prediction_intervals=prediction_intervals,\n", - " id_col=id_col,\n", - " time_col=time_col,\n", - " target_col=target_col,\n", - " )\n", - " if _id_as_idx():\n", - " result = result.reset_index()\n", - " return result\n", - "\n", - " def _forecast_series_X(\n", - " self,\n", - " df: pd.DataFrame,\n", - " X_df: pd.DataFrame,\n", - " *,\n", - " models,\n", - " fallback_model, \n", - " freq,\n", - " h,\n", - " level,\n", - " fitted,\n", - " prediction_intervals,\n", - " id_col,\n", - " time_col,\n", - " target_col,\n", - " ) -> pd.DataFrame:\n", - " model = _StatsForecast(\n", - " models=models,\n", - " freq=freq, \n", - " fallback_model=fallback_model,\n", - " n_jobs=1,\n", - " )\n", - " result = model.forecast(\n", - " df=df,\n", - " X_df=X_df,\n", - " h=h,\n", - " level=level,\n", - " fitted=fitted,\n", - " prediction_intervals=prediction_intervals,\n", - " id_col=id_col,\n", - " time_col=time_col,\n", - " target_col=target_col, \n", - " )\n", - " if _id_as_idx():\n", - " result = result.reset_index()\n", - " return result\n", - "\n", - " def _cv(\n", - " self,\n", - " df: pd.DataFrame,\n", - " *,\n", - " models,\n", - " freq,\n", - " fallback_model,\n", - " h,\n", - " n_windows,\n", - " step_size,\n", - " test_size,\n", - " input_size,\n", - " level,\n", - " refit,\n", - " fitted,\n", - " prediction_intervals,\n", - " id_col,\n", - " time_col,\n", - " target_col,\n", - " ) -> pd.DataFrame:\n", - " model = _StatsForecast(\n", - " models=models,\n", - " freq=freq, \n", - " fallback_model=fallback_model,\n", - " n_jobs=1,\n", - " )\n", - " result = model.cross_validation(\n", - " df=df,\n", - " h=h,\n", - " n_windows=n_windows,\n", - " step_size=step_size,\n", - " test_size=test_size,\n", - " input_size=input_size,\n", - " level=level,\n", - " fitted=fitted,\n", - " refit=refit,\n", - " prediction_intervals=prediction_intervals,\n", - " id_col=id_col,\n", - " time_col=time_col,\n", - " target_col=target_col,\n", - " )\n", - " if _id_as_idx():\n", - " result = result.reset_index()\n", - " return result\n", - "\n", - " def _get_output_schema(\n", - " self,\n", - " *,\n", - " df,\n", - " models,\n", - " level,\n", - " mode,\n", - " id_col,\n", - " time_col,\n", - " target_col,\n", - " ) -> Schema:\n", - " keep_schema = fa.get_schema(df).extract([id_col, time_col])\n", - " cols: List[Any] = []\n", - " if level is None:\n", - " level = []\n", - " for model in models:\n", - " has_levels = (\n", - " \"level\" in inspect.signature(getattr(model, \"forecast\")).parameters\n", - " and len(level) > 0\n", - " )\n", - " cols.append((repr(model), np.float32))\n", - " if has_levels:\n", - " cols.extend([(f\"{repr(model)}-lo-{l}\", np.float32) for l in reversed(level)])\n", - " cols.extend([(f\"{repr(model)}-hi-{l}\", np.float32) for l in level])\n", - " if mode == \"cv\":\n", - " cols = [(\"cutoff\", keep_schema[time_col].type), (target_col, np.float32)] + cols\n", - " return keep_schema + Schema(cols)\n", - "\n", "\n", "@make_backend.candidate(lambda obj, *args, **kwargs: isinstance(obj, ExecutionEngine))\n", "def _make_fugue_backend(obj:ExecutionEngine, *args, **kwargs) -> ParallelBackend:\n", @@ -504,6 +623,8 @@ "metadata": {}, "outputs": [], "source": [ + "import os\n", + "\n", "from statsforecast.core import StatsForecast\n", "from statsforecast.models import ( \n", " AutoARIMA,\n", @@ -512,6 +633,16 @@ "from statsforecast.utils import generate_series" ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "4288f41b-c210-4331-89bc-4adf77e5e066", + "metadata": {}, + "outputs": [], + "source": [ + "os.environ['NIXTLA_ID_AS_COL'] = '1'" + ] + }, { "cell_type": "code", "execution_count": null, @@ -546,7 +677,7 @@ { "cell_type": "code", "execution_count": null, - "id": "85bd5dc4-54c8-4022-a85d-69583fc6bc37", + "id": "9aeb2a65-efbf-473e-b326-ad108428abc7", "metadata": {}, "outputs": [], "source": [ @@ -556,9 +687,21 @@ "series['unique_id'] = series['unique_id'].astype(str)\n", "\n", "# Convert to Spark\n", - "sdf = spark.createDataFrame(series)\n", - "\n", + "sdf = spark.createDataFrame(series)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1298a105-126d-464f-a46f-49a9ce10955e", + "metadata": {}, + "outputs": [], + "source": [ "# Returns a Spark DataFrame\n", + "sf = StatsForecast(\n", + " models=[AutoETS(season_length=7)],\n", + " freq='D',\n", + ")\n", "sf.cross_validation(df=sdf, h=horizon, step_size = 24,\n", " n_windows = 2, level=[90]).show()" ] @@ -629,7 +772,7 @@ "outputs": [], "source": [ "# Generate Synthetic Panel Data\n", - "df = generate_series(10).reset_index()\n", + "df = generate_series(10)\n", "df['unique_id'] = df['unique_id'].astype(str)\n", "df = dd.from_pandas(df, npartitions=10)\n", "\n", @@ -700,8 +843,27 @@ "sf = StatsForecast(models=[Naive()], freq='D', fallback_model=Naive())\n", "dask_fcst = sf.forecast(df=df, h=12).compute()\n", "fcst_stats = sf.forecast(df=df.compute(), h=12)\n", - "test_eq(dask_fcst.sort_values(by=['unique_id', 'ds']).reset_index(drop=True).astype({\"unique_id\": str}), \n", - " fcst_stats.reset_index().astype({\"unique_id\": str}))" + "pd.testing.assert_frame_equal(\n", + " (\n", + " dask_fcst\n", + " .sort_values(by=['unique_id', 'ds'])\n", + " .reset_index(drop=True)\n", + " .astype({'ds': 'datetime64[ns]', 'Naive': 'float32'})\n", + " ),\n", + " fcst_stats\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6ab9844a-df80-4894-a3a5-ffb8b117a358", + "metadata": {}, + "outputs": [], + "source": [ + "sf = StatsForecast(models=[Naive()], freq='D')\n", + "xx = sf.forecast(df=df, h=12, fitted=True).compute()\n", + "yy = sf.forecast_fitted_values().compute()" ] }, { @@ -763,17 +925,20 @@ "#| eval: false\n", "\n", "# Distributed exogenous regressors\n", - "fcst_x = StatsForecast(models=[ReturnX()], freq='D')\n", + "fcst_x = StatsForecast(models=[ReturnX()], freq=1)\n", "res = fcst_x.forecast(df=train_df, \n", " X_df=xreg, \n", " h=4).compute()\n", "expected_res = xreg.rename(columns={'x': 'ReturnX'}).compute()\n", - "# we expect strings for unique_id, and ds using exogenous\n", - "expected_res[['unique_id', 'ds']] = expected_res[['unique_id', 'ds']].astype(str)\n", - "pd.testing.assert_frame_equal(res.sort_values('unique_id').reset_index(drop=True), \n", - " expected_res, \n", - " check_dtype=False, \n", - " check_index_type=False)" + "pd.testing.assert_frame_equal(\n", + " (\n", + " res\n", + " .sort_values('unique_id')\n", + " .reset_index(drop=True)\n", + " .astype(expected_res.dtypes)\n", + " ),\n", + " expected_res,\n", + ")" ] }, { @@ -829,14 +994,14 @@ "# Distribute predictions.\n", "sf = StatsForecast(models=[Naive()], freq='D')\n", "fcst_fugue = sf.forecast(df=df, h=12).compute().sort_values(['unique_id', 'ds']).reset_index(drop=True)\n", - "fcst_stats = sf.forecast(df=df.compute(), h=12).reset_index().astype({\"unique_id\": str})\n", - "test_eq(fcst_fugue, fcst_stats)\n", + "fcst_stats = sf.forecast(df=df.compute(), h=12).astype({\"unique_id\": str})\n", + "test_eq(fcst_fugue.astype(fcst_stats.dtypes), fcst_stats)\n", "\n", "# Distribute cross-validation predictions.\n", "sf = StatsForecast(models=[Naive()], freq='D')\n", "fcst_fugue = sf.cross_validation(df=df, h=12).compute().sort_values(['unique_id', 'ds', 'cutoff']).reset_index(drop=True)\n", - "fcst_stats = sf.cross_validation(df=df.compute(), h=12).reset_index().astype({\"unique_id\": str})\n", - "test_eq(fcst_fugue, fcst_stats)\n", + "fcst_stats = sf.cross_validation(df=df.compute(), h=12).astype({\"unique_id\": str})\n", + "test_eq(fcst_fugue.astype(fcst_stats.dtypes), fcst_stats)\n", "\n", "# fallback model\n", "class FailNaive:\n", @@ -848,8 +1013,8 @@ "#cross validation fallback model\n", "fcst = StatsForecast(models=[FailNaive()], freq='D', fallback_model=Naive())\n", "fcst_fugue = fcst.cross_validation(df=df, h=12).compute().sort_values(['unique_id', 'ds', 'cutoff']).reset_index(drop=True)\n", - "fcst_stats = sf.cross_validation(df=df.compute(), h=12).reset_index().astype({\"unique_id\": str})\n", - "test_eq(fcst_fugue, fcst_stats)" + "fcst_stats = sf.cross_validation(df=df.compute(), h=12).astype({\"unique_id\": str})\n", + "test_eq(fcst_fugue.astype(fcst_stats.dtypes), fcst_stats)" ] }, { diff --git a/settings.ini b/settings.ini index 265ee2695..48e8b48ae 100644 --- a/settings.ini +++ b/settings.ini @@ -15,7 +15,7 @@ language = English custom_sidebar = True license = apache2 status = 2 -requirements = numba>=0.55.0 numpy>=1.21.6 pandas>=1.3.5 scipy>=1.7.3 statsmodels>=0.13.2 tqdm fugue>=0.8.1 utilsforecast>=0.0.20 +requirements = cloudpickle numba>=0.55.0 numpy>=1.21.6 pandas>=1.3.5 scipy>=1.7.3 statsmodels>=0.13.2 tqdm fugue>=0.8.1 utilsforecast>=0.0.20 polars_requirements = polars ray_requirements = fugue[ray]>=0.8.1 protobuf>=3.15.3,<4.0.0 ray<2.8 dask_requirements = fugue[dask]>=0.8.1 diff --git a/statsforecast/_modidx.py b/statsforecast/_modidx.py index 07482a735..0c23c341d 100644 --- a/statsforecast/_modidx.py +++ b/statsforecast/_modidx.py @@ -125,6 +125,8 @@ 'statsforecast/core.py'), 'statsforecast.core.StatsForecast.forecast': ( 'src/core/core.html#statsforecast.forecast', 'statsforecast/core.py'), + 'statsforecast.core.StatsForecast.forecast_fitted_values': ( 'src/core/core.html#statsforecast.forecast_fitted_values', + 'statsforecast/core.py'), 'statsforecast.core._StatsForecast': ('src/core/core.html#_statsforecast', 'statsforecast/core.py'), 'statsforecast.core._StatsForecast.__init__': ( 'src/core/core.html#_statsforecast.__init__', 'statsforecast/core.py'), @@ -192,16 +194,28 @@ 'statsforecast/distributed/fugue.py'), 'statsforecast.distributed.fugue.FugueBackend._cv': ( 'src/core/distributed.fugue.html#fuguebackend._cv', 'statsforecast/distributed/fugue.py'), + 'statsforecast.distributed.fugue.FugueBackend._forecast': ( 'src/core/distributed.fugue.html#fuguebackend._forecast', + 'statsforecast/distributed/fugue.py'), 'statsforecast.distributed.fugue.FugueBackend._forecast_series': ( 'src/core/distributed.fugue.html#fuguebackend._forecast_series', 'statsforecast/distributed/fugue.py'), 'statsforecast.distributed.fugue.FugueBackend._forecast_series_X': ( 'src/core/distributed.fugue.html#fuguebackend._forecast_series_x', 'statsforecast/distributed/fugue.py'), + 'statsforecast.distributed.fugue.FugueBackend._forecast_series_X_fitted': ( 'src/core/distributed.fugue.html#fuguebackend._forecast_series_x_fitted', + 'statsforecast/distributed/fugue.py'), + 'statsforecast.distributed.fugue.FugueBackend._forecast_series_fitted': ( 'src/core/distributed.fugue.html#fuguebackend._forecast_series_fitted', + 'statsforecast/distributed/fugue.py'), 'statsforecast.distributed.fugue.FugueBackend._get_output_schema': ( 'src/core/distributed.fugue.html#fuguebackend._get_output_schema', 'statsforecast/distributed/fugue.py'), + 'statsforecast.distributed.fugue.FugueBackend._retrieve_fitted_df': ( 'src/core/distributed.fugue.html#fuguebackend._retrieve_fitted_df', + 'statsforecast/distributed/fugue.py'), + 'statsforecast.distributed.fugue.FugueBackend._retrieve_forecast_df': ( 'src/core/distributed.fugue.html#fuguebackend._retrieve_forecast_df', + 'statsforecast/distributed/fugue.py'), 'statsforecast.distributed.fugue.FugueBackend.cross_validation': ( 'src/core/distributed.fugue.html#fuguebackend.cross_validation', 'statsforecast/distributed/fugue.py'), 'statsforecast.distributed.fugue.FugueBackend.forecast': ( 'src/core/distributed.fugue.html#fuguebackend.forecast', 'statsforecast/distributed/fugue.py'), + 'statsforecast.distributed.fugue.FugueBackend.forecast_fitted_values': ( 'src/core/distributed.fugue.html#fuguebackend.forecast_fitted_values', + 'statsforecast/distributed/fugue.py'), 'statsforecast.distributed.fugue._cotransform': ( 'src/core/distributed.fugue.html#_cotransform', 'statsforecast/distributed/fugue.py'), 'statsforecast.distributed.fugue._make_fugue_backend': ( 'src/core/distributed.fugue.html#_make_fugue_backend', diff --git a/statsforecast/core.py b/statsforecast/core.py index 7c1d22c57..3a442212e 100644 --- a/statsforecast/core.py +++ b/statsforecast/core.py @@ -1526,8 +1526,8 @@ def forecast( ) assert df is not None engine = make_execution_engine(infer_by=[df]) - backend = make_backend(engine) - return backend.forecast( + self._backend = make_backend(engine) + return self._backend.forecast( models=self.models, fallback_model=self.fallback_model, freq=self.freq, @@ -1542,6 +1542,13 @@ def forecast( target_col=target_col, ) + def forecast_fitted_values(self): + if hasattr(self, "_backend"): + res = self._backend.forecast_fitted_values() + else: + res = super().forecast_vitted_values() + return res + def cross_validation( self, h: int, diff --git a/statsforecast/distributed/fugue.py b/statsforecast/distributed/fugue.py index 24af76692..21c998f44 100644 --- a/statsforecast/distributed/fugue.py +++ b/statsforecast/distributed/fugue.py @@ -5,8 +5,9 @@ # %% ../../nbs/src/core/distributed.fugue.ipynb 4 import inspect -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, Iterable, List, Optional, Tuple, Union +import cloudpickle import fugue.api as fa import numpy as np import pandas as pd @@ -84,6 +85,214 @@ def __init__(self, engine: Any = None, conf: Any = None, **transform_kwargs: Any def __getstate__(self) -> Dict[str, Any]: return {} + def _forecast( + self, + *, + df: pd.DataFrame, + X_df: Optional[pd.DataFrame], + models, + fallback_model, + freq, + h, + level, + prediction_intervals, + id_col, + time_col, + target_col, + fitted, + ) -> Tuple[_StatsForecast, pd.DataFrame]: + model = _StatsForecast( + models=models, + freq=freq, + fallback_model=fallback_model, + n_jobs=1, + ) + result = model.forecast( + df=df, + h=h, + X_df=X_df, + level=level, + fitted=fitted, + prediction_intervals=prediction_intervals, + id_col=id_col, + time_col=time_col, + target_col=target_col, + ) + if _id_as_idx(): + result = result.reset_index() + return model, result + + def _forecast_series( + self, + df: pd.DataFrame, + *, + models, + fallback_model, + freq, + h, + level, + prediction_intervals, + id_col, + time_col, + target_col, + ) -> pd.DataFrame: + _, result = self._forecast( + df=df, + X_df=None, + models=models, + fallback_model=fallback_model, + freq=freq, + h=h, + level=level, + fitted=False, + prediction_intervals=prediction_intervals, + id_col=id_col, + time_col=time_col, + target_col=target_col, + ) + return result + + def _forecast_series_fitted( + self, + df: pd.DataFrame, + *, + models, + fallback_model, + freq, + h, + level, + prediction_intervals, + id_col, + time_col, + target_col, + ) -> List[List[Any]]: + model, result = self._forecast( + df=df, + X_df=None, + models=models, + fallback_model=fallback_model, + freq=freq, + h=h, + level=level, + fitted=True, + prediction_intervals=prediction_intervals, + id_col=id_col, + time_col=time_col, + target_col=target_col, + ) + fitted_vals = model.forecast_fitted_values() + return [[cloudpickle.dumps(result), cloudpickle.dumps(fitted_vals)]] + + def _forecast_series_X( + self, + df: pd.DataFrame, + X_df: pd.DataFrame, + *, + models, + fallback_model, + freq, + h, + level, + prediction_intervals, + id_col, + time_col, + target_col, + ) -> pd.DataFrame: + _, result = self._forecast( + df=df, + X_df=X_df, + models=models, + fallback_model=fallback_model, + freq=freq, + h=h, + level=level, + fitted=False, + prediction_intervals=prediction_intervals, + id_col=id_col, + time_col=time_col, + target_col=target_col, + ) + return result + + def _forecast_series_X_fitted( + self, + df: pd.DataFrame, + X_df: pd.DataFrame, + *, + models, + fallback_model, + freq, + h, + level, + prediction_intervals, + id_col, + time_col, + target_col, + ) -> List[List[Any]]: + model = _StatsForecast( + models=models, + freq=freq, + fallback_model=fallback_model, + n_jobs=1, + ) + result = model.forecast( + df=df, + X_df=X_df, + h=h, + level=level, + fitted=True, + prediction_intervals=prediction_intervals, + id_col=id_col, + time_col=time_col, + target_col=target_col, + ) + if _id_as_idx(): + result = result.reset_index() + return result + + def _get_output_schema( + self, + *, + df, + models, + level, + mode, + id_col, + time_col, + target_col, + ) -> Schema: + keep_schema = fa.get_schema(df).extract([id_col, time_col]) + cols: List[Any] = [] + if level is None: + level = [] + for model in models: + has_levels = ( + "level" in inspect.signature(getattr(model, "forecast")).parameters + and len(level) > 0 + ) + cols.append((repr(model), np.float32)) + if has_levels: + cols.extend( + [(f"{repr(model)}-lo-{l}", np.float32) for l in reversed(level)] + ) + cols.extend([(f"{repr(model)}-hi-{l}", np.float32) for l in level]) + if mode == "cv": + cols = [ + ("cutoff", keep_schema[time_col].type), + (target_col, np.float32), + ] + cols + return keep_schema + Schema(cols) + + @staticmethod + def _retrieve_forecast_df(items: List[List[Any]]) -> Iterable[pd.DataFrame]: + for serialized_fcst_df, _ in items: + yield cloudpickle.loads(serialized_fcst_df) + + @staticmethod + def _retrieve_fitted_df(items: List[List[Any]]) -> Iterable[pd.DataFrame]: + for _, serialized_fitted_df in items: + yield cloudpickle.loads(serialized_fitted_df) + def forecast( self, *, @@ -134,7 +343,7 @@ def forecast( method documentation. Or the list of available [StatsForecast's models](https://nixtla.github.io/statsforecast/src/core/models.html). """ - schema = self._get_output_schema( + self._fcst_schema = self._get_output_schema( df=df, models=models, level=level, @@ -143,43 +352,104 @@ def forecast( time_col=time_col, target_col=target_col, ) + tfm_schema = "a:binary, b:binary" if fitted else self._fcst_schema params = dict( models=models, freq=freq, fallback_model=fallback_model, h=h, level=level, - fitted=fitted, prediction_intervals=prediction_intervals, id_col=id_col, time_col=time_col, target_col=target_col, ) - if X_df is None: - res = transform( - df, - self._forecast_series, - params=params, - schema=schema, - partition={"by": id_col}, - engine=self._engine, - engine_conf=self._conf, - ) + tfm_kwargs = dict( + params=params, + schema=tfm_schema, + partition={"by": id_col}, + engine=self._engine, + engine_conf=self._conf, + ) + if not fitted: + if X_df is None: + res = transform(df, self._forecast_series, **tfm_kwargs) + else: + res = _cotransform(df, X_df, self._forecast_series_X, **tfm_kwargs) else: - res = _cotransform( - df, - X_df, - self._forecast_series_X, - params=params, - schema=schema, - partition={"by": id_col}, + if X_df is None: + res_with_fitted = transform(df, _forecast_series_fitted, **tfm_kwargs) + else: + res_with_fitted = _cotransform( + df, X_df, self._forecast_series_X_fitted, **tfm_kwargs + ) + self._results = res_with_fitted + res = transform( + self._results, + FugueBackend._retrieve_forecast_df, + schema=self._fcst_schema, engine=self._engine, - engine_conf=self._conf, ) return res forecast.__doc__ = forecast.__doc__.format(**_param_descriptions) # type: ignore[union-attr] + def forecast_fitted_values(self): + """Retrieve in-sample predictions""" + if not hasattr(self, "_results"): + raise ValueError("You must first call forecast with `fitted=True`.") + return transform( + self._results, + FugueBackend._retrieve_fitted_df, + schema=self._fcst_schema, + engine=self._engine, + ) + + def _cv( + self, + df: pd.DataFrame, + *, + models, + freq, + fallback_model, + h, + n_windows, + step_size, + test_size, + input_size, + level, + refit, + fitted, + prediction_intervals, + id_col, + time_col, + target_col, + ) -> pd.DataFrame: + model = _StatsForecast( + models=models, + freq=freq, + fallback_model=fallback_model, + n_jobs=1, + ) + result = model.cross_validation( + df=df, + h=h, + n_windows=n_windows, + step_size=step_size, + test_size=test_size, + input_size=input_size, + level=level, + fitted=fitted, + refit=refit, + prediction_intervals=prediction_intervals, + id_col=id_col, + time_col=time_col, + target_col=target_col, + ) + if _id_as_idx(): + result = result.reset_index() + return result + def cross_validation( self, *, @@ -279,157 +549,6 @@ def cross_validation( cross_validation.__doc__ = cross_validation.__doc__.format(**_param_descriptions) # type: ignore[union-attr] - def _forecast_series( - self, - df: pd.DataFrame, - *, - models, - fallback_model, - freq, - h, - level, - fitted, - prediction_intervals, - id_col, - time_col, - target_col, - ) -> pd.DataFrame: - model = _StatsForecast( - models=models, - freq=freq, - fallback_model=fallback_model, - n_jobs=1, - ) - result = model.forecast( - df=df, - h=h, - X_df=None, - level=level, - fitted=fitted, - prediction_intervals=prediction_intervals, - id_col=id_col, - time_col=time_col, - target_col=target_col, - ) - if _id_as_idx(): - result = result.reset_index() - return result - - def _forecast_series_X( - self, - df: pd.DataFrame, - X_df: pd.DataFrame, - *, - models, - fallback_model, - freq, - h, - level, - fitted, - prediction_intervals, - id_col, - time_col, - target_col, - ) -> pd.DataFrame: - model = _StatsForecast( - models=models, - freq=freq, - fallback_model=fallback_model, - n_jobs=1, - ) - result = model.forecast( - df=df, - X_df=X_df, - h=h, - level=level, - fitted=fitted, - prediction_intervals=prediction_intervals, - id_col=id_col, - time_col=time_col, - target_col=target_col, - ) - if _id_as_idx(): - result = result.reset_index() - return result - - def _cv( - self, - df: pd.DataFrame, - *, - models, - freq, - fallback_model, - h, - n_windows, - step_size, - test_size, - input_size, - level, - refit, - fitted, - prediction_intervals, - id_col, - time_col, - target_col, - ) -> pd.DataFrame: - model = _StatsForecast( - models=models, - freq=freq, - fallback_model=fallback_model, - n_jobs=1, - ) - result = model.cross_validation( - df=df, - h=h, - n_windows=n_windows, - step_size=step_size, - test_size=test_size, - input_size=input_size, - level=level, - fitted=fitted, - refit=refit, - prediction_intervals=prediction_intervals, - id_col=id_col, - time_col=time_col, - target_col=target_col, - ) - if _id_as_idx(): - result = result.reset_index() - return result - - def _get_output_schema( - self, - *, - df, - models, - level, - mode, - id_col, - time_col, - target_col, - ) -> Schema: - keep_schema = fa.get_schema(df).extract([id_col, time_col]) - cols: List[Any] = [] - if level is None: - level = [] - for model in models: - has_levels = ( - "level" in inspect.signature(getattr(model, "forecast")).parameters - and len(level) > 0 - ) - cols.append((repr(model), np.float32)) - if has_levels: - cols.extend( - [(f"{repr(model)}-lo-{l}", np.float32) for l in reversed(level)] - ) - cols.extend([(f"{repr(model)}-hi-{l}", np.float32) for l in level]) - if mode == "cv": - cols = [ - ("cutoff", keep_schema[time_col].type), - (target_col, np.float32), - ] + cols - return keep_schema + Schema(cols) - @make_backend.candidate(lambda obj, *args, **kwargs: isinstance(obj, ExecutionEngine)) def _make_fugue_backend(obj: ExecutionEngine, *args, **kwargs) -> ParallelBackend: