From 47346705cdcd4b69ef68be8611496a64aba2fb16 Mon Sep 17 00:00:00 2001 From: azul Date: Tue, 13 Feb 2024 08:18:21 -0600 Subject: [PATCH] =?UTF-8?q?feat:=20add=20lag-llama=20experiment=20(#224)?= =?UTF-8?q?=20y=20la=20=F0=9F=A7=80=E2=9C=96=EF=B8=8F=E2=98=95=EF=B8=8F=20?= =?UTF-8?q?=F0=9F=92=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- experiments/lag-llama/Makefile | 10 + experiments/lag-llama/README.md | 61 +++++ experiments/lag-llama/environment.yml | 18 ++ .../lag-llama/src/lag_llama_pipeline.py | 112 +++++++++ experiments/lag-llama/src/main.py | 69 ++++++ .../lag-llama/src/statsforecast_pipeline.py | 48 ++++ experiments/lag-llama/src/utils.py | 219 ++++++++++++++++++ 7 files changed, 537 insertions(+) create mode 100644 experiments/lag-llama/Makefile create mode 100644 experiments/lag-llama/README.md create mode 100644 experiments/lag-llama/environment.yml create mode 100644 experiments/lag-llama/src/lag_llama_pipeline.py create mode 100644 experiments/lag-llama/src/main.py create mode 100644 experiments/lag-llama/src/statsforecast_pipeline.py create mode 100644 experiments/lag-llama/src/utils.py diff --git a/experiments/lag-llama/Makefile b/experiments/lag-llama/Makefile new file mode 100644 index 00000000..e5c4bb3f --- /dev/null +++ b/experiments/lag-llama/Makefile @@ -0,0 +1,10 @@ +download_lag_llama_code: + @git clone https://github.com/time-series-foundation-models/lag-llama tempdir + @cp -R tempdir/data/ . + @cp -R tempdir/gluon_utils/ . + @cp -R tempdir/lag_llama/ . + @cp -R tempdir/requirements.txt lag-llama-requirements.txt + @rm -rf tempdir + +download_lag_llama_model: + @huggingface-cli download time-series-foundation-models/Lag-Llama lag-llama.ckpt --local-dir ./models/ diff --git a/experiments/lag-llama/README.md b/experiments/lag-llama/README.md new file mode 100644 index 00000000..06ce9746 --- /dev/null +++ b/experiments/lag-llama/README.md @@ -0,0 +1,61 @@ +# LagLLama is 40% less accurate than a simple SeasonalNaive and 1000x slower. + +We present a fully reproducible experiment showing that SeasonalNaive significantly outperforms LagLlama, a recently introduced open-source foundational model for time series forecasting (a deep learning architecture pre-trained on time series datasets). Specifically, **SeasonalNaive achieves 42%, 24%, and 16% better performance** in terms of MASE, MAPE, and CRPS respectively, and boasts **a 1,000x speed advantage**. These findings are based on an extensive analysis covering 105,289 unique time series from the M1, M3, M4, and Tourism datasets, which were omitted in the original LagLlama paper. + +# Introduction + +In the field of time series forecasting, recent developments have introduced foundational models such as LagLlama, which utilizes deep learning and extensive data for pretraining, aiming to enhance predictive performance and model complexity. LagLLama is to be praised as one of the first open-source foundational models. However, contrary to expectations, our analysis indicates that the traditional SeasonalNaive model, known for its straightforward approach of extending past seasonal trends into future predictions, outperforms LagLlama in terms of both accuracy and computational efficiency. + +## Empirical Evaluation + +The original paper uses 3,113 time series to assess the model performance. The original paper only reports CRPS and omits point forecast error metrics widely used in academia and industry, e.g. MASE and MAPE. + +Our evaluation encompasses 105,289 unique time series from different datasets, including M1, M3, M4, and Tourism, covering yearly, quarterly, monthly, weekly, daily, and hourly frequencies. This diverse dataset selection allows for a robust assessment of the models across various time series characteristics and forecasting horizons. We also reproduce results for Pedestrian Counts and Weather originally included in the paper/code to show that we are running LagLlama correctly. + +## Results + +The results are summarized in the following table, highlighting the performance metrics of MASE, MAPE, CRPS, and TIME (measured in seconds). The best results are indicated in **bold** for easy reference. + +image + + +## Reproducibility + +To ensure the reproducibility of our findings, the experiments were conducted on an AWS g5.4xlarge GPU instance equipped with 16 vCPUs, 64 GiB of RAM, and an NVIDIA A10G Tensor Core GPU (24 GiB). The complete code can be found in this repo. + +### Instructions + +1. Create a python environment using: +``` +mamba env create -f environment.yml +conda activate lag-llama +``` + +2. Add lag-llama code to your environment + +``` +make download_lag_llama_code +``` + +5. Download lag-llama model + +``` +make download_lag_llama_model +``` + +4. Install lag-llama requirements + +``` +pip install -r lag-llama-requirements.txt +``` + +5. Run complete experiments reported in the table + +``` +python -m src.main +``` + +### References +- **Lag-Llama Paper**: [Towards Foundation Models for Probabilistic Time Series Forecasting](https://arxiv.org/abs/2310.08278) +- **SeasonalNaive Implementation**: [GitHub Repository](https://github.com/nixtla/statsforecast/) +- **CRPS Replication Note**: The CRPS performance for `LagLlama` is replicated from the model's publicly available [Colab notebook](https://colab.research.google.com/drive/13HHKYL_HflHBKxDWycXgIUAHSeHRR5eo?usp=sharing), ensuring a fair comparison. diff --git a/experiments/lag-llama/environment.yml b/experiments/lag-llama/environment.yml new file mode 100644 index 00000000..1c8a8098 --- /dev/null +++ b/experiments/lag-llama/environment.yml @@ -0,0 +1,18 @@ +name: lag-llama +channels: + - conda-forge + - defaults + - anaconda +dependencies: + - jupyterlab + - pip + - python=3.10 + - pip: + - datasetsforecast + - fire + - huggingface_hub[cli] + - neuralforecast + - orjson + - statsforecast + - utilsforecast + diff --git a/experiments/lag-llama/src/lag_llama_pipeline.py b/experiments/lag-llama/src/lag_llama_pipeline.py new file mode 100644 index 00000000..5c0949fc --- /dev/null +++ b/experiments/lag-llama/src/lag_llama_pipeline.py @@ -0,0 +1,112 @@ +from time import time +from typing import Iterable, List, Tuple + +import fire +import pandas as pd +import torch +from gluonts.dataset import Dataset +from gluonts.model.forecast import Forecast +from gluonts.torch.model.predictor import PyTorchPredictor +from tqdm import tqdm + +from lag_llama.gluon.estimator import LagLlamaEstimator +from src.utils import ExperimentHandler + + +def get_lag_llama_predictor( + prediction_length: int, models_dir: str +) -> PyTorchPredictor: + model_path = f"{models_dir}/lag-llama.ckpt" + map_location = torch.device("cuda:0") if torch.cuda.is_available() else "cpu" + if map_location == "cpu": + raise ValueError("cpu is not supported in lagllama (there is a bug)") + ckpt = torch.load(model_path, map_location=map_location) + estimator_args = ckpt["hyper_parameters"]["model_kwargs"] + # this context length is reported in the paper + context_length = 32 + estimator = LagLlamaEstimator( + ckpt_path=model_path, + prediction_length=prediction_length, + context_length=context_length, + # estimator args + input_size=estimator_args["input_size"], + n_layer=estimator_args["n_layer"], + n_embd_per_head=estimator_args["n_embd_per_head"], + n_head=estimator_args["n_head"], + scaling=estimator_args["scaling"], + time_feat=estimator_args["time_feat"], + ) + lightning_module = estimator.create_lightning_module() + transformation = estimator.create_transformation() + predictor = estimator.create_predictor(transformation, lightning_module) + return predictor + + +def gluonts_instance_fcst_to_df( + fcst: Forecast, + quantiles: List[float], + model_name: str, +) -> pd.DataFrame: + point_forecast = fcst.mean + h = len(point_forecast) + dates = pd.date_range( + fcst.start_date.to_timestamp(), + freq=fcst.freq, + periods=h, + ) + fcst_df = pd.DataFrame( + { + "ds": dates, + "unique_id": fcst.item_id, + model_name: point_forecast, + } + ) + for q in quantiles: + fcst_df[f"{model_name}-q-{q}"] = fcst.quantile(q) + return fcst_df + + +def gluonts_fcsts_to_df( + fcsts: Iterable[Forecast], + quantiles: List[float], + model_name: str, +) -> pd.DataFrame: + df = [] + for fcst in tqdm(fcsts): + fcst_df = gluonts_instance_fcst_to_df(fcst, quantiles, model_name) + df.append(fcst_df) + return pd.concat(df).reset_index(drop=True) + + +def run_lag_llama( + gluonts_dataset: Dataset, + horizon: int, + quantiles: List[float], + models_dir: str, +) -> Tuple[pd.DataFrame, float, str]: + init_time = time() + predictor = get_lag_llama_predictor(horizon, models_dir) + fcsts = predictor.predict(gluonts_dataset, num_samples=100) + model_name = "LagLlama" + fcsts_df = gluonts_fcsts_to_df( + fcsts, + quantiles=quantiles, + model_name=model_name, + ) + total_time = time() - init_time + return fcsts_df, total_time, model_name + + +def main(dataset: str): + exp = ExperimentHandler(dataset) + fcst_df, total_time, model_name = run_lag_llama( + gluonts_dataset=exp.gluonts_train_dataset, + horizon=exp.horizon, + quantiles=exp.quantiles, + models_dir=exp.models_dir, + ) + exp._save_results(fcst_df, total_time, model_name) + + +if __name__ == "__main__": + fire.Fire(main) diff --git a/experiments/lag-llama/src/main.py b/experiments/lag-llama/src/main.py new file mode 100644 index 00000000..7e6de697 --- /dev/null +++ b/experiments/lag-llama/src/main.py @@ -0,0 +1,69 @@ +import logging +import subprocess + +import pandas as pd + +from src.utils import ExperimentHandler + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + +not_included_datasets = [ + "m1_yearly", + "m1_quarterly", + "m1_monthly", + "m3_yearly", + "m3_quarterly", + "m3_monthly", + "m3_other", + "m4_yearly", + "m4_quarterly", + "m4_monthly", + "m4_weekly", + "m4_daily", + "m4_hourly", + "tourism_yearly", + "tourism_quarterly", + "tourism_monthly", +] + +test_paper_datasets = [ + "pedestrian_counts", + "weather", +] + +datasets = { + "not_included": not_included_datasets, + "test_set": test_paper_datasets, +} + + +def evaluate(): + eval_df = [] + prefix_process = ["python", "-m"] + + for name_group, groups in datasets.items(): + for dataset in groups: + logger.info(f"Evaluating {dataset}...") + suffix_process = ["--dataset", dataset] + process = ( + lambda middle_process: prefix_process + middle_process + suffix_process + ) + # running statsforecast and lagllama in separated + # processes because gluonts sets multiprocessing context + # see: https://github.com/awslabs/gluonts/blob/dev/src/gluonts/torch/__init__.py + logger.info("Running SeasonalNaive") + subprocess.run(process(["src.statsforecast_pipeline"])) + logger.info("Running LagLLama") + subprocess.run(process(["src.lag_llama_pipeline"])) + logger.info("Running dataset evaluation") + exp = ExperimentHandler(dataset) + eval_dataset_df = exp.evaluate_models(["LagLlama", "SeasonalNaive"]) + eval_dataset_df.insert(0, "paper", name_group) + eval_df.append(eval_dataset_df) + eval_df = pd.concat(eval_df).reset_index(drop=True) + exp.save_dataframe(eval_df, "complete-results.csv") + + +if __name__ == "__main__": + evaluate() diff --git a/experiments/lag-llama/src/statsforecast_pipeline.py b/experiments/lag-llama/src/statsforecast_pipeline.py new file mode 100644 index 00000000..474310d4 --- /dev/null +++ b/experiments/lag-llama/src/statsforecast_pipeline.py @@ -0,0 +1,48 @@ +import os +from time import time +from typing import List, Tuple + +import fire +import pandas as pd +from statsforecast import StatsForecast +from statsforecast.models import SeasonalNaive + +from src.utils import ExperimentHandler + + +def run_statsforecast( + train_df: pd.DataFrame, + horizon: int, + freq: str, + seasonality: int, + level: List[int], +) -> Tuple[pd.DataFrame, float, str]: + os.environ["NIXTLA_ID_AS_COL"] = "true" + models = [SeasonalNaive(season_length=seasonality)] + init_time = time() + sf = StatsForecast( + models=models, + freq=freq, + n_jobs=-1, + ) + fcsts_df = sf.forecast(df=train_df, h=horizon, level=level) + total_time = time() - init_time + model_name = repr(models[0]) + return fcsts_df, total_time, model_name + + +def main(dataset: str): + exp = ExperimentHandler(dataset) + fcst_df, total_time, model_name = run_statsforecast( + train_df=exp.train_df, + horizon=exp.horizon, + freq=exp.freq, + seasonality=exp.seasonality, + level=exp.level, + ) + fcst_df = exp._fcst_from_level_to_quantiles(fcst_df, model_name) + exp._save_results(fcst_df, total_time, model_name) + + +if __name__ == "__main__": + fire.Fire(main) diff --git a/experiments/lag-llama/src/utils.py b/experiments/lag-llama/src/utils.py new file mode 100644 index 00000000..2a703213 --- /dev/null +++ b/experiments/lag-llama/src/utils.py @@ -0,0 +1,219 @@ +from functools import partial +from pathlib import Path +from typing import List + +import numpy as np +import pandas as pd +from gluonts.dataset import Dataset +from gluonts.dataset.repository.datasets import ( + get_dataset, + dataset_names as gluonts_datasets, +) +from gluonts.time_feature.seasonality import get_seasonality +from utilsforecast.evaluation import evaluate +from utilsforecast.losses import mase, mape + + +def quantile_loss( + df: pd.DataFrame, + models: list, + q: float = 0.5, + id_col: str = "unique_id", + target_col: str = "y", +) -> pd.DataFrame: + delta_y = df[models].sub(df[target_col], axis=0) + res = ( + np.maximum(q * delta_y, (q - 1) * delta_y) + .groupby(df[id_col], observed=True) + .mean() + ) + res.index.name = id_col + res = res.reset_index() + return res + + +class ExperimentHandler: + def __init__( + self, + dataset: str, + # default quantiles taken from https://github.com/awslabs/gluonts/blob/08ab434b1e0946c21010ddbb4248288f8f043599/src/gluonts/evaluation/_base.py#L90C5-L90C68 + quantiles: List[float] = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9], + results_dir: str = "./results", + models_dir: str = "./models", + ): + if dataset not in gluonts_datasets: + raise Exception( + f"dataset {dataset} not found in gluonts " + f"available datasets: {', '.join(gluonts_datasets)}" + ) + self.dataset = dataset + self.quantiles = quantiles + self.level = self._transform_quantiles_to_levels(quantiles) + self.results_dir = results_dir + self.models_dir = models_dir + # defining datasets + self._maybe_download_m3_file(self.dataset) + gluonts_dataset = get_dataset(self.dataset) + self.horizon = gluonts_dataset.metadata.prediction_length + if self.horizon is None: + raise Exception( + f"horizon not found for dataset {self.dataset} " + "experiment cannot be run" + ) + self.freq = gluonts_dataset.metadata.freq + self.seasonality = get_seasonality(self.freq) + self.gluonts_train_dataset = gluonts_dataset.train + self.gluonts_test_dataset = gluonts_dataset.test + self._create_dir_if_not_exists(self.results_dir) + + @staticmethod + def _maybe_download_m3_file(dataset: str): + if dataset[:2] == "m3": + m3_file = Path.home() / ".gluonts" / "datasets" / "M3C.xls" + if not m3_file.exists(): + from datasetsforecast.m3 import M3 + from datasetsforecast.utils import download_file + + download_file(m3_file.parent, M3.source_url) + + @staticmethod + def _transform_quantiles_to_levels(quantiles: List[float]) -> List[int]: + level = [ + int(100 - 200 * q) for q in quantiles if q < 0.5 + ] # in this case mean=mediain + level = sorted(list(set(level))) + return level + + @staticmethod + def _create_dir_if_not_exists(directory: str): + Path(directory).mkdir(parents=True, exist_ok=True) + + @staticmethod + def _transform_gluonts_instance_to_df( + ts: dict, + last_n: int | None = None, + ) -> pd.DataFrame: + start_period = ts["start"] + start_ds, freq = start_period.to_timestamp(), start_period.freq + target = ts["target"] + ds = pd.date_range(start=start_ds, freq=freq, periods=len(target)) + if last_n is not None: + target = target[-last_n:] + ds = ds[-last_n:] + ts_df = pd.DataFrame({"unique_id": ts["item_id"], "ds": ds, "y": target}) + return ts_df + + @staticmethod + def _transform_gluonts_dataset_to_df( + gluonts_dataset: Dataset, + last_n: int | None = None, + ) -> pd.DataFrame: + df = pd.concat( + [ + ExperimentHandler._transform_gluonts_instance_to_df(ts, last_n=last_n) + for ts in gluonts_dataset + ] + ) + df = df.reset_index(drop=True) + return df + + @property + def train_df(self) -> pd.DataFrame: + train_df = self._transform_gluonts_dataset_to_df(self.gluonts_train_dataset) + return train_df + + @property + def test_df(self) -> pd.DataFrame: + test_df = self._transform_gluonts_dataset_to_df( + self.gluonts_test_dataset, + last_n=self.horizon, + ) + return test_df + + def save_dataframe(self, df: pd.DataFrame, file_name: str): + df.to_csv(f"{self.results_dir}/{file_name}", index=False) + + def _save_results(self, fcst_df: pd.DataFrame, total_time: float, model_name: str): + self.save_dataframe( + fcst_df, + f"{model_name}-{self.dataset}-fcst.csv", + ) + time_df = pd.DataFrame({"time": [total_time], "model": model_name}) + self.save_dataframe( + time_df, + f"{model_name}-{self.dataset}-time.csv", + ) + + def _fcst_from_level_to_quantiles( + self, + fcst_df: pd.DataFrame, + model_name: str, + ) -> pd.DataFrame: + fcst_df = fcst_df.copy() + cols = ["unique_id", "ds", model_name] + for q in self.quantiles: + if q == 0.5: + col = f"{model_name}" + else: + lv = int(100 - 200 * q) + hi_or_lo = "lo" if lv > 0 else "hi" + lv = abs(lv) + col = f"{model_name}-{hi_or_lo}-{lv}" + q_col = f"{model_name}-q-{q}" + fcst_df[q_col] = fcst_df[col].values + cols.append(q_col) + return fcst_df[cols] + + def evaluate_models(self, models: List[str]) -> pd.DataFrame: + test_df = self.test_df + train_df = self.train_df + fcsts_df = [] + times_df = [] + for model in models: + fcst_method_df = pd.read_csv( + f"{self.results_dir}/{model}-{self.dataset}-fcst.csv" + ).set_index(["unique_id", "ds"]) + fcsts_df.append(fcst_method_df) + time_method_df = pd.read_csv( + f"{self.results_dir}/{model}-{self.dataset}-time.csv" + ) + times_df.append(time_method_df) + fcsts_df = pd.concat(fcsts_df, axis=1).reset_index() + fcsts_df["ds"] = pd.to_datetime(fcsts_df["ds"]) + times_df = pd.concat(times_df) + test_df = test_df.merge(fcsts_df, how="left") + assert test_df.isna().sum().sum() == 0, "merge contains nas" + # point evaluation + point_fcsts_cols = ["unique_id", "ds", "y"] + models + test_df["unique_id"] = test_df["unique_id"].astype(str) + train_df["unique_id"] = train_df["unique_id"].astype(str) + mase_seas = partial(mase, seasonality=self.seasonality) + eval_df = evaluate( + test_df[point_fcsts_cols], + train_df=train_df, + metrics=[mape, mase_seas], + ) + # probabilistic evaluation + eval_prob_df = [] + for q in self.quantiles: + prob_cols = [f"{model}-q-{q}" for model in models] + eval_q_df = quantile_loss(test_df, models=prob_cols, q=q) + eval_q_df[prob_cols] = eval_q_df[prob_cols] * self.horizon + eval_q_df = eval_q_df.rename(columns=dict(zip(prob_cols, models))) + eval_q_df["metric"] = f"quantile-loss-{q}" + eval_prob_df.append(eval_q_df) + eval_prob_df = pd.concat(eval_prob_df) + eval_prob_df = eval_prob_df.groupby("metric").sum().reset_index() + total_y = test_df["y"].sum() + eval_prob_df[models] = eval_prob_df[models] / total_y + eval_prob_df["metric"] = "scaled_crps" + eval_df = pd.concat([eval_df, eval_prob_df]).reset_index(drop=True) + eval_df = eval_df.groupby("metric").mean(numeric_only=True).reset_index() + eval_df = eval_df.melt(id_vars="metric", value_name="value", var_name="model") + times_df.insert(0, "metric", "time") + times_df = times_df.rename(columns={"time": "value"}) + eval_df = pd.concat([eval_df, times_df]) + eval_df.insert(0, "dataset", self.dataset) + eval_df = eval_df.sort_values(["dataset", "metric", "model"]) + eval_df = eval_df.reset_index(drop=True) + return eval_df