diff --git a/experiments/azure-automl-forecasting/.env.example b/experiments/azure-automl-forecasting/.env.example
new file mode 100644
index 00000000..0bfdf440
--- /dev/null
+++ b/experiments/azure-automl-forecasting/.env.example
@@ -0,0 +1,5 @@
+AZURE_SUBSCRIPTION_ID=
+AZURE_RESOURCE_GROUP=
+AZURE_WORKSPACE_NAME=
+TIMEGPT_TOKEN=
+
diff --git a/experiments/azure-automl-forecasting/Makefile b/experiments/azure-automl-forecasting/Makefile
new file mode 100644
index 00000000..f6b10086
--- /dev/null
+++ b/experiments/azure-automl-forecasting/Makefile
@@ -0,0 +1,39 @@
+TS_FILES := Hourly_H.parquet Daily_D.parquet Weekly_W-MON.parquet Monthly_MS.parquet
+FILTERED_TS_FILES := $(patsubst %,./data/filtered_datasets/%,$(TS_FILES))
+
+filter_data:
+	@for file in $(TS_FILES); do \
+		python -m src.utils.filter_data --dataset_path ./data/$$file; \
+	done
+
+run_timegpt: .require-dataset_path
+	@echo Running TimeGPT with dataset_path=$(dataset_path)
+	@python -m src.nixtla_timegpt --dataset_path $(dataset_path)
+
+run_sn: .require-dataset_path
+	@echo Running SN with dataset_path=$(dataset_path)
+	@python -m src.statsforecast_sn --dataset_path $(dataset_path)
+
+run_automl: .require-dataset_path
+	@echo Running AutoML with dataset_path=$(dataset_path)
+	@python -m src.azure_automl.forecasting --dataset_path $(dataset_path)
+
+run_methods:
+	@for file in $(TS_FILES); do \
+		echo "Running methods for $$file"; \
+		$(MAKE) run_timegpt dataset_path=./data/filtered_datasets/$$file; \
+		$(MAKE) run_sn dataset_path=./data/filtered_datasets/$$file; \
+		$(MAKE) run_automl dataset_path=./data/filtered_datasets/$$file; \
+	done
+
+download_automl_forecasts:
+	@python -m src.azure_automl.download_forecasts
+
+evaluate_experiments:
+	@python -m src.evaluation --datasets_paths "$(shell echo $(FILTERED_TS_FILES) | tr ' ' ',')"
+
+.require-dataset_path:
ifndef dataset_path
+	$(error dataset_path is required)
endif
+
diff --git a/experiments/azure-automl-forecasting/README.md b/experiments/azure-automl-forecasting/README.md
new file mode 100644
index 00000000..a86c9e3d
--- /dev/null
+++ b/experiments/azure-automl-forecasting/README.md
@@ -0,0 +1,75 @@
+# Nixtla TimeGPT vs. Azure AutoML: A Comprehensive Performance Analysis
+
+This experiment evaluates the performance of **Nixtla TimeGPT's zero-shot inference** against **Microsoft's Azure AutoML** in the domain of time series forecasting. Our analysis shows that TimeGPT **surpasses Azure AutoML by 12%, 12%, and 10% in MAE, RMSE, and MASE metrics** and delivers a **300x improvement in computational efficiency**. The evaluation spanned more than 3,000 distinct time series across various data frequencies, with considerations for Azure AutoML's cost constraints.
+
+# Introduction
+
+[Azure AutoML](https://learn.microsoft.com/en-us/azure/machine-learning/concept-automl-forecasting-methods?view=azureml-api-2), a product of Microsoft, offers a robust automated machine-learning solution that caters to a wide array of predictive tasks, including time series forecasting. TimeGPT is a foundational model for time series forecasting that can be accessed [through an API](https://docs.nixtla.io/). While Azure AutoML is known for its adaptability and ease of use, our findings show that TimeGPT offers superior accuracy and efficiency on time series data.
+
+## Empirical Evaluation
+
+Our study compares both models across datasets of Hourly, Daily, Weekly, and Monthly frequencies.
+The datasets were chosen from the test set of the [TimeGPT-1 paper](https://arxiv.org/abs/2310.03589), ensuring a diverse set of time series for evaluation. The selection process was designed to manage computational complexity and adhere to Azure AutoML's dataset size requirements, with a cap of 3,000 observations per series to maintain cost-effectiveness.
+
+## Results
+
+The following table shows the main findings of our analysis, comparing performance metrics (MASE, MAE, RMSE) and computational time (in seconds) across the different datasets. The best results are highlighted in **bold** for clarity.
+
+*(Image: benchmark results table.)*
+
+## Reproducibility
+
+All experiments were conducted in controlled environments to uphold the integrity and reproducibility of our results. TimeGPT evaluations were performed on a 2020 MacBook Air with an M1 chip, ensuring accessibility and practicality. In contrast, Azure AutoML experiments were carried out on a cluster of 11 STANDARD_DS5_V2 virtual machines, equipped with substantial computational resources to showcase its scalability and power.
+
+### Instructions
+
+1. Configure Azure AutoML according to the official Microsoft documentation.
+
+2. Set the environment variables in a `.env` file, using `.env.example` as a template.
+
+3. Set up a conda environment using:
+
+```bash
+mamba create -n azure-automl-fcst python=3.10
+conda activate azure-automl-fcst
+pip install uv
+uv pip install -r requirements.txt
+```
+
+4. Download the data (the expected file schema is sketched after these steps):
+
+```bash
+python -m src.utils.download_data
+```
+
+If you're interested in replicating the results, write to us at `ops@nixtla.io` and we will give you access to the data.
+
+5. Filter the datasets to prevent AzureML from crashing on series that are too short:
+
+```
+make filter_data
+```
+
+6. Run the forecasting tasks for TimeGPT, SeasonalNaive, and AzureAutoML:
+
+```
+make run_methods
+```
+
+Note that AzureAutoML submits its jobs to the predefined cluster.
+
+7. Retrieve the AzureAutoML forecasts once they are ready:
+
+```
+make download_automl_forecasts
+```
+
+8. Run the evaluation:
+
+```
+make evaluate_experiments
+```
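+
+For reference, the downloaded parquet files follow the long-format schema consumed by `ExperimentDataset.from_df` in `src/utils/data_handler.py`: the columns `unique_id`, `ds`, and `y`, plus the metadata columns `frequency`, `pandas_frequency`, `horizon`, and `seasonality`, which are constant within a file. A minimal sketch with illustrative values (the real values come from the data catalogue):
+
+```python
+import pandas as pd
+
+# Illustrative rows only; the real files are produced by src.utils.download_data.
+df = pd.DataFrame(
+    {
+        "unique_id": ["series_1", "series_1"],  # series identifier
+        "ds": ["2020-01-01 00:00:00", "2020-01-01 01:00:00"],  # timestamp
+        "y": [10.0, 12.5],  # target values
+        "frequency": ["Hourly", "Hourly"],  # human-readable frequency
+        "pandas_frequency": ["H", "H"],  # pandas offset alias
+        "horizon": [48, 48],  # forecast horizon (illustrative)
+        "seasonality": [24, 24],  # seasonal period (illustrative)
+    }
+)
+```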
+
+### References
+- [TimeGPT-1](https://arxiv.org/abs/2310.03589)
+- [StatsForecast](https://github.com/Nixtla/statsforecast/)
+- [Distributed AzureAutoML for forecasting](https://github.com/Azure/azureml-examples/blob/main/sdk/python/jobs/pipelines/1k_demand_forecasting_with_pipeline_components/automl-forecasting-demand-many-models-in-pipeline/automl-forecasting-demand-many-models-in-pipeline.ipynb)
diff --git a/experiments/azure-automl-forecasting/requirements.txt b/experiments/azure-automl-forecasting/requirements.txt
new file mode 100644
index 00000000..56d9b64e
--- /dev/null
+++ b/experiments/azure-automl-forecasting/requirements.txt
@@ -0,0 +1,11 @@
+azure-ai-ml
+azure-identity
+azureml-core
+fire
+mltable
+nixtlats
+pandas
+python-dotenv
+rich
+statsforecast
+utilsforecast
diff --git a/experiments/azure-automl-forecasting/src/azure_automl/__init__.py b/experiments/azure-automl-forecasting/src/azure_automl/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/experiments/azure-automl-forecasting/src/azure_automl/automl_handler.py b/experiments/azure-automl-forecasting/src/azure_automl/automl_handler.py
new file mode 100644
index 00000000..a69d2ecb
--- /dev/null
+++ b/experiments/azure-automl-forecasting/src/azure_automl/automl_handler.py
@@ -0,0 +1,253 @@
+import json
+import logging
+import os
+from pathlib import Path
+from tempfile import TemporaryDirectory
+
+import numpy as np
+import pandas as pd
+import yaml
+from azure.ai.ml import Input, MLClient
+from azure.ai.ml.constants import AssetTypes
+from azure.ai.ml.dsl import pipeline
+from azure.ai.ml.entities import AmlCompute, Job
+from azure.identity import DefaultAzureCredential
+from dotenv import load_dotenv
+
+load_dotenv()
+logging.basicConfig(level=logging.INFO)
+main_logger = logging.getLogger(__name__)
+
+# Silence the noisy Azure SDK loggers so that only this module's logs surface.
+loggers = logging.Logger.manager.loggerDict
+for logger_name in loggers:
+    if logger_name.startswith("azure"):
+        logger = logging.getLogger(logger_name)
+        logger.disabled = True
+        logger.propagate = False
+
+
+def str_to_datetime(date_str: str) -> pd.Timestamp:
+    return pd.Timestamp(date_str)
+
+
+def df_to_parquet_azureml_input(df: pd.DataFrame, dir: str) -> Input:
+    series_path = Path(dir) / "series.parquet"
+    df.to_parquet(series_path, index=False)
+    table_data_input = Input(type=AssetTypes.URI_FOLDER, path=dir)
+    return table_data_input
+
+
+def config_to_yaml_azureml_input(config: dict, dir: str) -> Input:
+    config_path = Path(dir) / "config.yaml"
+    with open(config_path, "w") as f:
+        yaml.dump(config, f)
+    config_input = Input(type="uri_file", path=str(config_path))
+    return config_input
+
+
+class AzureAutoML:
+    """
+    Before using this class, you need to log in to Azure.
+    Use the following command to log in:
+    $ az login
+    """
+
+    def __init__(
+        self,
+        subscription_id: str,
+        resource_group_name: str,
+        workspace_name: str,
+    ):
+        self.subscription_id = subscription_id
+        self.resource_group_name = resource_group_name
+        self.workspace_name = workspace_name
+
+    @classmethod
+    def from_environment(cls) -> "AzureAutoML":
+        return cls(
+            subscription_id=os.environ["AZURE_SUBSCRIPTION_ID"],
+            resource_group_name=os.environ["AZURE_RESOURCE_GROUP"],
+            workspace_name=os.environ["AZURE_WORKSPACE_NAME"],
+        )
+
+    def get_ml_client(self, registry_name: str | None = None) -> MLClient:
+        kwargs = {}
+        if not registry_name:
+            kwargs["workspace_name"] = self.workspace_name
+        else:
+            kwargs["registry_name"] = registry_name
+        credential = DefaultAzureCredential(exclude_managed_identity_credential=True)
+        ml_client = MLClient(
+            credential=credential,
+            subscription_id=self.subscription_id,
+            resource_group_name=self.resource_group_name,
+            **kwargs,
+        )
+        return ml_client
+
+    def get_train_and_inference_components(self) -> tuple:
+        ml_client_registry = self.get_ml_client("azureml")
+        train_component = ml_client_registry.components.get(
+            name="automl_many_models_training",
+            label="latest",
+        )
+        inference_component = ml_client_registry.components.get(
+            name="automl_many_models_inference",
+            label="latest",
+        )
+        return train_component, inference_component
+
+    def forecast(
+        self,
+        df: pd.DataFrame,
+        df_test: pd.DataFrame,
+        aml_compute: AmlCompute,
+        h: int,
+        freq: str,
+        id_col: str = "unique_id",
+        time_col: str = "ds",
+        target_col: str = "y",
+        primary_metric: str = "normalized_root_mean_squared_error",
+        n_cross_validations: str | int = "auto",
+        experiment_name: str | None = None,
+        begin_create_or_update_aml_compute: bool = False,
+        max_trials: int = 25,
+        enable_early_stopping: bool = True,
+        max_nodes: int = 1,
+        max_concurrency_per_node: int = 1,
+        forecast_mode: str = "rolling",
+        retrain_failed_model: bool = False,
+    ) -> str:
+        if experiment_name is None:
+            random_id = np.random.randint(10000, 99999)
+            experiment_name = f"automl-forecasting-job-{random_id}"
+        ml_client = self.get_ml_client()
+        train_component, inference_component = self.get_train_and_inference_components()
+        automl_config_dict = dict(
+            task="forecasting",
+            forecast_horizon=h,
+            forecast_step=h,
+            frequency=freq,
+            time_series_id_column_names=id_col,
+            partition_column_names=[id_col],
+            time_column_name=time_col,
+            label_column_name=target_col,
+            primary_metric=primary_metric,
+            n_cross_validations=n_cross_validations,
+            max_trials=max_trials,
+            enable_early_stopping=enable_early_stopping,
+            track_child_runs=False,
+            allow_multi_partitions=False,
+        )
+
+        @pipeline(description="pipeline for automl forecasting")
+        def forecasting_pipeline(
+            training_data: Input,
+            test_data: Input,
+            automl_config: Input,
+            compute_name: str,
+        ):
+            # training node
+            training_node = train_component(
+                raw_data=training_data,
+                automl_config=automl_config,
+                max_concurrency_per_node=max_concurrency_per_node,
+                max_nodes=max_nodes,
+                retrain_failed_model=retrain_failed_model,
+                compute_name=compute_name,
+            )
+            # inference node
+            inference_node = inference_component(
+                raw_data=test_data,
+                max_nodes=max_nodes,
+                max_concurrency_per_node=max_concurrency_per_node,
+                optional_train_metadata=training_node.outputs.run_output,
+                forecast_mode=forecast_mode,
+                forecast_step=h,
+                compute_name=compute_name,
+            )
+            return {"forecast_output": inference_node.outputs.raw_predictions}
+
+        if begin_create_or_update_aml_compute:
+            main_logger.info("Begin create or update aml compute")
+            ml_client.compute.begin_create_or_update(aml_compute).result()
+
+        cwd = Path.cwd()
+        with TemporaryDirectory(dir=cwd) as tmp_dir, TemporaryDirectory(
+            dir=cwd
+        ) as tmp_dir_test, TemporaryDirectory(dir=cwd) as tmp_dir_config:
+            main_logger.info("Transforming datasets to parquet")
+            table_data_input = df_to_parquet_azureml_input(df, dir=tmp_dir)
+            table_data_input_test = df_to_parquet_azureml_input(
+                df_test,
+                dir=tmp_dir_test,
+            )
+            automl_config = config_to_yaml_azureml_input(
+                automl_config_dict,
+                dir=tmp_dir_config,
+            )
+            pipeline_job = forecasting_pipeline(
+                training_data=table_data_input,
+                test_data=table_data_input_test,
+                automl_config=automl_config,
+                compute_name=aml_compute.name,
+            )
+            pipeline_job.settings.default_compute = aml_compute.name
+            main_logger.info("Begin submitting pipeline job")
+            returned_pipeline_job = ml_client.jobs.create_or_update(
+                pipeline_job,
+                experiment_name=experiment_name,
+            )
+            return returned_pipeline_job.name
+
+    def get_job(self, job_name: str) -> Job:
+        ml_client = self.get_ml_client()
+        job = ml_client.jobs.get(job_name)
+        return job
+
+    def get_job_status(self, job_name: str) -> str | None:
+        job = self.get_job(job_name)
+        return job.status
+
+    def get_job_total_time(self, job_name: str) -> float | None:
+        job = self.get_job(job_name)
+        if job.status == "NotStarted":
+            main_logger.info(f"Job {job_name} is not started yet")
+            return None
+        stages_key = "azureml.pipelines.stages"
+        if stages_key not in job.properties:
+            main_logger.info(f"Job {job_name} has no stages yet")
+            return None
+        stages = json.loads(job.properties[stages_key])
+        execution_info = stages["Execution"]
+        status = execution_info["Status"]
+        if status == "Failed":
+            raise Exception(f"Job {job_name} failed")
+        start_time = str_to_datetime(execution_info["StartTime"])
+        if "EndTime" not in execution_info:
+            # The job is still running: report the elapsed time so far.
+            total_time = pd.Timestamp.now(tz=start_time.tz) - start_time
+            main_logger.info(
+                f"Job has status {status}, total time so far: {total_time.total_seconds()}"
+            )
+            return total_time.total_seconds()
+        end_time = str_to_datetime(execution_info["EndTime"])
+        total_time = end_time - start_time
+        return total_time.total_seconds()
+
+    def get_forecast_df(self, job_name: str) -> pd.DataFrame | None:
+        job_status = self.get_job_status(job_name)
+        if job_status != "Completed":
+            main_logger.info(f"Job {job_name} is not completed yet")
+            return None
+        ml_client = self.get_ml_client()
+        cwd = Path.cwd()
+        with TemporaryDirectory(dir=cwd) as tmp_dir:
+            ml_client.jobs.download(
+                job_name,
+                download_path=tmp_dir,
+                output_name="forecast_output",
+            )
+            output_path = Path(tmp_dir) / "named-outputs" / "forecast_output"
+            forecast_df = pd.read_parquet(output_path)
+        return forecast_df
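A minimal sketch of how `AzureAutoML` can be driven directly. The compute spec, paths, horizon, and frequency below are illustrative assumptions; `src/azure_automl/forecasting.py` further down shows the actual call used in this experiment:

```python
import pandas as pd
from azure.ai.ml.entities import AmlCompute

from src.azure_automl.automl_handler import AzureAutoML

# Assumes AZURE_SUBSCRIPTION_ID, AZURE_RESOURCE_GROUP, and AZURE_WORKSPACE_NAME
# are set (e.g. through the .env file) and that `az login` has been run.
azure_automl = AzureAutoML.from_environment()

# Long-format frames with unique_id, ds, and y columns (illustrative paths).
train_df = pd.read_parquet("train.parquet")
test_df = pd.read_parquet("test.parquet")

compute = AmlCompute(  # illustrative cluster spec
    name="my-automl-cluster",
    size="STANDARD_DS5_V2",
    min_instances=1,
    max_instances=1,
)
job_name = azure_automl.forecast(
    df=train_df,
    df_test=test_df,
    aml_compute=compute,
    h=24,  # illustrative horizon
    freq="H",  # illustrative frequency
    begin_create_or_update_aml_compute=True,  # create the cluster on first use
)
print(azure_automl.get_job_status(job_name))
```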
diff --git a/experiments/azure-automl-forecasting/src/azure_automl/download_forecasts.py b/experiments/azure-automl-forecasting/src/azure_automl/download_forecasts.py
new file mode 100644
index 00000000..65208d8f
--- /dev/null
+++ b/experiments/azure-automl-forecasting/src/azure_automl/download_forecasts.py
@@ -0,0 +1,53 @@
+import logging
+from pathlib import Path
+
+import fire
+
+from .automl_handler import AzureAutoML
+from .forecasting import AzureAutoMLJobs
+from src.utils.data_handler import ForecastDataset
+
+logging.basicConfig(level=logging.INFO)
+main_logger = logging.getLogger(__name__)
+
+
+def download_forecasts(dir: str = "./results"):
+    azure_automl = AzureAutoML.from_environment()
+    azure_automl_experiments = AzureAutoMLJobs()
+    results_path = Path(dir) / "azure_automl"
+
+    # Keep only the most recent job for each experiment.
+    jobs_df = azure_automl_experiments.get_jobs_df()
+    jobs_df = jobs_df.sort_values("created_at", ascending=False).drop_duplicates(
+        "experiment_name"
+    )
+
+    for _, row in jobs_df.iterrows():
+        experiment_name = row.experiment_name
+        job_name = row.job_name
+        main_logger.info(
+            f"Downloading forecasts for experiment {experiment_name} and job {job_name}"
+        )
+        try:
+            forecast_df = azure_automl.get_forecast_df(job_name)
+            total_time = azure_automl.get_job_total_time(job_name)
+        except Exception:
+            main_logger.info(
+                f"Failed to download forecasts for experiment {experiment_name} and job {job_name}"
+            )
+            continue
+        if forecast_df is None:
+            main_logger.info(
+                f"Failed to download forecasts for experiment {experiment_name} and job {job_name},"
+                " probably because the job is not finished yet or failed"
+            )
+            continue
+        fcst_dataset = ForecastDataset(forecast_df=forecast_df, total_time=total_time)
+        fcst_dataset.save_to_dir(results_path / experiment_name)
+        main_logger.info(
+            f"Saved forecasts for experiment {experiment_name} and job {job_name}"
+        )
+
+
+if __name__ == "__main__":
+    fire.Fire(download_forecasts)
diff --git a/experiments/azure-automl-forecasting/src/azure_automl/forecasting.py b/experiments/azure-automl-forecasting/src/azure_automl/forecasting.py
new file mode 100644
index 00000000..73deac17
--- /dev/null
+++ b/experiments/azure-automl-forecasting/src/azure_automl/forecasting.py
@@ -0,0 +1,82 @@
+from pathlib import Path
+
+import fire
+import pandas as pd
+from azure.ai.ml.entities import AmlCompute
+
+from .automl_handler import AzureAutoML
+from src.utils.data_handler import ExperimentDataset
+
+
+class AzureAutoMLJobs:
+    """
+    This class stores and updates the Azure AutoML experiments
+    to keep track of the submitted pipeline jobs.
+    We need this to later download the forecasts.
+ """ + + file_name = "forecasting_jobs.csv" + + def __init__(self, dir: str = "./azure_automl_results"): + self.dir = dir + self.jobs_path = Path(self.dir) / self.file_name + self.setup() + + def setup(self): + self.jobs_path.parent.mkdir(parents=True, exist_ok=True) + if not self.jobs_path.exists(): + pd.DataFrame(columns=["created_at", "experiment_name", "job_name"]).to_csv( + self.jobs_path, + index=False, + ) + + def get_jobs_df(self) -> pd.DataFrame: + return pd.read_csv(self.jobs_path) + + def save_job(self, job_name: str, experiment_name: str): + jobs_df = self.get_jobs_df() + new_row = pd.DataFrame( + { + "created_at": [pd.Timestamp.now()], + "experiment_name": [experiment_name], + "job_name": [job_name], + } + ) + jobs_df = pd.concat([jobs_df, new_row]) + jobs_df.to_csv(self.jobs_path, index=False) + + +def start_forecasting_job( + dataset_path: str, + begin_create_or_update_aml_compute: bool = False, +): + experiment_name = dataset_path.split("/")[-1].split(".")[0] + dataset = ExperimentDataset.from_parquet(parquet_path=dataset_path) + azure_automl = AzureAutoML.from_environment() + azure_automl_jobs = AzureAutoMLJobs() + + aml_compute = AmlCompute( + name="azure-automl-fcst-cluster-nixtla", + min_instances=11, + max_instances=11, + size="STANDARD_DS5_V2", + ) + + job_name = azure_automl.forecast( + df=dataset.Y_df_train, + df_test=dataset.Y_df_test, + aml_compute=aml_compute, + h=dataset.horizon, + freq=dataset.pandas_frequency, + n_cross_validations=2, + experiment_name=experiment_name, + begin_create_or_update_aml_compute=begin_create_or_update_aml_compute, + max_nodes=11, + max_concurrency_per_node=8, + ) + + azure_automl_jobs.save_job(job_name, experiment_name) + + +if __name__ == "__main__": + fire.Fire(start_forecasting_job) diff --git a/experiments/azure-automl-forecasting/src/evaluation.py b/experiments/azure-automl-forecasting/src/evaluation.py new file mode 100644 index 00000000..6d7cd16d --- /dev/null +++ b/experiments/azure-automl-forecasting/src/evaluation.py @@ -0,0 +1,137 @@ +import logging +from pathlib import Path +from typing import List +from unicodedata import numeric + +import fire +import pandas as pd +from rich.console import Console +from rich.table import Table + +from src.utils.data_handler import ExperimentDataset, ForecastDataset + +logging.basicConfig(level=logging.INFO) +main_logger = logging.getLogger(__name__) + + +def print_df_rich(df: pd.DataFrame): + console = Console() + table = Table() + for col in df.select_dtypes(include=["float"]).columns: + df[col] = df[col].apply(lambda x: f"{x:.3f}") + for col in df.columns: + table.add_column(col) + for row in df.itertuples(index=False): + table.add_row(*row) + console.print(table) + + +METHODS = { + "azure_automl": "automl_prediction", + "nixtla_timegpt": "TimeGPT", + "statsforecast_sn": "SeasonalNaive", +} + + +def get_model_name(method: str) -> str: + if method not in METHODS: + raise ValueError(f"Invalid method: {method}") + return METHODS[method] + + +def evaluate_experiments( + datasets_paths: str, + methods_to_evaluate: List[str] = list(METHODS.keys()), + results_dir: str = "./results", +): + datasets_paths_ = datasets_paths.split(",") + eval_datasets_df: pd.DataFrame | None = None + for dataset_path in datasets_paths_: + experiment_name = dataset_path.split("/")[-1].split(".")[0] + eval_method_df: pd.DataFrame | None = None + dataset: None | ExperimentDataset = None + for method in methods_to_evaluate: + results_experiment_dir = Path(results_dir) / method / experiment_name + if 
diff --git a/experiments/azure-automl-forecasting/src/evaluation.py b/experiments/azure-automl-forecasting/src/evaluation.py
new file mode 100644
index 00000000..6d7cd16d
--- /dev/null
+++ b/experiments/azure-automl-forecasting/src/evaluation.py
@@ -0,0 +1,137 @@
+import logging
+from pathlib import Path
+from typing import List
+
+import fire
+import pandas as pd
+from rich.console import Console
+from rich.table import Table
+
+from src.utils.data_handler import ExperimentDataset, ForecastDataset
+
+logging.basicConfig(level=logging.INFO)
+main_logger = logging.getLogger(__name__)
+
+
+def print_df_rich(df: pd.DataFrame):
+    console = Console()
+    table = Table()
+    for col in df.select_dtypes(include=["float"]).columns:
+        df[col] = df[col].apply(lambda x: f"{x:.3f}")
+    for col in df.columns:
+        table.add_column(col)
+    for row in df.itertuples(index=False):
+        table.add_row(*row)
+    console.print(table)
+
+
+# Maps each method's results directory to the column name of its predictions.
+METHODS = {
+    "azure_automl": "automl_prediction",
+    "nixtla_timegpt": "TimeGPT",
+    "statsforecast_sn": "SeasonalNaive",
+}
+
+
+def get_model_name(method: str) -> str:
+    if method not in METHODS:
+        raise ValueError(f"Invalid method: {method}")
+    return METHODS[method]
+
+
+def evaluate_experiments(
+    datasets_paths: str,
+    methods_to_evaluate: List[str] = list(METHODS.keys()),
+    results_dir: str = "./results",
+):
+    datasets_paths_ = datasets_paths.split(",")
+    eval_datasets_df: pd.DataFrame | None = None
+    for dataset_path in datasets_paths_:
+        experiment_name = dataset_path.split("/")[-1].split(".")[0]
+        eval_method_df: pd.DataFrame | None = None
+        dataset: ExperimentDataset | None = None
+        for method in methods_to_evaluate:
+            results_experiment_dir = Path(results_dir) / method / experiment_name
+            if ForecastDataset.is_forecast_ready(results_experiment_dir):
+                main_logger.info(
+                    f"Evaluating experiment {experiment_name} and method {method}"
+                )
+                forecast_dataset = ForecastDataset.from_dir(results_experiment_dir)
+                if dataset is None:
+                    dataset = ExperimentDataset.from_parquet(parquet_path=dataset_path)
+                eval_df = dataset.evaluate_forecast_df(
+                    forecast_df=forecast_dataset.forecast_df,
+                    model=get_model_name(method),
+                    total_time=forecast_dataset.total_time,
+                )
+                if eval_method_df is None:
+                    eval_method_df = eval_df
+                else:
+                    eval_method_df = pd.concat(
+                        [eval_method_df, eval_df],
+                        axis=1,
+                    )  # type: ignore
+            else:
+                main_logger.info(
+                    f"Skipping evaluation for experiment {experiment_name} and method {method}"
+                    " because the forecasts are not ready yet"
+                )
+        if eval_method_df is not None:
+            eval_method_df.reset_index(inplace=True)
+            eval_method_df.insert(0, "dataset", experiment_name)
+            if eval_datasets_df is None:
+                eval_datasets_df = eval_method_df
+            else:
+                eval_datasets_df = pd.concat(
+                    [eval_datasets_df, eval_method_df],
+                    ignore_index=True,
+                )  # type: ignore
+    if eval_datasets_df is not None:
+        azure_renamer = {"automl_prediction": "AzureAutoML"}
+        if "azure_automl" in methods_to_evaluate:
+            eval_datasets_df = eval_datasets_df.rename(columns=azure_renamer)
+        eval_datasets_df.to_csv(Path(results_dir) / "eval_datasets.csv", index=False)
+        eval_datasets_df["metric"] = (
+            eval_datasets_df["metric"].str.upper().str.replace("TOTAL_", "")
+        )
+        # scale the accuracy metrics by SeasonalNaive to make them comparable across datasets
+        if "SeasonalNaive" in eval_datasets_df.columns:
+            time_mask = eval_datasets_df["metric"] == "TIME"
+            for model in eval_datasets_df.columns.drop(["dataset", "metric"]):
+                if model == "SeasonalNaive":
+                    continue
+                eval_datasets_df.loc[~time_mask, model] = (
+                    eval_datasets_df.loc[~time_mask, model]
+                    / eval_datasets_df.loc[~time_mask, "SeasonalNaive"]
+                )
+            eval_datasets_df = eval_datasets_df.drop(columns=["SeasonalNaive"])
+
+        def pivot_df(df: pd.DataFrame, col: str) -> pd.DataFrame:
+            return df.pivot(
+                index="dataset",
+                columns="metric",
+                values=col,
+            )
+
+        result_list = []
+        models = []
+        for method in methods_to_evaluate:
+            if method == "statsforecast_sn":
+                continue
+            if method == "azure_automl":
+                col = "AzureAutoML"
+            else:
+                col = get_model_name(method)
+            pivoted_df = pivot_df(eval_datasets_df, col)
+            result_list.append(pivoted_df)
+            models.append(col)
+        result = pd.concat(result_list, axis=1, keys=models)
+        result = result.swaplevel(axis=1).sort_index(axis=1)
+        flattened_columns = ["_".join(col) for col in result.columns.values]
+        result.columns = flattened_columns
+        result = result.reset_index()
+        print_df_rich(result)
+
+
+if __name__ == "__main__":
+    fire.Fire(evaluate_experiments)
diff --git a/experiments/azure-automl-forecasting/src/nixtla_timegpt.py b/experiments/azure-automl-forecasting/src/nixtla_timegpt.py
new file mode 100644
index 00000000..16571613
--- /dev/null
+++ b/experiments/azure-automl-forecasting/src/nixtla_timegpt.py
@@ -0,0 +1,40 @@
+import sys
+from pathlib import Path
+from time import time
+
+import fire
+from dotenv import load_dotenv
+from nixtlats import TimeGPT
+
+from src.utils.data_handler import ExperimentDataset, ForecastDataset
+
+load_dotenv()
+
+
+def timegpt_forecast(dataset_path: str, results_dir: str = "./results"):
+    dataset = ExperimentDataset.from_parquet(parquet_path=dataset_path)
+    # Partition the payload so each API call stays below ~20 MB.
+    size_df = sys.getsizeof(dataset.Y_df_train) / (1024 * 1024)
+    max_partition_size_mb = 20
+    num_partitions = int(size_df / max_partition_size_mb) + 1
+    timegpt = TimeGPT(max_retries=1)
+    start = time()
+    forecast_df = timegpt.forecast(
+        df=dataset.Y_df_train,
+        h=dataset.horizon,
+        freq=dataset.pandas_frequency,
+        model="timegpt-1-long-horizon",
+        num_partitions=num_partitions,
+    )
+    end = time()
+    total_time = end - start
+    forecast_dataset = ForecastDataset(
+        forecast_df=forecast_df,
+        total_time=total_time,
+    )
+    experiment_name = dataset_path.split("/")[-1].split(".")[0]
+    results_path = Path(results_dir) / "nixtla_timegpt" / experiment_name
+    forecast_dataset.save_to_dir(results_path)
+
+
+if __name__ == "__main__":
+    fire.Fire(timegpt_forecast)
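The partitioning heuristic above aims to keep each TimeGPT API request under roughly 20 MB. A worked example of the arithmetic, with an illustrative stand-in frame:

```python
import sys

import pandas as pd

# Stand-in frame; in the script above this is dataset.Y_df_train.
df = pd.DataFrame({"unique_id": ["a"] * 1000, "ds": range(1000), "y": range(1000)})

size_mb = sys.getsizeof(df) / (1024 * 1024)  # DataFrame memory footprint in MB
max_partition_size_mb = 20
num_partitions = int(size_mb / max_partition_size_mb) + 1
# A hypothetical 45 MB training frame would give int(45 / 20) + 1 = 3 partitions.
print(num_partitions)
```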
diff --git a/experiments/azure-automl-forecasting/src/statsforecast_sn.py b/experiments/azure-automl-forecasting/src/statsforecast_sn.py
new file mode 100644
index 00000000..a641715a
--- /dev/null
+++ b/experiments/azure-automl-forecasting/src/statsforecast_sn.py
@@ -0,0 +1,33 @@
+import os
+from pathlib import Path
+from time import time
+
+import fire
+from statsforecast import StatsForecast
+from statsforecast.models import SeasonalNaive
+
+from src.utils.data_handler import ExperimentDataset, ForecastDataset
+
+
+def sn_forecast(dataset_path: str, results_dir: str = "./results"):
+    # Return unique_id as a column instead of as the index.
+    os.environ["NIXTLA_ID_AS_COL"] = "true"
+    dataset = ExperimentDataset.from_parquet(parquet_path=dataset_path)
+    sf = StatsForecast(
+        models=[SeasonalNaive(season_length=dataset.seasonality)],
+        freq=dataset.pandas_frequency,
+    )
+    start = time()
+    forecast_df = sf.forecast(
+        df=dataset.Y_df_train,
+        h=dataset.horizon,
+    )
+    end = time()
+    total_time = end - start
+    forecast_dataset = ForecastDataset(forecast_df=forecast_df, total_time=total_time)
+    experiment_name = dataset_path.split("/")[-1].split(".")[0]
+    results_path = Path(results_dir) / "statsforecast_sn" / experiment_name
+    forecast_dataset.save_to_dir(results_path)
+
+
+if __name__ == "__main__":
+    fire.Fire(sn_forecast)
diff --git a/experiments/azure-automl-forecasting/src/utils/data_handler.py b/experiments/azure-automl-forecasting/src/utils/data_handler.py
new file mode 100644
index 00000000..c14b5b07
--- /dev/null
+++ b/experiments/azure-automl-forecasting/src/utils/data_handler.py
@@ -0,0 +1,135 @@
+import logging
+import warnings
+from dataclasses import asdict, dataclass
+from functools import partial
+from pathlib import Path
+
+import pandas as pd
+from utilsforecast.evaluation import evaluate
+from utilsforecast.losses import mae, mase, rmse
+
+from src.utils.filter_data import DatasetParams
+
+warnings.simplefilter(action="ignore", category=FutureWarning)
+logging.basicConfig(level=logging.INFO)
+main_logger = logging.getLogger(__name__)
+
+
+@dataclass
+class ExperimentDataset:
+    Y_df_train: pd.DataFrame
+    Y_df_test: pd.DataFrame
+    horizon: int
+    seasonality: int
+    frequency: str
+    pandas_frequency: str
+
+    @classmethod
+    def from_df(cls, df: pd.DataFrame) -> "ExperimentDataset":
+        """
+        Parameters
+        ----------
+        df : pd.DataFrame
+            df should have columns: unique_id, ds, y, frequency,
+            pandas_frequency, horizon, seasonality.
+        """
+        ds_params = DatasetParams.from_df(df)
+        df = df[["unique_id", "ds", "y"]]  # type: ignore
+        # The last `horizon` observations of each series form the test set.
+        Y_df_test = df.groupby("unique_id").tail(ds_params.horizon)
+        Y_df_train = df.drop(Y_df_test.index)  # type: ignore
+        return cls(
+            Y_df_train=Y_df_train,
+            Y_df_test=Y_df_test,
+            **asdict(ds_params),
+        )
+
+    @classmethod
+    def from_parquet(
+        cls,
+        parquet_path: str,
+    ) -> "ExperimentDataset":
+        df = pd.read_parquet(parquet_path)
+        return cls.from_df(df=df)
+
+    def evaluate_forecast_df(
+        self,
+        forecast_df: pd.DataFrame,
+        model: str,
+        total_time: float,
+    ) -> pd.DataFrame:
+        df_ = self.Y_df_test.copy(deep=True)
+        if forecast_df.dtypes["ds"] != df_.dtypes["ds"]:
+            df_["ds"] = df_["ds"].astype(forecast_df.dtypes["ds"])
+        df = df_.merge(
+            forecast_df[["unique_id", "ds", model]],
+            on=["unique_id", "ds"],
+            how="left",
+        )
+        if df[model].isna().sum() > 0:
+            na_uids = df.loc[df[model].isna()]["unique_id"].unique()
+            main_logger.warning(
+                f"{model} contains NaN for {len(na_uids)} series: {na_uids},"
+                " filling with seasonal naive forecasts"
+            )
+            # Lazy import: only needed when a model leaves gaps in its forecasts.
+            from statsforecast import StatsForecast
+            from statsforecast.models import SeasonalNaive
+
+            sf = StatsForecast(
+                models=[SeasonalNaive(season_length=self.seasonality)],
+                freq=self.pandas_frequency,
+            )
+            sn_df = sf.forecast(
+                df=self.Y_df_train,
+                h=self.horizon,
+            )
+            df = df.merge(sn_df, on=["unique_id", "ds"], how="left")  # type: ignore
+            df.loc[df["unique_id"].isin(na_uids), model] = df.loc[
+                df["unique_id"].isin(na_uids), "SeasonalNaive"
+            ]
+            df = df.drop(columns=["SeasonalNaive"])
+        partial_mase = partial(mase, seasonality=self.seasonality)
+        eval_df = evaluate(
+            df=df,
+            metrics=[rmse, mae, partial_mase],
+            train_df=self.Y_df_train,
+            models=[model],
+        )
+        eval_df = eval_df.groupby("metric").mean(numeric_only=True).reset_index()  # type: ignore
+        eval_time_df = pd.DataFrame(
+            {
+                "metric": ["total_time"],
+                model: [total_time],
+            }
+        )
+        eval_df = pd.concat(
+            [eval_df, eval_time_df],
+            ignore_index=True,
+        )  # type: ignore
+        return eval_df.set_index("metric")
+
+
+@dataclass
+class ForecastDataset:
+    forecast_df: pd.DataFrame
+    total_time: float
+
+    @classmethod
+    def from_dir(cls, dir: str | Path):
+        dir_ = Path(dir)
+        forecast_df = pd.read_parquet(dir_ / "forecast_df.parquet")
+        with open(dir_ / "total_time.txt", "r") as file:
+            total_time = float(file.read())
+        return cls(forecast_df=forecast_df, total_time=total_time)
+
+    @staticmethod
+    def is_forecast_ready(dir: str | Path):
+        dir_ = Path(dir)
+        forecast_path = dir_ / "forecast_df.parquet"
+        time_path = dir_ / "total_time.txt"
+        return forecast_path.exists() and time_path.exists()
+
+    def save_to_dir(self, dir: str | Path):
+        dir_ = Path(dir)
+        dir_.mkdir(parents=True, exist_ok=True)
+        self.forecast_df.to_parquet(dir_ / "forecast_df.parquet")
+        with open(dir_ / "total_time.txt", "w") as file:
+            file.write(str(self.total_time))
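A minimal sketch of how the two dataclasses above fit together during evaluation; the paths and model column name follow the conventions used elsewhere in this experiment:

```python
from src.utils.data_handler import ExperimentDataset, ForecastDataset

dataset = ExperimentDataset.from_parquet(
    "./data/filtered_datasets/Hourly_H.parquet"  # assumes the filtered data exists
)
fcst = ForecastDataset.from_dir(
    "./results/nixtla_timegpt/Hourly_H"  # assumes forecasts were saved earlier
)
eval_df = dataset.evaluate_forecast_df(
    forecast_df=fcst.forecast_df,
    model="TimeGPT",  # name of the prediction column in forecast_df
    total_time=fcst.total_time,
)
print(eval_df)  # one row per metric: rmse, mae, mase, total_time
```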
diff --git a/experiments/azure-automl-forecasting/src/utils/download_data.py b/experiments/azure-automl-forecasting/src/utils/download_data.py
new file mode 100644
index 00000000..c851219e
--- /dev/null
+++ b/experiments/azure-automl-forecasting/src/utils/download_data.py
@@ -0,0 +1,64 @@
+import logging
+from concurrent.futures import ProcessPoolExecutor
+
+import pandas as pd
+
+logging.basicConfig(level=logging.INFO)
+main_logger = logging.getLogger(__name__)
+
+
+def read_parquet_and_assign(uid, url):
+    df = pd.read_parquet(url)
+    df["unique_id"] = uid
+    df["ds"] = df["ds"].astype(str)
+    return df[["unique_id", "ds", "y"]]
+
+
+def download_data():
+    catalogue_splits = pd.read_parquet("./data/catalogue_splits.parquet")
+    catalogue_datasets = pd.read_parquet("./data/catalogue_datasets.parquet")
+    catalogue_df = catalogue_splits.merge(
+        catalogue_datasets,
+        on=["dataset", "subdataset", "frequency"],
+    )
+    del catalogue_splits
+    del catalogue_datasets
+    catalogue_df = catalogue_df.query("split == 'test'")[
+        [
+            "unique_id",
+            "frequency",
+            "url",
+            "pandas_frequency",
+            "seasonality",
+            "horizon",
+        ]
+    ]
+    grouped_df = catalogue_df.groupby(["frequency", "pandas_frequency"])
+    for (frequency, pandas_frequency), df in grouped_df:
+        uids, urls = df["unique_id"].values, df["url"].values
+        main_logger.info(
+            f"frequency: {frequency}, pandas_frequency: {pandas_frequency}"
+        )
+        n_uids = len(uids)
+        main_logger.info(f"number of uids: {n_uids}")
+        max_workers = min(10, n_uids)
+        # Download the individual series in parallel.
+        with ProcessPoolExecutor(max_workers=max_workers) as executor:
+            futures = [
+                executor.submit(read_parquet_and_assign, uid, url)
+                for uid, url in zip(uids, urls)
+            ]
+            results = [future.result() for future in futures]
+        main_logger.info("dataset read")
+        Y_df = pd.concat(results)
+        Y_df = Y_df.merge(
+            df.drop(columns="url"),
+            on="unique_id",
+            how="left",
+        )
+        Y_df.to_parquet(f"./data/{frequency}_{pandas_frequency}.parquet")
+        del Y_df
+        main_logger.info("dataset saved")
+
+
+if __name__ == "__main__":
+    download_data()
diff --git a/experiments/azure-automl-forecasting/src/utils/filter_data.py b/experiments/azure-automl-forecasting/src/utils/filter_data.py
new file mode 100644
index 00000000..13edc1dc
--- /dev/null
+++ b/experiments/azure-automl-forecasting/src/utils/filter_data.py
@@ -0,0 +1,109 @@
+"""
+This module takes Nixtla's benchmarking data and filters it to prevent
+AzureML from crashing on series that are too short, see
+https://learn.microsoft.com/en-us/azure/machine-learning/concept-automl-forecasting-methods?view=azureml-api-2#data-length-requirements
+"""
+import logging
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Any, Callable
+
+import fire
+import numpy as np
+import pandas as pd
+
+logging.basicConfig(level=logging.INFO)
+main_logger = logging.getLogger(__name__)
+
+
+@dataclass
+class DatasetParams:
+    frequency: str
+    pandas_frequency: str
+    horizon: int
+    seasonality: int
+
+    @staticmethod
+    def _get_value_from_df_col(
+        df: pd.DataFrame,
+        col: str,
+        dtype: Callable | None = None,
+    ) -> Any:
+        col_values = df[col].unique()
+        if len(col_values) > 1:
+            raise ValueError(f"{col} is not unique: {col_values}")
+        value = col_values[0]
+        if dtype is not None:
+            value = dtype(value)
+        return value
+
+    @classmethod
+    def from_df(cls, df: pd.DataFrame) -> "DatasetParams":
+        dataset_params = {}
+        dataset_params_cols = [
+            "frequency",
+            "pandas_frequency",
+            "horizon",
+            "seasonality",
+        ]
+        dataset_params_cols_dtypes = [str, str, int, int]
+        for col, dtype in zip(dataset_params_cols, dataset_params_cols_dtypes):
+            dataset_params[col] = cls._get_value_from_df_col(df, col, dtype=dtype)
+        return cls(**dataset_params)
+
+
+def filter_and_clean_dataset(
+    dataset_path: str,
+    max_series: int = 1_000,
+    n_train_cv: int = 2,
+    n_seasonalities: int = 5,
+    max_insample_length: int = 3_000,
+    random_seed: int = 420,
+):
+    main_logger.info(f"Processing dataset {dataset_path}")
+    df = pd.read_parquet(dataset_path)
+    df = df.drop_duplicates(["unique_id", "ds"])  # type: ignore
+    df = df.sort_values(["unique_id", "ds"])
+    ds_params = DatasetParams.from_df(df)
+    # horizon for the test split, 2 * horizon for validation,
+    # (n_train_cv - 1) * horizon for the extra CV folds,
+    # plus at least one training observation
+    min_train_size_per_series = (
+        ds_params.horizon
+        + 2 * ds_params.horizon
+        + (n_train_cv - 1) * ds_params.horizon
+        + 1
+    )
+    if ds_params.seasonality < 100:
+        # if the series have a short seasonal period,
+        # we add n_seasonalities full periods to min_train_size_per_series
+        # to keep the series long enough
+        min_train_size_per_series += n_seasonalities * ds_params.seasonality
+    uids = df["unique_id"].unique()  # type: ignore
+    df = (
+        df.groupby("unique_id")
+        .filter(lambda x: len(x) >= min_train_size_per_series)
+        .groupby("unique_id")  # type: ignore
+        .tail(max_insample_length + ds_params.horizon)
+        .reset_index(drop=True)
+    )
+    main_logger.info(
+        f"Filtering out {len(uids) - len(df['unique_id'].unique())} series"
+    )
+    uids = df["unique_id"].unique()  # type: ignore
+    n_uids = len(uids)
+    if n_uids > max_series:
+        np.random.seed(random_seed)
+        uids = np.random.choice(uids, max_series, replace=False)  # type: ignore
+        df = df.query("unique_id in @uids")  # type: ignore
+        main_logger.info(f"Filtering out {n_uids - max_series} series")
+    # finally we clean some strange dates (timestamps ending in :01)
+    mask = df["ds"].str.endswith(":01")  # type: ignore
+    df.loc[mask, "ds"] = df.loc[mask, "ds"].str[:-3] + ":00"
+    # save the dataset
+    dataset_path_ = Path(dataset_path)
+    filtered_dataset_path = dataset_path_.parent / "filtered_datasets" / dataset_path_.name
+    filtered_dataset_path.parent.mkdir(exist_ok=True, parents=True)
+    df.to_parquet(filtered_dataset_path)
+    main_logger.info(f"Filtered dataset saved to {filtered_dataset_path}")
+
+
+if __name__ == "__main__":
+    fire.Fire(filter_and_clean_dataset)
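As a worked example of the minimum-length rule above, take a hypothetical monthly dataset with `horizon=18` and `seasonality=12` under the default `n_train_cv=2` and `n_seasonalities=5`:

```python
h, seasonality = 18, 12  # hypothetical monthly dataset
n_train_cv, n_seasonalities = 2, 5  # module defaults

# test + validation + extra CV folds + one training observation
min_len = h + 2 * h + (n_train_cv - 1) * h + 1  # 18 + 36 + 18 + 1 = 73
if seasonality < 100:
    min_len += n_seasonalities * seasonality  # + 60
print(min_len)  # 133 observations required per series
```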