diff --git a/experiments/amazon-chronos/README.md b/experiments/amazon-chronos/README.md new file mode 100644 index 00000000..3ea5d229 --- /dev/null +++ b/experiments/amazon-chronos/README.md @@ -0,0 +1,48 @@ +# A Statistical Ensemble of traditional methods is 10% more accurate and 5x faster than Amazon Chronos + +We present a comprehensive evaluation showcasing that a Statistical Ensemble, consisting of AutoARIMA, AutoETS, AutoCES, and DynamicOptimizedTheta, outperforms Amazon Chronos—a foundational model for time series forecasting with over 710 million parameters. Specifically, the **Statistical Ensemble demonstrates 10%, 10%, and 11% superior performance in CRPS, MASE, and SMAPE metrics, respectively**, and it is **5x faster**. This analysis spans over 50,000 unique time series across M1, M3, M4, and Tourism datasets, robustly comparing these models. + +# Introduction + +The rise of foundational models in time series forecasting, such as Amazon Chronos, represents a significant leap forward, leveraging deep learning and massive datasets for model pre-training to enhance predictive accuracy. Amazon Chronos, in particular, is noteworthy for its extensive parameterization and ambitious scope. However, our study shows that a comparatively simpler approach, employing a Statistical Ensemble of traditional forecasting methods, yields better accuracy and computational efficiency. + +## Empirical Evaluation + +This study considers over 50,000 unique time series from the M1, M3, M4, and Tourism datasets, spanning various time series frequencies. Chronos did not use these datasets in the training phase. We have also included comparisons to the Seasonal Naive model to provide a benchmark for traditional forecasting methods. + +## Results + +Our findings are shown in the following table, showcasing the performance across different metrics: CRPS, MASE, SMAPE, and computational time (in seconds). The best results are highlighted in **bold** for ease of reference. + +image + + +## Reproducibility + +To ensure the reproducibility of our findings, the Statistical Ensemble experiments were conducted on an AWS c5a.24xlarge instance, equipped with 96 vCPUs and 192 GiB of RAM. In contrast, the experiments for Amazon Chronos were carried out on an AWS g5.4xlarge GPU instance, which includes 16 vCPUs, 64 GiB of RAM, and an NVIDIA A10G Tensor Core GPU with 24 GiB. All necessary code and detailed instructions for reproducing the experiments are available in this directory. + +### Instructions + +1. Set up a Python environment: + +```bash +mamba env create -f environment.yml +conda activate amazon-chronos +``` + +2. Run the experiments as reported in the table: + +```bash +python -m src.main --mode fcst_statsforecast +python -m src.main --mode fcst_chronos +``` + +3. Evaluate the results using: + +```bash +python -m src.main --mode evaluation +``` + +### References +- **Statistical Ensemble Paper**: [A Simple Combination of Univariate Models](https://www.sciencedirect.com/science/article/abs/pii/S0169207019300585?via%3Dihub) +- **Amazon Chronos Paper**: [Chronos: Learning the Language of Time Series](https://arxiv.org/abs/2403.07815) diff --git a/experiments/amazon-chronos/environment.yml b/experiments/amazon-chronos/environment.yml new file mode 100644 index 00000000..a028a4dd --- /dev/null +++ b/experiments/amazon-chronos/environment.yml @@ -0,0 +1,20 @@ +name: amazon-chronos +channels: + - conda-forge + - defaults + - anaconda +dependencies: + - jupyterlab + - pip + - python=3.10 + - pip: + - datasetsforecast + - fire + - gluonts + - huggingface_hub[cli] + - neuralforecast + - orjson + - statsforecast + - utilsforecast + - git+https://github.com/amazon-science/chronos-forecasting.git + diff --git a/experiments/amazon-chronos/src/amazon_chronos/forecaster.py b/experiments/amazon-chronos/src/amazon_chronos/forecaster.py new file mode 100644 index 00000000..85e52b6c --- /dev/null +++ b/experiments/amazon-chronos/src/amazon_chronos/forecaster.py @@ -0,0 +1,122 @@ +import logging +from typing import Iterable, List + +import numpy as np +import pandas as pd +import torch +from chronos import ChronosPipeline +from utilsforecast.processing import make_future_dataframe + +logging.basicConfig(level=logging.INFO) +main_logger = logging.getLogger(__name__) + + +class TimeSeriesDataset: + def __init__( + self, + data: torch.Tensor, + uids: Iterable, + last_times: Iterable, + batch_size: int, + ): + self.data = data + self.uids = uids + self.last_times = last_times + self.batch_size = batch_size + self.n_batches = len(data) // self.batch_size + ( + 0 if len(data) % self.batch_size == 0 else 1 + ) + self.current_batch = 0 + + @classmethod + def from_df(cls, df: pd.DataFrame, batch_size: int): + num_unique_ids = df["unique_id"].nunique() + max_series_length = df["unique_id"].value_counts().max() + padded_tensor = torch.full( + size=(num_unique_ids, max_series_length), + fill_value=torch.nan, + dtype=torch.bfloat16, + ) # type: ignore + df_sorted = df.sort_values(by=["unique_id", "ds"]) + for idx, (_, group) in enumerate(df_sorted.groupby("unique_id")): + series_length = len(group) + padded_tensor[idx, -series_length:] = torch.tensor( + group["y"].values, + dtype=torch.bfloat16, + ) + uids = df_sorted["unique_id"].unique() + last_times = df_sorted.groupby("unique_id")["ds"].tail(1) + return cls(padded_tensor, uids, last_times, batch_size) + + def __len__(self): + return len(self.data) + + def make_future_dataframe(self, h: int, freq: str) -> pd.DataFrame: + return make_future_dataframe( + uids=self.uids, + last_times=pd.to_datetime(self.last_times), + h=h, + freq=freq, + ) # type: ignore + + def __iter__(self): + self.current_batch = 0 # Reset for new iteration + return self + + def __next__(self): + if self.current_batch < self.n_batches: + start_idx = self.current_batch * self.batch_size + end_idx = start_idx + self.batch_size + self.current_batch += 1 + return self.data[start_idx:end_idx] + else: + raise StopIteration + + +class AmazonChronos: + def __init__(self, model_name: str): + self.model_name = model_name + self.model = ChronosPipeline.from_pretrained( + model_name, + device_map="auto", + torch_dtype=torch.bfloat16, + ) + + def forecast( + self, + df: pd.DataFrame, + h: int, + freq: str, + batch_size: int = 32, + quantiles: List[float] | None = None, + **predict_kwargs, + ) -> pd.DataFrame: + main_logger.info("transforming dataframe to tensor") + dataset = TimeSeriesDataset.from_df(df, batch_size=batch_size) + main_logger.info("forecasting") + fcsts = [self.model.predict(batch, prediction_length=h, **predict_kwargs) for batch in dataset] + fcst = torch.cat(fcsts) + main_logger.info("transforming forecast to dataframe") + fcst = fcst.numpy() + fcst_df = dataset.make_future_dataframe(h=h, freq=freq) + fcst_df[self.model_name] = np.median(fcst, axis=1).reshape(-1, 1) + if quantiles is not None: + for q in quantiles: + q_col = f"{self.model_name}-q-{q}" + fcst_df[q_col] = np.quantile(fcst, q, axis=1).reshape(-1, 1) + return fcst_df + + +if __name__ == "__main__": + import pandas as pd + + df = pd.read_csv( + "https://raw.githubusercontent.com/AileenNielsen/TimeSeriesAnalysisWithPython/master/data/AirPassengers.csv" + ) + df = df.rename(columns={"#Passengers": "y", "Month": "ds"}) + df["ds"] = pd.to_datetime(df["ds"]) + df.insert(0, "unique_id", "AirPassengers") + df = pd.concat([df, df.assign(unique_id="AirPassengers2")]) + model = AmazonChronos("amazon/chronos-t5-small") + fcst_df = model.forecast(df, h=12, freq="MS") + print(fcst_df) diff --git a/experiments/amazon-chronos/src/amazon_chronos/pipeline.py b/experiments/amazon-chronos/src/amazon_chronos/pipeline.py new file mode 100644 index 00000000..3a26dbb5 --- /dev/null +++ b/experiments/amazon-chronos/src/amazon_chronos/pipeline.py @@ -0,0 +1,51 @@ +import os +from time import time +from typing import List, Tuple + +import fire +import pandas as pd + + +from ..utils import ExperimentHandler +from .forecaster import AmazonChronos + + +def run_amazon_chronos( + train_df: pd.DataFrame, + model_name: str, + horizon: int, + freq: str, + quantiles: List[float], +) -> Tuple[pd.DataFrame, float, str]: + ac = AmazonChronos(model_name) + init_time = time() + fcsts_df = ac.forecast( + df=train_df, + h=horizon, + freq=freq, + batch_size=8, + quantiles=quantiles, + # parameters as in https://github.com/amazon-science/chronos-forecasting/blob/73be25042f5f587823d46106d372ba133152fb00/README.md?plain=1#L62-L65 + num_samples=20, + temperature=1.0, + top_k=50, + top_p=1.0, + ) + total_time = time() - init_time + return fcsts_df, total_time, model_name + + +def main(dataset: str, model_name: str): + exp = ExperimentHandler(dataset) + fcst_df, total_time, model_name = run_amazon_chronos( + train_df=exp.train_df, + model_name=model_name, + horizon=exp.horizon, + freq=exp.freq, + quantiles=exp.quantiles, + ) + exp.save_results(fcst_df, total_time, model_name) + + +if __name__ == "__main__": + fire.Fire(main) diff --git a/experiments/amazon-chronos/src/main.py b/experiments/amazon-chronos/src/main.py new file mode 100644 index 00000000..caba9536 --- /dev/null +++ b/experiments/amazon-chronos/src/main.py @@ -0,0 +1,76 @@ +import logging +import subprocess + +import fire +import pandas as pd + +from src.utils import ExperimentHandler + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + +datasets = [ + "m1_yearly", + "m1_quarterly", + "m1_monthly", + "m3_yearly", + "m3_quarterly", + "m3_monthly", + "m3_other", + "tourism_yearly", + "tourism_quarterly", + "tourism_monthly", + "m4_yearly", + "m4_quarterly", +] + +amazon_chronos_models = [ + "amazon/chronos-t5-large", + "amazon/chronos-t5-tiny", + "amazon/chronos-t5-mini", + "amazon/chronos-t5-small", + "amazon/chronos-t5-base", +] + + +def main(mode: str): + prefix_process = ["python", "-m"] + + eval_df = None + for dataset in datasets: + logger.info(f"Evaluating {dataset}...") + if mode in ["fcst_statsforecast", "fcst_chronos"]: + suffix_process = ["--dataset", dataset] + + def process(middle_process): + return prefix_process + middle_process + suffix_process + + if mode == "fcst_statsforecast": + logger.info("Running StatisticalEnsemble") + subprocess.run(process(["src.statsforecast_pipeline"])) + elif mode == "fcst_chronos": + for model in amazon_chronos_models: + logger.info(f"Running Amazon Chronos {model}") + chronos_process = process(["src.amazon_chronos.pipeline"]) + chronos_process.extend(["--model_name", model]) + subprocess.run(chronos_process) + elif mode == "evaluation": + if eval_df is None: + eval_df = [] + logger.info("Running dataset evaluation") + exp = ExperimentHandler(dataset) + try: + eval_dataset_df = exp.evaluate_models( + amazon_chronos_models + ["StatisticalEnsemble", "SeasonalNaive"] + ) + print(eval_dataset_df) + eval_df.append(eval_dataset_df) + except Exception as e: + logger.error(e) + if eval_df is not None: + eval_df = pd.concat(eval_df).reset_index(drop=True) + exp.save_dataframe(eval_df, "complete-results.csv") + + +if __name__ == "__main__": + fire.Fire(main) diff --git a/experiments/amazon-chronos/src/statsforecast_pipeline.py b/experiments/amazon-chronos/src/statsforecast_pipeline.py new file mode 100644 index 00000000..c773e12e --- /dev/null +++ b/experiments/amazon-chronos/src/statsforecast_pipeline.py @@ -0,0 +1,132 @@ +import os +from time import time +from typing import List, Tuple + +os.environ["NIXTLA_NUMBA_RELEASE_GIL"] = "1" +os.environ["NIXTLA_NUMBA_CACHE"] = "1" + +import fire +import numpy as np +import pandas as pd +from scipy.stats import norm +from statsforecast import StatsForecast +from statsforecast.models import ( + AutoARIMA, + AutoETS, + AutoCES, + DynamicOptimizedTheta, + SeasonalNaive, +) + +from src.utils import ExperimentHandler + + +def run_seasonal_naive( + train_df: pd.DataFrame, + horizon: int, + freq: str, + seasonality: int, + level: List[int], +) -> Tuple[pd.DataFrame, float, str]: + os.environ["NIXTLA_ID_AS_COL"] = "true" + sf = StatsForecast( + models=[SeasonalNaive(season_length=seasonality)], + freq=freq, + n_jobs=-1, + ) + model = sf + init_time = time() + fcsts_df = model.forecast(df=train_df, h=horizon, level=level) + total_time = time() - init_time + return fcsts_df, total_time, "SeasonalNaive" + + +def ensemble_forecasts( + fcsts_df: pd.DataFrame, + quantiles: List[float], + name_models: List[str], + model_name: str, +) -> pd.DataFrame: + fcsts_df[model_name] = fcsts_df[name_models].mean(axis=1).values # type: ignore + # compute quantiles based on the mean of the forecasts + sigma_models = [] + for model in name_models: + fcsts_df[f"sigma_{model}"] = fcsts_df[f"{model}-hi-68.27"] - fcsts_df[model] + sigma_models.append(f"sigma_{model}") + fcsts_df[f"std_{model_name}"] = ( + fcsts_df[sigma_models].pow(2).sum(axis=1).div(len(sigma_models) ** 2).pow(0.5) + ) + z = norm.ppf(quantiles) + q_cols = [] + for q, zq in zip(quantiles, z): + q_col = f"{model_name}-q-{q}" + fcsts_df[q_col] = fcsts_df[model_name] + zq * fcsts_df[f"std_{model_name}"] + q_cols.append(q_col) + fcsts_df = fcsts_df[["unique_id", "ds"] + [model_name] + q_cols] + return fcsts_df + + +def run_statistical_ensemble( + train_df: pd.DataFrame, + horizon: int, + freq: str, + seasonality: int, + quantiles: List[float], +) -> Tuple[pd.DataFrame, float, str]: + os.environ["NIXTLA_ID_AS_COL"] = "true" + models = [ + AutoARIMA(season_length=seasonality), + AutoETS(season_length=seasonality), + AutoCES(season_length=seasonality), + DynamicOptimizedTheta(season_length=seasonality), + ] + init_time = time() + series_per_core = 15 + n_series = train_df["unique_id"].nunique() + n_jobs = min(n_series // series_per_core, os.cpu_count()) + sf = StatsForecast( + models=models, + freq=freq, + n_jobs=n_jobs, + ) + fcsts_df = sf.forecast(df=train_df, h=horizon, level=[68.27]) + name_models = [repr(model) for model in models] + model_name = "StatisticalEnsemble" + fcsts_df = ensemble_forecasts( + fcsts_df, + quantiles, + name_models, + model_name, + ) + total_time = time() - init_time + return fcsts_df, total_time, model_name + + +def main(dataset: str): + exp = ExperimentHandler(dataset) + # seasonal naive benchmark + fcst_df, total_time, model_name = run_seasonal_naive( + train_df=exp.train_df, + horizon=exp.horizon, + freq=exp.freq, + seasonality=exp.seasonality, + level=exp.level, + ) + fcst_df = exp.fcst_from_level_to_quantiles(fcst_df, model_name) + exp.save_results(fcst_df, total_time, model_name) + # statistical ensemble + fcst_df, total_time, model_name = run_statistical_ensemble( + train_df=exp.train_df, + horizon=exp.horizon, + freq=exp.freq, + seasonality=exp.seasonality, + quantiles=exp.quantiles, + ) + exp.save_results(fcst_df, total_time, model_name) + + +if __name__ == "__main__": + from statsforecast.utils import AirPassengers as ap + + AutoARIMA(season_length=12).forecast(ap.astype(np.float32), h=12) + fire.Fire(main) diff --git a/experiments/amazon-chronos/src/utils.py b/experiments/amazon-chronos/src/utils.py new file mode 100644 index 00000000..5e1172c2 --- /dev/null +++ b/experiments/amazon-chronos/src/utils.py @@ -0,0 +1,218 @@ +from functools import partial +from pathlib import Path +from typing import List + +import numpy as np +import pandas as pd +from gluonts.dataset import Dataset +from gluonts.dataset.repository.datasets import ( + get_dataset, + dataset_names as gluonts_datasets, +) +from gluonts.time_feature.seasonality import get_seasonality +from utilsforecast.evaluation import evaluate +from utilsforecast.losses import mase, smape + + +def quantile_loss( + df: pd.DataFrame, + models: list, + q: float = 0.5, + id_col: str = "unique_id", + target_col: str = "y", +) -> pd.DataFrame: + delta_y = df[models].sub(df[target_col], axis=0) + res = ( + np.maximum(q * delta_y, (q - 1) * delta_y) + .groupby(df[id_col], observed=True) + .mean() + ) + res.index.name = id_col + res = res.reset_index() + return res + + +class ExperimentHandler: + def __init__( + self, + dataset: str, + quantiles: List[float] = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9], + results_dir: str = "./results", + models_dir: str = "./models", + ): + if dataset not in gluonts_datasets: + raise Exception( + f"dataset {dataset} not found in gluonts " + f"available datasets: {', '.join(gluonts_datasets)}" + ) + self.dataset = dataset + self.quantiles = quantiles + self.level = self._transform_quantiles_to_levels(quantiles) + self.results_dir = results_dir + self.models_dir = models_dir + # defining datasets + self._maybe_download_m3_file(self.dataset) + gluonts_dataset = get_dataset(self.dataset) + self.horizon = gluonts_dataset.metadata.prediction_length + if self.horizon is None: + raise Exception( + f"horizon not found for dataset {self.dataset} " + "experiment cannot be run" + ) + self.freq = gluonts_dataset.metadata.freq + self.seasonality = get_seasonality(self.freq) + self.gluonts_train_dataset = gluonts_dataset.train + self.gluonts_test_dataset = gluonts_dataset.test + self._create_dir_if_not_exists(self.results_dir) + + @staticmethod + def _maybe_download_m3_file(dataset: str): + if dataset[:2] == "m3": + m3_file = Path.home() / ".gluonts" / "datasets" / "M3C.xls" + if not m3_file.exists(): + from datasetsforecast.m3 import M3 + from datasetsforecast.utils import download_file + + download_file(m3_file.parent, M3.source_url) + + @staticmethod + def _transform_quantiles_to_levels(quantiles: List[float]) -> List[int]: + level = [ + int(100 - 200 * q) for q in quantiles if q < 0.5 + ] # in this case mean=mediain + level = sorted(list(set(level))) + return level + + @staticmethod + def _create_dir_if_not_exists(directory: str): + Path(directory).mkdir(parents=True, exist_ok=True) + + @staticmethod + def _transform_gluonts_instance_to_df( + ts: dict, + last_n: int | None = None, + ) -> pd.DataFrame: + start_period = ts["start"] + start_ds, freq = start_period.to_timestamp(), start_period.freq + target = ts["target"] + ds = pd.date_range(start=start_ds, freq=freq, periods=len(target)) + if last_n is not None: + target = target[-last_n:] + ds = ds[-last_n:] + ts_df = pd.DataFrame({"unique_id": ts["item_id"], "ds": ds, "y": target}) + return ts_df + + @staticmethod + def _transform_gluonts_dataset_to_df( + gluonts_dataset: Dataset, + last_n: int | None = None, + ) -> pd.DataFrame: + df = pd.concat( + [ + ExperimentHandler._transform_gluonts_instance_to_df(ts, last_n=last_n) + for ts in gluonts_dataset + ] + ) + df = df.reset_index(drop=True) + return df + + @property + def train_df(self) -> pd.DataFrame: + train_df = self._transform_gluonts_dataset_to_df(self.gluonts_train_dataset) + return train_df + + @property + def test_df(self) -> pd.DataFrame: + test_df = self._transform_gluonts_dataset_to_df( + self.gluonts_test_dataset, + last_n=self.horizon, + ) + return test_df + + def save_dataframe(self, df: pd.DataFrame, file_name: str): + df.to_csv(f"{self.results_dir}/{file_name}", index=False) + + def save_results(self, fcst_df: pd.DataFrame, total_time: float, model_name: str): + self.save_dataframe( + fcst_df, + f"{model_name}-{self.dataset}-fcst.csv", + ) + time_df = pd.DataFrame({"time": [total_time], "model": model_name}) + self.save_dataframe( + time_df, + f"{model_name}-{self.dataset}-time.csv", + ) + + def fcst_from_level_to_quantiles( + self, + fcst_df: pd.DataFrame, + model_name: str, + ) -> pd.DataFrame: + fcst_df = fcst_df.copy() + cols = ["unique_id", "ds", model_name] + for q in self.quantiles: + if q == 0.5: + col = f"{model_name}" + else: + lv = int(100 - 200 * q) + hi_or_lo = "lo" if lv > 0 else "hi" + lv = abs(lv) + col = f"{model_name}-{hi_or_lo}-{lv}" + q_col = f"{model_name}-q-{q}" + fcst_df[q_col] = fcst_df[col].values + cols.append(q_col) + return fcst_df[cols] + + def evaluate_models(self, models: List[str]) -> pd.DataFrame: + test_df = self.test_df + train_df = self.train_df + fcsts_df = [] + times_df = [] + for model in models: + fcst_method_df = pd.read_csv( + f"{self.results_dir}/{model}-{self.dataset}-fcst.csv" + ).set_index(["unique_id", "ds"]) + fcsts_df.append(fcst_method_df) + time_method_df = pd.read_csv( + f"{self.results_dir}/{model}-{self.dataset}-time.csv" + ) + times_df.append(time_method_df) + fcsts_df = pd.concat(fcsts_df, axis=1).reset_index() + fcsts_df["ds"] = pd.to_datetime(fcsts_df["ds"]) + times_df = pd.concat(times_df) + test_df = test_df.merge(fcsts_df, how="left") + assert test_df.isna().sum().sum() == 0, "merge contains nas" + # point evaluation + point_fcsts_cols = ["unique_id", "ds", "y"] + models + test_df["unique_id"] = test_df["unique_id"].astype(str) + train_df["unique_id"] = train_df["unique_id"].astype(str) + mase_seas = partial(mase, seasonality=self.seasonality) + eval_df = evaluate( + test_df[point_fcsts_cols], + train_df=train_df, + metrics=[smape, mase_seas], + ) + # probabilistic evaluation + eval_prob_df = [] + for q in self.quantiles: + prob_cols = [f"{model}-q-{q}" for model in models] + eval_q_df = quantile_loss(test_df, models=prob_cols, q=q) + eval_q_df[prob_cols] = eval_q_df[prob_cols] * self.horizon + eval_q_df = eval_q_df.rename(columns=dict(zip(prob_cols, models))) + eval_q_df["metric"] = f"quantile-loss-{q}" + eval_prob_df.append(eval_q_df) + eval_prob_df = pd.concat(eval_prob_df) + eval_prob_df = eval_prob_df.groupby("metric").sum().reset_index() + total_y = test_df["y"].sum() + eval_prob_df[models] = eval_prob_df[models] / total_y + eval_prob_df["metric"] = "scaled_crps" + eval_df = pd.concat([eval_df, eval_prob_df]).reset_index(drop=True) + eval_df = eval_df.groupby("metric").mean(numeric_only=True).reset_index() + eval_df = eval_df.melt(id_vars="metric", value_name="value", var_name="model") + times_df.insert(0, "metric", "time") + times_df = times_df.rename(columns={"time": "value"}) + eval_df = pd.concat([eval_df, times_df]) + eval_df.insert(0, "dataset", self.dataset) + eval_df = eval_df.sort_values(["dataset", "metric", "model"]) + eval_df = eval_df.reset_index(drop=True) + return eval_df