From 6e61031be280595f7a2a1c9d527787706fb3029e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Morales?= Date: Fri, 1 Nov 2024 17:58:51 -0600 Subject: [PATCH 1/4] feat: support custom and integer frequencies --- nbs/src/nixtla_client.ipynb | 197 ++++++++++++++++++++++++------------ nixtla/nixtla_client.py | 148 +++++++++++++++------------ setup.py | 3 +- 3 files changed, 223 insertions(+), 125 deletions(-) diff --git a/nbs/src/nixtla_client.ipynb b/nbs/src/nixtla_client.ipynb index 5bf9b8cf..920ef0c9 100644 --- a/nbs/src/nixtla_client.ipynb +++ b/nbs/src/nixtla_client.ipynb @@ -22,7 +22,7 @@ "metadata": {}, "outputs": [], "source": [ - "#| hide \n", + "#| hide\n", "%load_ext autoreload\n", "%autoreload 2" ] @@ -171,6 +171,8 @@ "_Loss = Literal[\"default\", \"mae\", \"mse\", \"rmse\", \"mape\", \"smape\"]\n", "_Model = Literal[\"azureai\", \"timegpt-1\", \"timegpt-1-long-horizon\"]\n", "_Finetune_Depth = Literal[1, 2, 3, 4, 5]\n", + "_Freq = Union[str, int, pd.offsets.BaseOffset]\n", + "_FreqType = TypeVar(\"_FreqType\", str, int, pd.offsets.BaseOffset)\n", "\n", "_date_features_by_freq = {\n", " # Daily frequencies\n", @@ -256,11 +258,11 @@ "\n", "def _maybe_infer_freq(\n", " df: DataFrame,\n", - " freq: Optional[str],\n", + " freq: Optional[_FreqType],\n", " id_col: str,\n", " time_col: str,\n", - ") -> str:\n", - " if freq is not None and freq not in ['W', 'M', 'Q', 'Y', 'A']:\n", + ") -> _FreqType:\n", + " if freq is not None:\n", " return freq\n", " if isinstance(df, pl_DataFrame):\n", " raise ValueError(\n", @@ -280,19 +282,28 @@ " 'to inconsistent intervals. Please check your data for missing, '\n", " 'duplicated or irregular timestamps'\n", " )\n", - " if freq is not None:\n", - " # check we have the same base frequency\n", - " # except when we have yearly frequency (A, and Y means the same)\n", - " if (\n", - " (freq[0] != inferred_freq[0] and freq[0] not in ('A', 'Y'))\n", - " or (freq[0] in ('A', 'Y') and inferred_freq[0] not in ('A', 'Y'))\n", - " ):\n", - " raise RuntimeError(f'Failed to infer special date, inferred freq {inferred_freq}')\n", " logger.info(f'Inferred freq: {inferred_freq}')\n", " return inferred_freq\n", "\n", - "def _standardize_freq(freq: str) -> str:\n", - " return freq.replace('mo', 'MS')\n", + "def _standardize_freq(freq: _Freq, processed: ufp.ProcessedDF) -> str:\n", + " if isinstance(freq, str):\n", + " freq = freq.replace('mo', 'MS')\n", + " else:\n", + " from coreforecast.grouped_array import GroupedArray\n", + "\n", + " ga = GroupedArray(processed.data[:, 0], processed.indptr)\n", + " series_season_lengths = ga._greatest_autocovariance(54).astype('uint8')\n", + " season_lengths, counts = np.unique(series_season_lengths, return_counts=True)\n", + " season_length = season_lengths[np.argmax(counts)].item()\n", + " if season_length <= 7:\n", + " freq = 'D'\n", + " elif season_length <= 12:\n", + " freq = 'M'\n", + " elif season_length <= 24:\n", + " freq = 'H'\n", + " else:\n", + " freq = 'W'\n", + " return freq\n", "\n", "def _array_tails(\n", " x: np.ndarray,\n", @@ -357,13 +368,13 @@ " X_df: Optional[DFType],\n", " features: Union[bool, Sequence[Union[str, Callable]]],\n", " one_hot: Union[bool, list[str]],\n", - " freq: str,\n", + " freq: _Freq,\n", " h: int,\n", " id_col: str,\n", " time_col: str,\n", " target_col: str,\n", ") -> tuple[DFType, Optional[DFType]]:\n", - " if not features:\n", + " if not features or not isinstance(freq, str):\n", " return df, X_df\n", " if isinstance(features, list):\n", " date_features: Sequence[Union[str, 
Callable]] = features\n", @@ -462,12 +473,11 @@ " return df, X_df\n", "\n", "def _validate_input_size(\n", - " df: DataFrame,\n", - " id_col: str,\n", + " processed: ufp.ProcessedDF,\n", " model_input_size: int,\n", " model_horizon: int,\n", ") -> None:\n", - " min_size = ufp.counts_by_id(df, id_col)['counts'].min()\n", + " min_size = np.diff(processed.indptr).min().item()\n", " if min_size < model_input_size + model_horizon:\n", " raise ValueError(\n", " 'Some series are too short. '\n", @@ -890,8 +900,8 @@ " target_col: str,\n", " model: _Model,\n", " validate_api_key: bool,\n", - " freq: Optional[str],\n", - " ) -> tuple[DFType, Optional[DFType], bool, str]:\n", + " freq: Optional[_FreqType],\n", + " ) -> tuple[DFType, Optional[DFType], bool, _FreqType]:\n", " if validate_api_key and not self.validate_api_key(log=False):\n", " raise Exception('API Key not valid, please email ops@nixtla.io')\n", " if model not in self.supported_models:\n", @@ -917,15 +927,28 @@ " if ufp.is_nan_or_none(df[target_col]).any():\n", " raise ValueError(f'Target column ({target_col}) cannot contain missing values.')\n", " freq = _maybe_infer_freq(df, freq=freq, id_col=id_col, time_col=time_col)\n", - " expected_ids_times = id_time_grid(\n", - " df,\n", - " freq=freq,\n", - " start=\"per_serie\",\n", - " end=\"per_serie\",\n", - " id_col=id_col,\n", - " time_col=time_col,\n", - " )\n", - " if len(df) != len(expected_ids_times):\n", + " if isinstance(freq, (str, int)):\n", + " expected_ids_times = id_time_grid(\n", + " df,\n", + " freq=freq,\n", + " start=\"per_serie\",\n", + " end=\"per_serie\",\n", + " id_col=id_col,\n", + " time_col=time_col,\n", + " )\n", + " freq_ok = len(df) == len(expected_ids_times)\n", + " elif isinstance(freq, pd.offsets.BaseOffset):\n", + " times_by_id = df.groupby(id_col, observed=True)[time_col].agg(['min', 'max', 'size'])\n", + " with warnings.catch_warnings():\n", + " warnings.filterwarnings('ignore', category=pd.errors.PerformanceWarning)\n", + " expected_ends = times_by_id['min'] + freq * (times_by_id['size'] - 1)\n", + " freq_ok = (expected_ends == times_by_id['max']).all()\n", + " else:\n", + " raise ValueError(\n", + " \"`freq` should be a string, integer or pandas offset, \"\n", + " f\"got {type(freq).__name__}.\"\n", + " )\n", + " if not freq_ok:\n", " raise ValueError(\n", " \"Series contain missing or duplicate timestamps, or the timestamps \"\n", " \"do not match the provided frequency.\\n\"\n", @@ -956,7 +979,7 @@ " self,\n", " df: DistributedDFType,\n", " h: _PositiveInt,\n", - " freq: Optional[str],\n", + " freq: Optional[_Freq],\n", " id_col: str,\n", " time_col: str,\n", " target_col: str,\n", @@ -1040,7 +1063,7 @@ " self,\n", " df: AnyDFType,\n", " h: _PositiveInt,\n", - " freq: Optional[str] = None, \n", + " freq: Optional[_Freq] = None, \n", " id_col: str = 'unique_id',\n", " time_col: str = 'ds',\n", " target_col: str = 'y',\n", @@ -1078,8 +1101,8 @@ " corresponds to a unique time series.\n", " h : int\n", " Forecast horizon.\n", - " freq : str\n", - " Frequency of the data. By default, the freq will be inferred automatically.\n", + " freq : str, int or pandas offset, optional (default=None).\n", + " Frequency of the timestamps. 
If `None`, it will be inferred automatically.\n", " See [pandas' available frequencies](https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases).\n", " id_col : str (default='unique_id')\n", " Column that identifies each serie.\n", @@ -1189,29 +1212,29 @@ " hist_exog=hist_exog_list,\n", " )\n", " level, quantiles = _prepare_level_and_quantiles(level, quantiles)\n", - " standard_freq = _standardize_freq(freq)\n", - " model_input_size, model_horizon = self._get_model_params(model, standard_freq)\n", - " if finetune_steps > 0 or level is not None or add_history:\n", - " _validate_input_size(df, id_col, model_input_size, model_horizon)\n", - " if h > model_horizon:\n", - " logger.warning(\n", - " 'The specified horizon \"h\" exceeds the model horizon. '\n", - " 'This may lead to less accurate forecasts. '\n", - " 'Please consider using a smaller horizon.' \n", - " )\n", "\n", " logger.info('Preprocessing dataframes...')\n", " processed, X_future, x_cols, futr_cols = _preprocess(\n", " df=df,\n", " X_df=X_df,\n", " h=h,\n", - " freq=standard_freq,\n", + " freq=freq,\n", " date_features=date_features,\n", " date_features_to_one_hot=date_features_to_one_hot,\n", " id_col=id_col,\n", " time_col=time_col,\n", " target_col=target_col,\n", " )\n", + " standard_freq = _standardize_freq(freq, processed)\n", + " model_input_size, model_horizon = self._get_model_params(model, standard_freq)\n", + " if finetune_steps > 0 or level is not None or add_history:\n", + " _validate_input_size(processed, model_input_size, model_horizon)\n", + " if h > model_horizon:\n", + " logger.warning(\n", + " 'The specified horizon \"h\" exceeds the model horizon, '\n", + " 'this may lead to less accurate forecasts. '\n", + " 'Please consider using a smaller horizon.' \n", + " )\n", " restrict_input = finetune_steps == 0 and not x_cols and not add_history\n", " if restrict_input:\n", " logger.info('Restricting input...')\n", @@ -1322,7 +1345,7 @@ " def _distributed_detect_anomalies(\n", " self,\n", " df: DistributedDFType,\n", - " freq: Optional[str],\n", + " freq: Optional[_Freq],\n", " id_col: str,\n", " time_col: str,\n", " target_col: str,\n", @@ -1372,7 +1395,7 @@ " def detect_anomalies(\n", " self,\n", " df: AnyDFType,\n", - " freq: Optional[str] = None, \n", + " freq: Optional[_Freq] = None, \n", " id_col: str = 'unique_id',\n", " time_col: str = 'ds',\n", " target_col: str = 'y',\n", @@ -1400,8 +1423,8 @@ " - id_col:\n", " Column name in `df` that identifies unique time series. Each unique value in this column\n", " corresponds to a unique time series.\n", - " freq : str\n", - " Frequency of the data. By default, the freq will be inferred automatically.\n", + " freq : str, int or pandas offset, optional (default=None).\n", + " Frequency of the timestamps. 
If `None`, it will be inferred automatically.\n", " See [pandas' available frequencies](https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases).\n", " id_col : str (default='unique_id')\n", " Column that identifies each serie.\n", @@ -1468,21 +1491,21 @@ " model=model,\n", " freq=freq,\n", " )\n", - " standard_freq = _standardize_freq(freq)\n", - " model_input_size, model_horizon = self._get_model_params(model, standard_freq)\n", "\n", " logger.info('Preprocessing dataframes...')\n", " processed, _, x_cols, _ = _preprocess(\n", " df=df,\n", " X_df=None,\n", " h=0,\n", - " freq=standard_freq,\n", + " freq=freq,\n", " date_features=date_features,\n", " date_features_to_one_hot=date_features_to_one_hot,\n", " id_col=id_col,\n", " time_col=time_col,\n", " target_col=target_col,\n", " )\n", + " standard_freq = _standardize_freq(freq, processed)\n", + " model_input_size, model_horizon = self._get_model_params(model, standard_freq)\n", " if processed.data.shape[1] > 1:\n", " X = processed.data[:, 1:].T\n", " logger.info(f'Using the following exogenous features: {x_cols}')\n", @@ -1528,7 +1551,7 @@ " self,\n", " df: DistributedDFType,\n", " h: _PositiveInt,\n", - " freq: Optional[str],\n", + " freq: Optional[_Freq],\n", " id_col: str,\n", " time_col: str,\n", " target_col: str,\n", @@ -1592,7 +1615,7 @@ " self,\n", " df: AnyDFType,\n", " h: _PositiveInt,\n", - " freq: Optional[str] = None,\n", + " freq: Optional[_Freq] = None,\n", " id_col: str = \"unique_id\",\n", " time_col: str = \"ds\",\n", " target_col: str = \"y\",\n", @@ -1628,8 +1651,8 @@ " corresponds to a unique time series.\n", " h : int\n", " Forecast horizon.\n", - " freq : str\n", - " Frequency of the data. By default, the freq will be inferred automatically.\n", + " freq : str, int or pandas offset, optional (default=None).\n", + " Frequency of the timestamps. If `None`, it will be inferred automatically.\n", " See [pandas' available frequencies](https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases).\n", " id_col : str (default='unique_id')\n", " Column that identifies each serie.\n", @@ -1721,9 +1744,7 @@ " model=model,\n", " freq=freq,\n", " )\n", - " standard_freq = _standardize_freq(freq)\n", " level, quantiles = _prepare_level_and_quantiles(level, quantiles)\n", - " model_input_size, model_horizon = self._get_model_params(model, standard_freq)\n", " if step_size is None:\n", " step_size = h\n", "\n", @@ -1732,13 +1753,15 @@ " df=df,\n", " X_df=None,\n", " h=0,\n", - " freq=standard_freq,\n", + " freq=freq,\n", " date_features=date_features,\n", " date_features_to_one_hot=date_features_to_one_hot,\n", " id_col=id_col,\n", " time_col=time_col,\n", " target_col=target_col,\n", " )\n", + " standard_freq = _standardize_freq(freq, processed)\n", + " model_input_size, model_horizon = self._get_model_params(model, standard_freq)\n", " if isinstance(df, pd.DataFrame):\n", " # in pandas<2.2 to_numpy can lead to an object array if\n", " # the type is a pandas nullable type, e.g. 
pd.Float64Dtype\n", @@ -1943,7 +1966,7 @@ " df: pd.DataFrame,\n", " client: NixtlaClient,\n", " h: _PositiveInt,\n", - " freq: Optional[str],\n", + " freq: Optional[_Freq],\n", " id_col: str,\n", " time_col: str,\n", " target_col: str,\n", @@ -1993,7 +2016,7 @@ "def _detect_anomalies_wrapper(\n", " df: pd.DataFrame,\n", " client: NixtlaClient,\n", - " freq: Optional[str],\n", + " freq: Optional[_Freq],\n", " id_col: str,\n", " time_col: str,\n", " target_col: str,\n", @@ -2024,7 +2047,7 @@ " df: pd.DataFrame,\n", " client: NixtlaClient,\n", " h: _PositiveInt,\n", - " freq: Optional[str],\n", + " freq: Optional[_Freq],\n", " id_col: str,\n", " time_col: str,\n", " target_col: str,\n", @@ -2205,6 +2228,56 @@ ")" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "#| hide\n", + "# custom freq\n", + "client = NixtlaClient()\n", + "custom_business_hours = pd.tseries.offsets.CustomBusinessHour(\n", + " start='09:00',\n", + " end='16:00',\n", + " holidays=[\n", + " '2022-12-25', # Christmas\n", + " '2022-01-01', # New Year's Day\n", + " ]\n", + ")\n", + "series = pd.DataFrame({\n", + " 'unique_id': 1,\n", + " 'ds': pd.date_range(start='2000-01-03 09', freq=custom_business_hours, periods=100),\n", + " 'y': np.arange(100) % 7,\n", + "})\n", + "series = pd.concat([series.assign(unique_id=i) for i in range(10)]).reset_index(drop=True)\n", + "client.detect_anomalies(df=series, freq=custom_business_hours, level=90)\n", + "client.cross_validation(df=series, freq=custom_business_hours, h=7)\n", + "fcst = client.forecast(df=series, freq=custom_business_hours, h=7)\n", + "assert sorted(fcst['ds'].dt.hour.unique().tolist()) == list(range(9, 16))\n", + "assert list(client._model_params.keys()) == [('timegpt-1', 'D')]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "#| hide\n", + "# integer freq\n", + "client = NixtlaClient()\n", + "series = generate_series(5, freq='H', min_length=200)\n", + "series['ds'] = series.groupby('unique_id', observed=True)['ds'].cumcount()\n", + "client.detect_anomalies(df=series, level=90, freq=1)\n", + "client.cross_validation(df=series, h=7, freq=1)\n", + "fcst = client.forecast(df=series, h=7, freq=1)\n", + "train_ends = series.groupby('unique_id', observed=True)['ds'].max()\n", + "fcst_ends = fcst.groupby('unique_id', observed=True)['ds'].max()\n", + "pd.testing.assert_series_equal(fcst_ends, train_ends + 7)\n", + "assert list(client._model_params.keys()) == [('timegpt-1', 'H')]" + ] + }, { "cell_type": "code", "execution_count": null, diff --git a/nixtla/nixtla_client.py b/nixtla/nixtla_client.py index 514323f1..d0e86342 100644 --- a/nixtla/nixtla_client.py +++ b/nixtla/nixtla_client.py @@ -100,6 +100,8 @@ _Loss = Literal["default", "mae", "mse", "rmse", "mape", "smape"] _Model = Literal["azureai", "timegpt-1", "timegpt-1-long-horizon"] _Finetune_Depth = Literal[1, 2, 3, 4, 5] +_Freq = Union[str, int, pd.offsets.BaseOffset] +_FreqType = TypeVar("_FreqType", str, int, pd.offsets.BaseOffset) _date_features_by_freq = { # Daily frequencies @@ -184,11 +186,11 @@ def after_retry(retry_state: RetryCallState) -> None: def _maybe_infer_freq( df: DataFrame, - freq: Optional[str], + freq: Optional[_FreqType], id_col: str, time_col: str, -) -> str: - if freq is not None and freq not in ["W", "M", "Q", "Y", "A"]: +) -> _FreqType: + if freq is not None: return freq if isinstance(df, pl_DataFrame): raise ValueError( @@ -208,21 +210,29 @@ def _maybe_infer_freq( "to 
inconsistent intervals. Please check your data for missing, " "duplicated or irregular timestamps" ) - if freq is not None: - # check we have the same base frequency - # except when we have yearly frequency (A, and Y means the same) - if (freq[0] != inferred_freq[0] and freq[0] not in ("A", "Y")) or ( - freq[0] in ("A", "Y") and inferred_freq[0] not in ("A", "Y") - ): - raise RuntimeError( - f"Failed to infer special date, inferred freq {inferred_freq}" - ) logger.info(f"Inferred freq: {inferred_freq}") return inferred_freq -def _standardize_freq(freq: str) -> str: - return freq.replace("mo", "MS") +def _standardize_freq(freq: _Freq, processed: ufp.ProcessedDF) -> str: + if isinstance(freq, str): + freq = freq.replace("mo", "MS") + else: + from coreforecast.grouped_array import GroupedArray + + ga = GroupedArray(processed.data[:, 0], processed.indptr) + series_season_lengths = ga._greatest_autocovariance(54).astype("uint8") + season_lengths, counts = np.unique(series_season_lengths, return_counts=True) + season_length = season_lengths[np.argmax(counts)].item() + if season_length <= 7: + freq = "D" + elif season_length <= 12: + freq = "M" + elif season_length <= 24: + freq = "H" + else: + freq = "W" + return freq def _array_tails( @@ -288,13 +298,13 @@ def _maybe_add_date_features( X_df: Optional[DFType], features: Union[bool, Sequence[Union[str, Callable]]], one_hot: Union[bool, list[str]], - freq: str, + freq: _Freq, h: int, id_col: str, time_col: str, target_col: str, ) -> tuple[DFType, Optional[DFType]]: - if not features: + if not features or not isinstance(freq, str): return df, X_df if isinstance(features, list): date_features: Sequence[Union[str, Callable]] = features @@ -393,12 +403,11 @@ def _validate_exog( def _validate_input_size( - df: DataFrame, - id_col: str, + processed: ufp.ProcessedDF, model_input_size: int, model_horizon: int, ) -> None: - min_size = ufp.counts_by_id(df, id_col)["counts"].min() + min_size = np.diff(processed.indptr).min().item() if min_size < model_input_size + model_horizon: raise ValueError( "Some series are too short. " @@ -820,8 +829,8 @@ def _run_validations( target_col: str, model: _Model, validate_api_key: bool, - freq: Optional[str], - ) -> tuple[DFType, Optional[DFType], bool, str]: + freq: Optional[_FreqType], + ) -> tuple[DFType, Optional[DFType], bool, _FreqType]: if validate_api_key and not self.validate_api_key(log=False): raise Exception("API Key not valid, please email ops@nixtla.io") if model not in self.supported_models: @@ -849,15 +858,30 @@ def _run_validations( f"Target column ({target_col}) cannot contain missing values." 
) freq = _maybe_infer_freq(df, freq=freq, id_col=id_col, time_col=time_col) - expected_ids_times = id_time_grid( - df, - freq=freq, - start="per_serie", - end="per_serie", - id_col=id_col, - time_col=time_col, - ) - if len(df) != len(expected_ids_times): + if isinstance(freq, (str, int)): + expected_ids_times = id_time_grid( + df, + freq=freq, + start="per_serie", + end="per_serie", + id_col=id_col, + time_col=time_col, + ) + freq_ok = len(df) == len(expected_ids_times) + elif isinstance(freq, pd.offsets.BaseOffset): + times_by_id = df.groupby(id_col, observed=True)[time_col].agg( + ["min", "max", "size"] + ) + with warnings.catch_warnings(): + warnings.filterwarnings("ignore", category=pd.errors.PerformanceWarning) + expected_ends = times_by_id["min"] + freq * (times_by_id["size"] - 1) + freq_ok = (expected_ends == times_by_id["max"]).all() + else: + raise ValueError( + "`freq` should be a string, integer or pandas offset, " + f"got {type(freq).__name__}." + ) + if not freq_ok: raise ValueError( "Series contain missing or duplicate timestamps, or the timestamps " "do not match the provided frequency.\n" @@ -887,7 +911,7 @@ def _distributed_forecast( self, df: DistributedDFType, h: _PositiveInt, - freq: Optional[str], + freq: Optional[_Freq], id_col: str, time_col: str, target_col: str, @@ -972,7 +996,7 @@ def forecast( self, df: AnyDFType, h: _PositiveInt, - freq: Optional[str] = None, + freq: Optional[_Freq] = None, id_col: str = "unique_id", time_col: str = "ds", target_col: str = "y", @@ -1010,8 +1034,8 @@ def forecast( corresponds to a unique time series. h : int Forecast horizon. - freq : str - Frequency of the data. By default, the freq will be inferred automatically. + freq : str, int or pandas offset, optional (default=None). + Frequency of the timestamps. If `None`, it will be inferred automatically. See [pandas' available frequencies](https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases). id_col : str (default='unique_id') Column that identifies each serie. @@ -1121,29 +1145,29 @@ def forecast( hist_exog=hist_exog_list, ) level, quantiles = _prepare_level_and_quantiles(level, quantiles) - standard_freq = _standardize_freq(freq) - model_input_size, model_horizon = self._get_model_params(model, standard_freq) - if finetune_steps > 0 or level is not None or add_history: - _validate_input_size(df, id_col, model_input_size, model_horizon) - if h > model_horizon: - logger.warning( - 'The specified horizon "h" exceeds the model horizon. ' - "This may lead to less accurate forecasts. " - "Please consider using a smaller horizon." - ) logger.info("Preprocessing dataframes...") processed, X_future, x_cols, futr_cols = _preprocess( df=df, X_df=X_df, h=h, - freq=standard_freq, + freq=freq, date_features=date_features, date_features_to_one_hot=date_features_to_one_hot, id_col=id_col, time_col=time_col, target_col=target_col, ) + standard_freq = _standardize_freq(freq, processed) + model_input_size, model_horizon = self._get_model_params(model, standard_freq) + if finetune_steps > 0 or level is not None or add_history: + _validate_input_size(processed, model_input_size, model_horizon) + if h > model_horizon: + logger.warning( + 'The specified horizon "h" exceeds the model horizon, ' + "this may lead to less accurate forecasts. " + "Please consider using a smaller horizon." 
+ ) restrict_input = finetune_steps == 0 and not x_cols and not add_history if restrict_input: logger.info("Restricting input...") @@ -1260,7 +1284,7 @@ def forecast( def _distributed_detect_anomalies( self, df: DistributedDFType, - freq: Optional[str], + freq: Optional[_Freq], id_col: str, time_col: str, target_col: str, @@ -1310,7 +1334,7 @@ def _distributed_detect_anomalies( def detect_anomalies( self, df: AnyDFType, - freq: Optional[str] = None, + freq: Optional[_Freq] = None, id_col: str = "unique_id", time_col: str = "ds", target_col: str = "y", @@ -1338,8 +1362,8 @@ def detect_anomalies( - id_col: Column name in `df` that identifies unique time series. Each unique value in this column corresponds to a unique time series. - freq : str - Frequency of the data. By default, the freq will be inferred automatically. + freq : str, int or pandas offset, optional (default=None). + Frequency of the timestamps. If `None`, it will be inferred automatically. See [pandas' available frequencies](https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases). id_col : str (default='unique_id') Column that identifies each serie. @@ -1406,21 +1430,21 @@ def detect_anomalies( model=model, freq=freq, ) - standard_freq = _standardize_freq(freq) - model_input_size, model_horizon = self._get_model_params(model, standard_freq) logger.info("Preprocessing dataframes...") processed, _, x_cols, _ = _preprocess( df=df, X_df=None, h=0, - freq=standard_freq, + freq=freq, date_features=date_features, date_features_to_one_hot=date_features_to_one_hot, id_col=id_col, time_col=time_col, target_col=target_col, ) + standard_freq = _standardize_freq(freq, processed) + model_input_size, model_horizon = self._get_model_params(model, standard_freq) if processed.data.shape[1] > 1: X = processed.data[:, 1:].T logger.info(f"Using the following exogenous features: {x_cols}") @@ -1468,7 +1492,7 @@ def _distributed_cross_validation( self, df: DistributedDFType, h: _PositiveInt, - freq: Optional[str], + freq: Optional[_Freq], id_col: str, time_col: str, target_col: str, @@ -1532,7 +1556,7 @@ def cross_validation( self, df: AnyDFType, h: _PositiveInt, - freq: Optional[str] = None, + freq: Optional[_Freq] = None, id_col: str = "unique_id", time_col: str = "ds", target_col: str = "y", @@ -1568,8 +1592,8 @@ def cross_validation( corresponds to a unique time series. h : int Forecast horizon. - freq : str - Frequency of the data. By default, the freq will be inferred automatically. + freq : str, int or pandas offset, optional (default=None). + Frequency of the timestamps. If `None`, it will be inferred automatically. See [pandas' available frequencies](https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases). id_col : str (default='unique_id') Column that identifies each serie. 
@@ -1661,9 +1685,7 @@ def cross_validation( model=model, freq=freq, ) - standard_freq = _standardize_freq(freq) level, quantiles = _prepare_level_and_quantiles(level, quantiles) - model_input_size, model_horizon = self._get_model_params(model, standard_freq) if step_size is None: step_size = h @@ -1672,13 +1694,15 @@ def cross_validation( df=df, X_df=None, h=0, - freq=standard_freq, + freq=freq, date_features=date_features, date_features_to_one_hot=date_features_to_one_hot, id_col=id_col, time_col=time_col, target_col=target_col, ) + standard_freq = _standardize_freq(freq, processed) + model_input_size, model_horizon = self._get_model_params(model, standard_freq) if isinstance(df, pd.DataFrame): # in pandas<2.2 to_numpy can lead to an object array if # the type is a pandas nullable type, e.g. pd.Float64Dtype @@ -1880,7 +1904,7 @@ def _forecast_wrapper( df: pd.DataFrame, client: NixtlaClient, h: _PositiveInt, - freq: Optional[str], + freq: Optional[_Freq], id_col: str, time_col: str, target_col: str, @@ -1931,7 +1955,7 @@ def _forecast_wrapper( def _detect_anomalies_wrapper( df: pd.DataFrame, client: NixtlaClient, - freq: Optional[str], + freq: Optional[_Freq], id_col: str, time_col: str, target_col: str, @@ -1963,7 +1987,7 @@ def _cross_validation_wrapper( df: pd.DataFrame, client: NixtlaClient, h: _PositiveInt, - freq: Optional[str], + freq: Optional[_Freq], id_col: str, time_col: str, target_col: str, diff --git a/setup.py b/setup.py index 3bdec9ea..e6929c79 100644 --- a/setup.py +++ b/setup.py @@ -42,12 +42,13 @@ python_requires=">=3.9", install_requires=[ "annotated-types", + "coreforecast>=0.0.14", "httpx", "orjson", "pandas", "tenacity", "tqdm", - "utilsforecast>=0.2.3", + "utilsforecast>=0.2.8", ], extras_require={ "dev": dev + plotting + date_extras, From bbf519e08da713cc307ff09a86a8f9dde7247ba6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Morales?= Date: Tue, 5 Nov 2024 17:41:53 -0600 Subject: [PATCH 2/4] remove bound from extra --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index e6929c79..d9dd91b4 100644 --- a/setup.py +++ b/setup.py @@ -23,7 +23,7 @@ "tabulate", ] distributed = ["fugue[dask,ray,spark]>=0.8.7", "pandas<2.2", "ray<2.6.3"] -plotting = ["utilsforecast[plotting]>=0.2.7"] +plotting = ["utilsforecast[plotting]"] date_extras = ["holidays"] setuptools.setup( From 7e2ff19569bc75db155dd1f6365af42cf294d9b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Morales?= Date: Wed, 27 Nov 2024 13:10:44 -0600 Subject: [PATCH 3/4] deterministic freq --- nbs/src/nixtla_client.ipynb | 30 ++++++++++++------------------ nixtla/nixtla_client.py | 22 ++++++++-------------- setup.py | 1 - 3 files changed, 20 insertions(+), 33 deletions(-) diff --git a/nbs/src/nixtla_client.ipynb b/nbs/src/nixtla_client.ipynb index 920ef0c9..5cce8957 100644 --- a/nbs/src/nixtla_client.ipynb +++ b/nbs/src/nixtla_client.ipynb @@ -287,22 +287,16 @@ "\n", "def _standardize_freq(freq: _Freq, processed: ufp.ProcessedDF) -> str:\n", " if isinstance(freq, str):\n", + " # polars uses 'mo' for months, all other strings are compatible with pandas\n", " freq = freq.replace('mo', 'MS')\n", + " elif isinstance(freq, pd.offsets.BaseOffset):\n", + " freq = freq.freqstr\n", + " elif isinstance(freq, int):\n", + " freq = 'MS'\n", " else:\n", - " from coreforecast.grouped_array import GroupedArray\n", - "\n", - " ga = GroupedArray(processed.data[:, 0], processed.indptr)\n", - " series_season_lengths = ga._greatest_autocovariance(54).astype('uint8')\n", - " 
season_lengths, counts = np.unique(series_season_lengths, return_counts=True)\n", - " season_length = season_lengths[np.argmax(counts)].item()\n", - " if season_length <= 7:\n", - " freq = 'D'\n", - " elif season_length <= 12:\n", - " freq = 'M'\n", - " elif season_length <= 24:\n", - " freq = 'H'\n", - " else:\n", - " freq = 'W'\n", + " raise ValueError(\n", + " f\"`freq` must be a string, int or pandas offset, got {type(freq).__name__}\"\n", + " )\n", " return freq\n", "\n", "def _array_tails(\n", @@ -2247,15 +2241,15 @@ ")\n", "series = pd.DataFrame({\n", " 'unique_id': 1,\n", - " 'ds': pd.date_range(start='2000-01-03 09', freq=custom_business_hours, periods=100),\n", - " 'y': np.arange(100) % 7,\n", + " 'ds': pd.date_range(start='2000-01-03 09', freq=custom_business_hours, periods=200),\n", + " 'y': np.arange(200) % 7,\n", "})\n", "series = pd.concat([series.assign(unique_id=i) for i in range(10)]).reset_index(drop=True)\n", "client.detect_anomalies(df=series, freq=custom_business_hours, level=90)\n", "client.cross_validation(df=series, freq=custom_business_hours, h=7)\n", "fcst = client.forecast(df=series, freq=custom_business_hours, h=7)\n", "assert sorted(fcst['ds'].dt.hour.unique().tolist()) == list(range(9, 16))\n", - "assert list(client._model_params.keys()) == [('timegpt-1', 'D')]" + "assert list(client._model_params.keys()) == [('timegpt-1', 'cbh')]" ] }, { @@ -2275,7 +2269,7 @@ "train_ends = series.groupby('unique_id', observed=True)['ds'].max()\n", "fcst_ends = fcst.groupby('unique_id', observed=True)['ds'].max()\n", "pd.testing.assert_series_equal(fcst_ends, train_ends + 7)\n", - "assert list(client._model_params.keys()) == [('timegpt-1', 'H')]" + "assert list(client._model_params.keys()) == [('timegpt-1', 'MS')]" ] }, { diff --git a/nixtla/nixtla_client.py b/nixtla/nixtla_client.py index d0e86342..274b1531 100644 --- a/nixtla/nixtla_client.py +++ b/nixtla/nixtla_client.py @@ -216,22 +216,16 @@ def _maybe_infer_freq( def _standardize_freq(freq: _Freq, processed: ufp.ProcessedDF) -> str: if isinstance(freq, str): + # polars uses 'mo' for months, all other strings are compatible with pandas freq = freq.replace("mo", "MS") + elif isinstance(freq, pd.offsets.BaseOffset): + freq = freq.freqstr + elif isinstance(freq, int): + freq = "MS" else: - from coreforecast.grouped_array import GroupedArray - - ga = GroupedArray(processed.data[:, 0], processed.indptr) - series_season_lengths = ga._greatest_autocovariance(54).astype("uint8") - season_lengths, counts = np.unique(series_season_lengths, return_counts=True) - season_length = season_lengths[np.argmax(counts)].item() - if season_length <= 7: - freq = "D" - elif season_length <= 12: - freq = "M" - elif season_length <= 24: - freq = "H" - else: - freq = "W" + raise ValueError( + f"`freq` must be a string, int or pandas offset, got {type(freq).__name__}" + ) return freq diff --git a/setup.py b/setup.py index 827a52ee..8238e83f 100644 --- a/setup.py +++ b/setup.py @@ -42,7 +42,6 @@ python_requires=">=3.9", install_requires=[ "annotated-types", - "coreforecast>=0.0.14", "httpx", "orjson", "pandas", From 39a36e61700ec6da3e2b275066e008ddf2a11d79 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Morales?= Date: Mon, 2 Dec 2024 15:31:05 -0600 Subject: [PATCH 4/4] handle pandas<2.2 alias --- nbs/src/nixtla_client.ipynb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nbs/src/nixtla_client.ipynb b/nbs/src/nixtla_client.ipynb index 3587f242..8794da15 100644 --- a/nbs/src/nixtla_client.ipynb +++ b/nbs/src/nixtla_client.ipynb 
@@ -2383,7 +2383,7 @@ "client.cross_validation(df=series, freq=custom_business_hours, h=7)\n", "fcst = client.forecast(df=series, freq=custom_business_hours, h=7)\n", "assert sorted(fcst['ds'].dt.hour.unique().tolist()) == list(range(9, 16))\n", - "assert list(client._model_params.keys()) == [('timegpt-1', 'cbh')]" + "assert [(model, freq.lower()) for (model, freq) in client._model_params.keys()] == [('timegpt-1', 'cbh')]" ] }, {
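
Usage sketch (illustrative, not part of the commits above). After this series,
`freq` accepts a pandas offset alias string, an integer step, or a
`pd.offsets.BaseOffset` instance in `forecast`, `detect_anomalies` and
`cross_validation`. A minimal sketch, assuming a valid API key in
`NIXTLA_API_KEY` and using `generate_series` from `utilsforecast.data` (as the
test cells in patch 1 do); the variable names are illustrative:

    import numpy as np
    import pandas as pd
    from nixtla import NixtlaClient
    from utilsforecast.data import generate_series

    client = NixtlaClient()  # assumes NIXTLA_API_KEY is set in the environment

    # String alias (existing behavior), e.g. 'D', 'H', 'MS'.
    daily = generate_series(2, freq='D', min_length=60)
    fcst_daily = client.forecast(df=daily, h=7, freq='D')

    # Integer frequency (new): 'ds' holds integer steps instead of timestamps.
    int_series = daily.copy()
    int_series['ds'] = int_series.groupby('unique_id', observed=True).cumcount()
    fcst_int = client.forecast(df=int_series, h=7, freq=1)

    # Pandas offset (new), e.g. custom business hours as in the tests above.
    cbh = pd.tseries.offsets.CustomBusinessHour(start='09:00', end='16:00')
    offset_series = pd.DataFrame({
        'unique_id': 0,
        'ds': pd.date_range('2024-01-01 09:00', freq=cbh, periods=200),
        'y': np.arange(200) % 7,
    })
    fcst_offset = client.forecast(df=offset_series, h=7, freq=cbh)

Per the final `_standardize_freq` (patch 3), the alias used to look up model
parameters is deterministic: strings pass through (with polars' 'mo' mapped to
'MS'), a pandas offset maps to its `freqstr` ('cbh' here), and an integer maps
to 'MS'. Patch 4 lowercases the alias in the test assertion because pandas<2.2
reports `CustomBusinessHour.freqstr` in a different case ('CBH').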