diff --git a/experiments/prophet/Makefile b/experiments/prophet/Makefile new file mode 100644 index 00000000..e5f1810e --- /dev/null +++ b/experiments/prophet/Makefile @@ -0,0 +1,21 @@ +SRC_DIR := data +EXCLUDE_STRINGS := catalogue +TS_FILES := $(filter-out $(wildcard $(SRC_DIR)/*$(foreach str,$(EXCLUDE_STRINGS),*$(str)*)), $(wildcard $(SRC_DIR)/*.parquet)) + +evaluate: .require-method + @echo "Evaluation for $${method}..." + @for file in $(TS_FILES); do \ + echo $$file; \ + python -m src.$${method}_exp --file $$file; \ + done + @echo "Evaluation for $${method} complete." + +summarize_results: + @echo "Summarize results..." + @python -m src.results_summary --dir ./data/results/ + @echo "Summarize results complete." + +.require-method: +ifndef method + $(error method is required) +endif diff --git a/experiments/prophet/README.md b/experiments/prophet/README.md new file mode 100644 index 00000000..288813f3 --- /dev/null +++ b/experiments/prophet/README.md @@ -0,0 +1,108 @@ +# TimeGPT vs Prophet: Time Series Forecasting Benchmark + +## Overview + +This repository offers a detailed benchmarking framework for comparing the performance of TimeGPT against Prophet and StatsForecast in time series forecasting. We provide datasets with over 300,000 series across various frequencies, including daily, weekly, 10-minute, and hourly intervals. Users can also incorporate their own datasets for a more personalized analysis. **TimeGPT was not trained on this datasets.** + + +## Notes + +- Results were generated using a VM with 96 cores and 196 GB of RAM. +- Prophet and StatsForecast was executed in paralell. +- TimeGPT uses the AzureML endpoint. +- Since the AzureML endpoint does not support GPU and scalable requests, the results can improve. + +## Repository Structure + +- `/data`: Parquet files with time series data. +- `/src`: Source code for running benchmarks and experiments. +- `/data/results`: Outputs and analysis from benchmark runs. + +## Data Structure + +Datasets should adhere to this structure: + +- **unique_id**: Identifier for each series. +- **ds**: Timestamp of observation. +- **y**: Target variable for forecasting. +- **frequency**: Description of data frequency (e.g., 'Daily'). +- **pandas_frequency**: Pandas frequency string (e.g., 'D'). +- **h**: Forecasting horizon. (The last `h` periods of each series will be used as test.) +- **seasonality**: Seasonality of the series (e.g., 7 for daily). + +## Running Experiments + +### Makefile + +The repository includes a Makefile to streamline the process of running experiments. The key commands are: + +1. **evaluate**: Runs the evaluation for a specified method (`timegpt`, `prophet`, or `statsforecast`). +2. **summarize_results**: Summarizes the results from the evaluation. + +### Evaluation Flow + +1. **Run Evaluation**: Use `make evaluate method=` where `` is either `timegpt`, `prophet`, or `statsforecast`. The script filters out files containing specific strings (like 'catalogue') and runs the experiment for each `.parquet` file in the `/data` directory. The results will be written in `/data/results`. + +2. **Summarize Results**: After running evaluations for each method, execute `make summarize_results` to aggregate and summarize the results, which will be written in this `README.md` file. + +## Getting Started + +1. **Prepare Data**: Ensure your Parquet files are in `/data`. +2. **Create conda environment**: Run `conda env create -f environment.yml` and activate the environment using `conda activate timegpt-benchmark`. +3. **Run Benchmarks**: Use the Makefile commands to run evaluations and summarize results. + + +## Results + + +### Data Description + +| file | frequency | n_series | mean | std | min_length | max_length | n_obs | +|:---------------|:------------|-----------:|----------:|-----------:|-------------:|-------------:|------------:| +| 10Minutely_10T | 10Minutely | 17 | 2.919 | 6.095 | 3,000 | 3,000 | 51,000 | +| 30Minutely_30T | 30Minutely | 556 | 0.233 | 0.329 | 3,000 | 3,000 | 1,668,000 | +| Daily_D | Daily | 103,529 | 178.763 | 5,825.784 | 14 | 3,000 | 251,217,667 | +| Hourly_H | Hourly | 227 | 635.332 | 4,425.693 | 748 | 3,000 | 590,653 | +| Minutely_T | Minutely | 34 | 44.612 | 106.121 | 3,000 | 3,000 | 102,000 | +| Monthly_MS | Monthly | 97,588 | 4,280.461 | 72,939.696 | 24 | 1,456 | 9,116,399 | +| Quarterly_QS | Quarterly | 2,539 | 4,722.366 | 9,521.907 | 18 | 745 | 253,160 | +| Weekly_W-MON | Weekly | 98,144 | 1,388.030 | 99,852.095 | 9 | 399 | 35,096,195 | + +### Performance + + +| file | metric | TimeGPT | Prophet | SeasonalNaive | +|:---------------|:---------|----------:|----------:|----------------:| +| 10Minutely_10T | mae | **0.976** | 2.758 | 1.0 | +| 10Minutely_10T | rmse | **0.764** | 2.005 | 1.0 | +| 10Minutely_10T | time | **0.147** | 0.565 | 1.0 | +|----------------|----------|-----------|-----------|-----------------| +| 30Minutely_30T | mae | **0.6** | 0.661 | 1.0 | +| 30Minutely_30T | rmse | **0.652** | 0.687 | 1.0 | +| 30Minutely_30T | time | **0.318** | 7.498 | 1.0 | +|----------------|----------|-----------|-----------|-----------------| +| Daily_D | mae | **0.802** | 1.699 | 1.0 | +| Daily_D | rmse | **0.78** | 1.479 | 1.0 | +| Daily_D | time | **0.544** | 48.019 | 1.0 | +|----------------|----------|-----------|-----------|-----------------| +| Hourly_H | mae | **0.855** | 1.124 | 1.0 | +| Hourly_H | rmse | **0.881** | 1.048 | 1.0 | +| Hourly_H | time | **0.134** | 3.426 | 1.0 | +|----------------|----------|-----------|-----------|-----------------| +| Minutely_T | mae | **0.732** | 1.349 | 1.0 | +| Minutely_T | rmse | **0.705** | 1.207 | 1.0 | +| Minutely_T | time | **0.088** | 0.786 | 1.0 | +|----------------|----------|-----------|-----------|-----------------| +| Monthly_MS | mae | **0.728** | 1.41 | 1.0 | +| Monthly_MS | rmse | **0.686** | 1.196 | 1.0 | +| Monthly_MS | time | 7.02 | 118.178 | **1.0** | +|----------------|----------|-----------|-----------|-----------------| +| Quarterly_QS | mae | **0.966** | 1.384 | 1.0 | +| Quarterly_QS | rmse | **0.974** | 1.313 | 1.0 | +| Quarterly_QS | time | 1.218 | 18.685 | **1.0** | +|----------------|----------|-----------|-----------|-----------------| +| Weekly_W-MON | mae | **0.878** | 2.748 | 1.0 | +| Weekly_W-MON | rmse | **0.878** | 2.748 | 1.0 | +| Weekly_W-MON | time | 12.489 | 85.611 | **1.0** | +|----------------|----------|-----------|-----------|-----------------| + diff --git a/experiments/prophet/environment.yml b/experiments/prophet/environment.yml new file mode 100644 index 00000000..b4f86021 --- /dev/null +++ b/experiments/prophet/environment.yml @@ -0,0 +1,16 @@ +name: timegpt-benchmark +channels: + - conda-forge +dependencies: + - jupyterlab + - prophet + - pyspark>=3.3 + - python=3.10 + - pip: + - fire + - nixtlats + - python-dotenv + - statsforecast + - utilsforecast + - tabulate + diff --git a/experiments/prophet/src/prophet_exp.py b/experiments/prophet/src/prophet_exp.py new file mode 100644 index 00000000..fd6bb7c9 --- /dev/null +++ b/experiments/prophet/src/prophet_exp.py @@ -0,0 +1,187 @@ +from concurrent.futures import ThreadPoolExecutor +from copy import deepcopy +from time import time +from typing import Optional + +import fire +import numpy as np +import pandas as pd +from prophet import Prophet as _Prophet +from utilsforecast.processing import ( + backtest_splits, + drop_index_if_pandas, + join, + maybe_compute_sort_indices, + take_rows, + vertical_concat, +) + +from src.tools import ExperimentHandler + + +class ParallelForecaster: + def _process_group(self, func, df, **kwargs): + uid = df["unique_id"].iloc[0] + _df = df.drop("unique_id", axis=1) + res_df = func(_df, **kwargs) + res_df.insert(0, "unique_id", uid) + return res_df + + def _apply_parallel(self, df_grouped, func, **kwargs): + results = [] + with ThreadPoolExecutor(max_workers=None) as executor: + futures = [ + executor.submit(self._process_group, func, df, **kwargs) + for _, df in df_grouped + ] + for future in futures: + results.append(future.result()) + return pd.concat(results) + + def forecast( + self, + df: pd.DataFrame, + h: int, + X_df: Optional[pd.DataFrame] = None, + ): + df_grouped = df.groupby("unique_id") + return self._apply_parallel( + df_grouped, + self._local_forecast, + h=h, + ) + + def cross_validation( + self, + df: pd.DataFrame, + h: int, + n_windows: int = 1, + step_size: Optional[int] = None, + **kwargs, + ): + df_grouped = df.groupby("unique_id") + kwargs = {"h": h, "n_windows": n_windows, "step_size": step_size, **kwargs} + return self._apply_parallel( + df_grouped, + self._local_cross_validation, + **kwargs, + ) + + +class Prophet(_Prophet, ParallelForecaster): + def __init__( + self, + freq: str, + alias: str = "Prophet", + *args, + **kwargs, + ): + super().__init__(*args, **kwargs) + self.freq = freq + self.alias = alias + + def _local_forecast( + self, + df: pd.DataFrame, + h: int, + X_df: Optional[pd.DataFrame] = None, + ) -> pd.DataFrame: + model = deepcopy(self) + model.fit(df=df) + future_df = model.make_future_dataframe( + periods=h, include_history=False, freq=self.freq + ) + if X_df is not None: + future_df = future_df.merge(X_df, how="left") + np.random.seed(1000) + fcst_df = model.predict(future_df) + fcst_df = fcst_df.rename({"yhat": self.alias}, axis=1) + fcst_df = fcst_df[["ds", self.alias]] + return fcst_df + + def _local_cross_validation( + self, + df: pd.DataFrame, + h: int, + n_windows: int = 1, + step_size: Optional[int] = None, + ) -> pd.DataFrame: + df = df.copy() + df["ds"] = pd.to_datetime(df["ds"]) + df.insert(0, "unique_id", "ts_0") + # mlforecast cv code + results = [] + sort_idxs = maybe_compute_sort_indices(df, "unique_id", "ds") + if sort_idxs is not None: + df = take_rows(df, sort_idxs) + splits = backtest_splits( + df, + n_windows=n_windows, + h=h, + id_col="unique_id", + time_col="ds", + freq=pd.tseries.frequencies.to_offset(self.freq), + step_size=h if step_size is None else step_size, + ) + for i_window, (cutoffs, train, valid) in enumerate(splits): + if len(valid.columns) > 3: + # if we have uid, ds, y + exogenous vars + train_future = valid.drop(columns="y") + else: + train_future = None + y_pred = self._local_forecast( + df=train[["ds", "y"]], + h=h, + X_df=train_future, + ) + y_pred.insert(0, "unique_id", "ts_0") + y_pred = join(y_pred, cutoffs, on="unique_id", how="left") + result = join( + valid[["unique_id", "ds", "y"]], + y_pred, + on=["unique_id", "ds"], + ) + if result.shape[0] < valid.shape[0]: + raise ValueError( + "Cross validation result produced less results than expected. " + "Please verify that the frequency parameter (freq) matches your series' " + "and that there aren't any missing periods." + ) + results.append(result) + out = vertical_concat(results) + out = drop_index_if_pandas(out) + first_out_cols = ["unique_id", "ds", "cutoff", "y"] + remaining_cols = [c for c in out.columns if c not in first_out_cols] + fcst_cv_df = out[first_out_cols + remaining_cols] + return fcst_cv_df.drop(columns="unique_id") + + +def evaluate_experiment(file: str): + exp_handler = ExperimentHandler(file=file, method="prophet") + Y_df, freq, pandas_freq, h, seasonality = exp_handler.read_data() + model_name = "Prophet" + print(model_name) + prophet = Prophet(freq=pandas_freq) + start = time() + Y_hat_df = prophet.cross_validation( + df=Y_df, + h=h, + n_windows=1, + ) + total_time = time() - start + print(total_time) + # evaluation + eval_df, total_time_df = exp_handler.evaluate_model( + Y_hat_df=Y_hat_df, + model_name=model_name, + total_time=total_time, + ) + exp_handler.save_results( + freq=freq, + eval_df=eval_df, + total_time_df=total_time_df, + ) + + +if __name__ == "__main__": + fire.Fire(evaluate_experiment) diff --git a/experiments/prophet/src/results_summary.py b/experiments/prophet/src/results_summary.py new file mode 100644 index 00000000..384e8f5c --- /dev/null +++ b/experiments/prophet/src/results_summary.py @@ -0,0 +1,119 @@ +from pathlib import Path + +import fire +from numpy import column_stack +import pandas as pd + + +def read_kind_results(kind: str, dir: str): + files = list(Path(dir).rglob(f"*{kind}.parquet")) + df = pd.concat( + [pd.read_parquet(file).assign(file=str(file).split("/")[-2]) for file in files], + ignore_index=True, + ) + return df + + +def summarize_results_per_file(metrics_df: pd.DataFrame): + metrics_df_per_freq = metrics_df.groupby(["file", "metric", "model"]).mean( + numeric_only=True + ) + metrics_df_per_freq = metrics_df_per_freq.reset_index() + metrics_df_per_freq = metrics_df_per_freq.query( + "model in ['Prophet', 'SeasonalNaive', 'TimeGPT']" + ) + models = metrics_df_per_freq["model"].unique() + metrics_df_per_freq = pd.pivot( + metrics_df_per_freq, + index=["file", "metric"], + columns="model", + values="value", + ).reset_index() + for model in models: + if model == "SeasonalNaive": + continue + metrics_df_per_freq[model] /= metrics_df_per_freq["SeasonalNaive"] + metrics_df_per_freq["SeasonalNaive"] /= metrics_df_per_freq["SeasonalNaive"] + return metrics_df_per_freq + + +def prepare_results(df: pd.DataFrame): + def bold_best(row): + row = row.round(3) + models = row.drop(columns=["file", "metric"]).columns + best_model = row[models].idxmin(axis=1).item() + row[best_model] = "**" + str(row[best_model].item()) + "**" + return row + + df_bolded = df.groupby(["file", "metric"]).apply(bold_best) + df_bolded = df_bolded.reset_index(drop=True) + return df_bolded + + +def write_to_readme(content: str): + with open("README.md", "r") as file: + readme_content = file.readlines() + start_index = -1 + end_index = -1 + for i, line in enumerate(readme_content): + if line.strip().lower() == "## results": + start_index = i + 1 + if start_index != -1 and line.strip() == "": + end_index = i + break + + if start_index != -1 and end_index != -1: + readme_content = ( + readme_content[: start_index + 1] + + [content + "\n"] + + readme_content[end_index:] + ) + else: + print("Results section not found or improperly formatted") + + # Write the changes back to the README + with open("README.md", "w") as file: + file.writelines(readme_content) + + +def summarize_results(dir: str): + metrics_df = read_kind_results("metrics", dir) + summary_df = read_kind_results("summary", dir) + summary_df = ( + summary_df.set_index(["file", "frequency"]) + .reset_index() + .round(3) + .sort_values("frequency") + ) + no_int_cols = ["file", "frequency", "mean", "std"] + for col in summary_df.columns: + if col not in no_int_cols: + summary_df[col] = summary_df[col].astype(int) + summary_df = summary_df.to_markdown(index=False, intfmt=",", floatfmt=",.3f") + time_df = read_kind_results("time", dir) + time_df = time_df.assign(metric="time").rename(columns={"time": "value"}) + metrics_df_per_file = summarize_results_per_file(metrics_df) + time_df = summarize_results_per_file(time_df) + eval_df = pd.concat([metrics_df_per_file, time_df], ignore_index=True) + eval_df = prepare_results(eval_df)[ + ["file", "metric", "TimeGPT", "Prophet", "SeasonalNaive"] + ] + n_files = eval_df["file"].nunique() + eval_df = eval_df.to_markdown( + index=False, + colalign=2 * ["left"] + (eval_df.shape[1] - 2) * ["right"], + ) + markdown_lines = eval_df.split("\n") + custom_separator = markdown_lines[1].replace(":", "-") + for i in range(4, len(markdown_lines) + n_files - 1, 4): + markdown_lines.insert(i + 1, custom_separator) + markdown_lines.insert( + 0, + ("\n### Data Description\n\n" f"{summary_df}\n\n" "### Performance\n\n"), + ) + eval_df = "\n".join(markdown_lines) + write_to_readme(eval_df) + + +if __name__ == "__main__": + fire.Fire(summarize_results) diff --git a/experiments/prophet/src/statsforecast_exp.py b/experiments/prophet/src/statsforecast_exp.py new file mode 100644 index 00000000..c596c6b3 --- /dev/null +++ b/experiments/prophet/src/statsforecast_exp.py @@ -0,0 +1,57 @@ +from time import time + +import fire +import pandas as pd +from statsforecast import StatsForecast +from statsforecast.models import SeasonalNaive, ZeroModel + +from src.tools import ExperimentHandler + + +def evaluate_experiment(file: str): + exp_handler = ExperimentHandler(file=file, method="statsforecast") + Y_df, freq, pandas_freq, h, seasonality = exp_handler.read_data() + models = [ + SeasonalNaive(season_length=seasonality), + ZeroModel(), + ] + # even though statsforecast can handle multiple models, we only use one + # at a time to calculate time for each + eval_df = [] + total_time_df = [] + for model in models: + model_name = repr(model) + print(model_name) + sf = StatsForecast( + models=[model], + freq=pandas_freq, + n_jobs=-1, + ) + start = time() + Y_hat_df_model = sf.cross_validation( + df=Y_df, + h=h, + n_windows=1, + ).reset_index() + total_time = time() - start + print(total_time) + # evaluation + eval_df_model, total_time_df_model = exp_handler.evaluate_model( + Y_hat_df=Y_hat_df_model, + model_name=model_name, + total_time=total_time, + ) + eval_df.append(eval_df_model.set_index(["metric", "unique_id"])) + total_time_df.append(total_time_df_model) + eval_df = pd.concat(eval_df, axis=1).reset_index() + total_time_df = pd.concat(total_time_df) + exp_handler.save_results( + freq=freq, + eval_df=eval_df, + total_time_df=total_time_df, + df=Y_df, + ) + + +if __name__ == "__main__": + fire.Fire(evaluate_experiment) diff --git a/experiments/prophet/src/timegpt_exp.py b/experiments/prophet/src/timegpt_exp.py new file mode 100644 index 00000000..d363d0b9 --- /dev/null +++ b/experiments/prophet/src/timegpt_exp.py @@ -0,0 +1,53 @@ +import sys +from time import time + +import fire +from dotenv import load_dotenv +from nixtlats import TimeGPT + +from src.tools import ExperimentHandler + +load_dotenv() + + +def evaluate_experiment(file: str): + exp_handler = ExperimentHandler(file=file, method="timegpt") + model_name = "TimeGPT" + print(model_name) + # timegpt does not need the full history to + # make zero shot predictions + Y_df, freq, pandas_freq, h, seasonality = exp_handler.read_data( + max_insample_length=300 + ) + size_df = sys.getsizeof(Y_df) / (1024 * 1024) + max_partition_size_mb = 20 + num_partitions = int(size_df / max_partition_size_mb) + 1 + timegpt = TimeGPT( + environment="https://timegpt-endpoint.eastus.inference.ml.azure.com/", + max_retries=1, + ) + start = time() + Y_hat_df = timegpt.cross_validation( + df=Y_df, + h=h, + n_windows=1, + freq=pandas_freq, + num_partitions=num_partitions, + ) + total_time = time() - start + print(total_time) + # evaluation + eval_df, total_time_df = exp_handler.evaluate_model( + Y_hat_df=Y_hat_df, + model_name=model_name, + total_time=total_time, + ) + exp_handler.save_results( + freq=freq, + eval_df=eval_df, + total_time_df=total_time_df, + ) + + +if __name__ == "__main__": + fire.Fire(evaluate_experiment) diff --git a/experiments/prophet/src/tools.py b/experiments/prophet/src/tools.py new file mode 100644 index 00000000..ee900f46 --- /dev/null +++ b/experiments/prophet/src/tools.py @@ -0,0 +1,106 @@ +import os +from typing import Optional, Tuple + +import pandas as pd +from utilsforecast.evaluation import evaluate +from utilsforecast.losses import mae, rmse + + +class ExperimentHandler: + def __init__(self, file: str, method: str): + self.file = file + self.method = method + + @staticmethod + def get_parameter(parameter: str, df: pd.DataFrame): + parameter = df[parameter].unique() + if len(parameter) > 1: + raise ValueError(f"{parameter} is not unique: {parameter}") + return parameter[0] + + def read_data( + self, + max_insample_length: int = 3_000, + ) -> Tuple[pd.DataFrame, str, str, int, int]: + df = pd.read_parquet(self.file) + Y_df = df[["unique_id", "ds", "y"]].drop_duplicates(["unique_id", "ds"]) + Y_df = Y_df.sort_values(["unique_id", "ds"]) + Y_df = Y_df.groupby("unique_id").tail( + max_insample_length + ) # take only last 3_000 rows + Y_df["ds"] = Y_df["ds"].str.replace(":01$", ":00", regex=True) + freq = self.get_parameter("frequency", df) + pandas_freq = self.get_parameter("pandas_frequency", df) + h = self.get_parameter("horizon", df) + seasonality = self.get_parameter("seasonality", df) + return Y_df, freq, pandas_freq, int(h), int(seasonality) + + def evaluate_model( + self, + Y_hat_df: pd.DataFrame, + model_name: str, + total_time: float, + ): + if "cutoff" in Y_hat_df.columns: + Y_hat_df = Y_hat_df.drop(columns="cutoff") + eval_df = evaluate( + df=Y_hat_df, + metrics=[rmse, mae], + ) + total_time_df = pd.DataFrame({"model": [model_name], "time": [total_time]}) + return eval_df, total_time_df + + @staticmethod + def summarize_df(df: pd.DataFrame): + n_unique_ids = df["unique_id"].nunique() + mean_y = df["y"].mean() + std_y = df["y"].std() + lengths = df.groupby("unique_id").size() + min_length = lengths.min() + max_length = lengths.max() + n_obs = len(df) + summary = { + "n_series": n_unique_ids, + "mean": mean_y, + "std": std_y, + "min_length": min_length, + "max_length": max_length, + "n_obs": n_obs, + } + summary_df = pd.DataFrame.from_dict(summary, orient="index") + summary_df = summary_df.transpose() + return summary_df + + def save_results( + self, + freq: str, + eval_df: pd.DataFrame, + total_time_df: pd.DataFrame, + df: Optional[pd.DataFrame] = None, + ): + eval_df["frequency"] = freq + eval_df = eval_df.melt( + id_vars=["frequency", "metric", "unique_id"], + var_name="model", + value_name="value", + ) + total_time_df["frequency"] = freq + dir = self.file.split("/")[-1].replace(".parquet", "") + dir = f"./data/results/{dir}" + os.makedirs(dir, exist_ok=True) + eval_df.to_parquet( + f"{dir}/{self.method}_metrics.parquet", + index=False, + ) + total_time_df.to_parquet( + f"{dir}/{self.method}_time.parquet", + index=False, + ) + if df is not None: + summary_df = self.summarize_df(df) + summary_df["frequency"] = freq + print(summary_df) + summary_df.to_parquet( + f"{dir}/series_summary.parquet", + index=False, + ) diff --git a/experiments/prophet/src/utils.py b/experiments/prophet/src/utils.py new file mode 100644 index 00000000..c01e25a4 --- /dev/null +++ b/experiments/prophet/src/utils.py @@ -0,0 +1,55 @@ +from concurrent.futures import ThreadPoolExecutor +import pandas as pd + + +def read_parquet_and_assign(uid, url): + df = pd.read_parquet(url) + df["unique_id"] = uid + df["ds"] = df["ds"].astype(str) + return df[["unique_id", "ds", "y"]] + + +def download_data(): + catalogue_splits = pd.read_parquet("./data/catalogue_splits.parquet") + catalogue_datasets = pd.read_parquet("./data/catalogue_datasets.parquet") + catalogue_df = catalogue_splits.merge( + catalogue_datasets, + on=["dataset", "subdataset", "frequency"], + ) + del catalogue_splits + del catalogue_datasets + catalogue_df = catalogue_df.query("split == 'test'")[ + [ + "unique_id", + "frequency", + "url", + "pandas_frequency", + "seasonality", + "horizon", + ] + ] + grouped_df = catalogue_df.groupby(["frequency", "pandas_frequency"]) + for (frequency, pandas_frequency), df in grouped_df: + uids, urls = df["unique_id"].values, df["url"].values + print(f"frequency: {frequency}, pandas_frequency: {pandas_frequency}") + print(f"number of uids: {len(uids)}") + with ThreadPoolExecutor() as executor: + futures = [ + executor.submit(read_parquet_and_assign, uid, url) + for uid, url in zip(uids, urls) + ] + results = [future.result() for future in futures] + print("dataset read") + Y_df = pd.concat(results) + Y_df = Y_df.merge( + df.drop(columns="url"), + on="unique_id", + how="left", + ) + print(Y_df) + Y_df.to_parquet(f"./data/{frequency}_{pandas_frequency}.parquet") + del Y_df + + +if __name__ == "__main__": + download_data()