feat: add azure automl experiment #246

Merged: 22 commits, Mar 7, 2024
5 changes: 5 additions & 0 deletions experiments/azure-automl-forecasting/.env.example
@@ -0,0 +1,5 @@
AZURE_SUBSCRIPTION_ID=
AZURE_RESOURCE_GROUP=
AZURE_WORKSPACE_NAME=
TIMEGPT_TOKEN=

39 changes: 39 additions & 0 deletions experiments/azure-automl-forecasting/Makefile
@@ -0,0 +1,39 @@
TS_FILES := Hourly_H.parquet Daily_D.parquet Weekly_W-MON.parquet Monthly_MS.parquet
FILTERED_TS_FILES := $(patsubst %,./data/filtered_datasets/%,$(TS_FILES))

filter_data:
	@for file in $(TS_FILES); do \
		python -m src.utils.filter_data --dataset_path ./data/$$file; \
	done

run_timegpt: .require-dataset_path
	@echo Running TimeGPT with dataset_path=$(dataset_path)
	@python -m src.nixtla_timegpt --dataset_path $(dataset_path)

run_sn: .require-dataset_path
	@echo Running SN with dataset_path=$(dataset_path)
	@python -m src.statsforecast_sn --dataset_path $(dataset_path)

run_automl: .require-dataset_path
	@echo Running AutoML with dataset_path=$(dataset_path)
	@python -m src.azure_automl.forecasting --dataset_path $(dataset_path)

run_methods:
	@for file in $(TS_FILES); do \
		echo "Running methods for $$file"; \
		$(MAKE) run_timegpt dataset_path=./data/filtered_datasets/$$file; \
		$(MAKE) run_sn dataset_path=./data/filtered_datasets/$$file; \
		$(MAKE) run_automl dataset_path=./data/filtered_datasets/$$file; \
	done

download_automl_forecasts:
	@python -m src.azure_automl.download_forecasts

evaluate_experiments:
	@python -m src.evaluation --datasets_paths "$(shell echo $(FILTERED_TS_FILES) | tr ' ' ',')"

.require-dataset_path:
ifndef dataset_path
	$(error dataset_path is required)
endif

75 changes: 75 additions & 0 deletions experiments/azure-automl-forecasting/README.md
@@ -0,0 +1,75 @@
# Nixtla TimeGPT vs. Azure AutoML: A Comprehensive Performance Analysis

This experiment evaluates the performance of **Nixtla TimeGPT's zero-shot inference** against **Microsoft's Azure AutoML** on time series forecasting. Our analysis shows that TimeGPT **surpasses Azure AutoML by 12%, 12%, and 10% in MAE, RMSE, and MASE**, with a **300x improvement in computational efficiency**. The evaluation spanned 3,000 distinct time series across several data frequencies and accounted for Azure AutoML's cost constraints.

## Introduction

[Azure AutoML](https://learn.microsoft.com/en-us/azure/machine-learning/concept-automl-forecasting-methods?view=azureml-api-2), a product of Microsoft, offers a robust automated machine-learning solution that caters to a wide array of predictive tasks, including time series forecasting. TimeGPT is a foundation model for time series forecasting that can be accessed [through an API](https://docs.nixtla.io/). While Azure AutoML is known for its adaptability and ease of use, our findings show that TimeGPT offers superior accuracy and efficiency on time series data.
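As an illustration (not part of this PR), a zero-shot call through the `nixtlats` client listed in `requirements.txt` might look like the sketch below. The exact client API may vary between versions, and the forecast call only runs when a `TIMEGPT_TOKEN` is available:

```python
# Hypothetical sketch of zero-shot TimeGPT inference via the `nixtlats`
# client; the call signature may differ between client versions.
import os

import pandas as pd

# A toy series in the long format Nixtla tools expect:
# one row per (unique_id, ds) pair, with the target in column `y`.
df = pd.DataFrame(
    {
        "unique_id": ["series_1"] * 8,
        "ds": pd.date_range("2024-01-01", periods=8, freq="D"),
        "y": [10.0, 12.0, 11.0, 13.0, 12.5, 14.0, 13.5, 15.0],
    }
)

token = os.environ.get("TIMEGPT_TOKEN")
if token:
    from nixtlats import TimeGPT

    timegpt = TimeGPT(token=token)
    # 3-step-ahead zero-shot forecast; no training happens locally
    fcst = timegpt.forecast(df=df, h=3, freq="D")
    print(fcst.head())
else:
    print(f"No TIMEGPT_TOKEN set; prepared {len(df)} rows for forecasting")
```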

## Empirical Evaluation

Our study involved a detailed comparison of both models across various datasets, including Hourly, Daily, Weekly, and Monthly data frequencies. The datasets were chosen from the test set of the [TimeGPT-1 paper](https://arxiv.org/abs/2310.03589), ensuring a diverse set of time series for evaluation. The selection process was designed to manage computational complexity and adhere to Azure AutoML's dataset size requirements, with a cap of 3,000 observations to maintain cost-effectiveness.

## Results

The following table shows the main findings of our analysis, presenting a comparison of performance metrics (MASE, MAE, RMSE) and computational time (in seconds) across different datasets. The best results are highlighted in **bold** for clarity.

<img width="632" alt="image" src="https://github.com/Nixtla/nixtla/assets/10517170/0cc4285e-2572-4f08-9846-94c68ad72e8b">
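For reference, the three reported metrics can be computed as follows. This is a minimal NumPy sketch; the experiment's own `src.evaluation` module may differ in details, and the `season_length` used for MASE depends on each dataset's frequency (here a hypothetical value of 1):

```python
import numpy as np

def mae(y, yhat):
    # mean absolute error
    return np.mean(np.abs(y - yhat))

def rmse(y, yhat):
    # root mean squared error
    return np.sqrt(np.mean((y - yhat) ** 2))

def mase(y, yhat, y_train, season_length=1):
    # MAE scaled by the in-sample MAE of the seasonal naive forecast
    scale = np.mean(np.abs(y_train[season_length:] - y_train[:-season_length]))
    return np.mean(np.abs(y - yhat)) / scale

y_train = np.array([10.0, 12.0, 11.0, 13.0])
y = np.array([12.0, 14.0])
yhat = np.array([13.0, 13.0])
print(mae(y, yhat), rmse(y, yhat), mase(y, yhat, y_train))  # → 1.0 1.0 0.6
```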


## Reproducibility

All experiments were conducted in controlled environments to uphold the integrity and reproducibility of our results. TimeGPT evaluations were performed using a 2020 MacBook Air with an M1 chip, ensuring accessibility and practicality. In contrast, Azure AutoML experiments were carried out on a cluster of 11 STANDARD_DS5_V2 virtual machines equipped with substantial computational resources to showcase its scalability and power.

### Instructions

1. Configure Azure AutoML according to the official Microsoft documentation.
2. Set the environment variables in a `.env` file, using `.env.example` as a template.
3. Set up a conda environment using:

```bash
mamba create -n azure-automl-fcst python=3.10
conda activate azure-automl-fcst
pip install uv
uv pip install -r requirements.txt
```

4. Download the data using:

```bash
python -m src.utils.download_data
```

If you're interested in replicating the results, write to us at `[email protected]` so we can give you access to the data.

5. Filter the datasets to prevent AzureML from crashing:

```bash
make filter_data
```
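The `src.utils.filter_data` script itself is not shown in this diff; a rough sketch of the idea, with a hypothetical `cap_series` helper that keeps only each series' most recent observations (the real cap would be 3,000, per the size limit discussed above; a toy cap of 3 is used here), could look like:

```python
import pandas as pd

def cap_series(df: pd.DataFrame, max_obs: int = 3000,
               id_col: str = "unique_id", time_col: str = "ds") -> pd.DataFrame:
    """Keep only the most recent `max_obs` rows of each series."""
    df = df.sort_values([id_col, time_col])
    return df.groupby(id_col, group_keys=False).tail(max_obs)

# toy example: series "a" exceeds the cap, series "b" does not
toy = pd.DataFrame({
    "unique_id": ["a"] * 5 + ["b"] * 2,
    "ds": pd.date_range("2024-01-01", periods=5).tolist()
        + pd.date_range("2024-01-01", periods=2).tolist(),
    "y": range(7),
})
capped = cap_series(toy, max_obs=3)
print(capped.groupby("unique_id").size().to_dict())  # → {'a': 3, 'b': 2}
```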

6. Run the forecasting tasks for TimeGPT, SeasonalNaive, and AzureAutoML using the following:

```bash
make run_methods
```

Note that Azure AutoML will submit the job to the predefined cluster.

7. Retrieve AzureAutoML forecasts once they are ready:

```bash
make download_automl_forecasts
```

8. Run the evaluation:

```bash
make evaluate_experiments
```


### References
- [TimeGPT 1](https://arxiv.org/abs/2310.03589)
- [StatsForecast](https://github.com/Nixtla/statsforecast/)
- [Distributed AzureAutoML for forecasting](https://github.com/Azure/azureml-examples/blob/main/sdk/python/jobs/pipelines/1k_demand_forecasting_with_pipeline_components/automl-forecasting-demand-many-models-in-pipeline/automl-forecasting-demand-many-models-in-pipeline.ipynb)
11 changes: 11 additions & 0 deletions experiments/azure-automl-forecasting/requirements.txt
@@ -0,0 +1,11 @@
azure-ai-ml
azure-identity
azureml-core
fire
mltable
nixtlats
pandas
python-dotenv
rich
statsforecast
utilsforecast
Empty file.
@@ -0,0 +1,253 @@
import json
import logging
import os
from pathlib import Path
from tempfile import TemporaryDirectory

import numpy as np
import pandas as pd
import yaml
from azure.ai.ml import Input, MLClient
from azure.ai.ml.constants import AssetTypes
from azure.ai.ml.dsl import pipeline
from azure.ai.ml.entities import AmlCompute, Job
from azure.identity import DefaultAzureCredential
from dotenv import load_dotenv

load_dotenv()
logging.basicConfig(level=logging.INFO)
main_logger = logging.getLogger(__name__)

# silence the verbose azure-* loggers
loggers = logging.Logger.manager.loggerDict
for logger_name in loggers:
    if logger_name.startswith("azure"):
        logger = logging.getLogger(logger_name)
        logger.disabled = True
        logger.propagate = False


def str_to_datetime(date_str: str) -> pd.Timestamp:
    return pd.Timestamp(date_str)


def df_to_parquet_azureml_input(df: pd.DataFrame, dir: str) -> Input:
    series_path = Path(dir) / "series.parquet"
    df.to_parquet(series_path, index=False)
    table_data_input = Input(type=AssetTypes.URI_FOLDER, path=dir)
    return table_data_input


def config_to_yaml_azureml_input(config: dict, dir: str) -> Input:
    config_path = Path(dir) / "config.yaml"
    with open(config_path, "w") as f:
        yaml.dump(config, f)
    config_input = Input(type="uri_file", path=str(config_path))
    return config_input


class AzureAutoML:
    """
    Before using this class, you need to log in to Azure.
    Use the following command to log in:
    $ az login
    """

    def __init__(
        self,
        subscription_id: str,
        resource_group_name: str,
        workspace_name: str,
    ):
        self.subscription_id = subscription_id
        self.resource_group_name = resource_group_name
        self.workspace_name = workspace_name

    @classmethod
    def from_environment(cls) -> "AzureAutoML":
        return cls(
            subscription_id=os.environ["AZURE_SUBSCRIPTION_ID"],
            resource_group_name=os.environ["AZURE_RESOURCE_GROUP"],
            workspace_name=os.environ["AZURE_WORKSPACE_NAME"],
        )

    def get_ml_client(self, registry_name: str | None = None) -> MLClient:
        kwargs = {}
        if not registry_name:
            kwargs["workspace_name"] = self.workspace_name
        else:
            kwargs["registry_name"] = registry_name
        credential = DefaultAzureCredential(exclude_managed_identity_credential=True)
        ml_client = MLClient(
            credential=credential,
            subscription_id=self.subscription_id,
            resource_group_name=self.resource_group_name,
            **kwargs,
        )
        return ml_client

    def get_train_and_inference_components(self) -> tuple:
        ml_client_registry = self.get_ml_client("azureml")
        train_component = ml_client_registry.components.get(
            name="automl_many_models_training",
            label="latest",
        )
        inference_component = ml_client_registry.components.get(
            name="automl_many_models_inference",
            label="latest",
        )
        return train_component, inference_component

    def forecast(
        self,
        df: pd.DataFrame,
        df_test: pd.DataFrame,
        aml_compute: AmlCompute,
        h: int,
        freq: str,
        id_col: str = "unique_id",
        time_col: str = "ds",
        target_col: str = "y",
        primary_metric: str = "normalized_root_mean_squared_error",
        n_cross_validations: str | int = "auto",
        experiment_name: str | None = None,
        begin_create_or_update_aml_compute: bool = False,
        max_trials: int = 25,
        enable_early_stopping: bool = True,
        max_nodes: int = 1,
        max_concurrency_per_node: int = 1,
        forecast_mode: str = "rolling",
        retrain_failed_model: bool = False,
    ) -> str:
        if experiment_name is None:
            random_id = np.random.randint(10000, 99999)
            experiment_name = f"automl-forecasting-job-{random_id}"
        ml_client = self.get_ml_client()
        train_component, inference_component = self.get_train_and_inference_components()
        automl_config_dict = dict(
            task="forecasting",
            forecast_horizon=h,
            forecast_step=h,
            frequency=freq,
            time_series_id_column_names=id_col,
            partition_column_names=[id_col],
            time_column_name=time_col,
            label_column_name=target_col,
            primary_metric=primary_metric,
            n_cross_validations=n_cross_validations,
            max_trials=max_trials,
            enable_early_stopping=enable_early_stopping,
            track_child_runs=False,
            allow_multi_partitions=False,
            # allowed_training_algorithms=["Naive"],
        )

        @pipeline(description="pipeline for automl forecasting")
        def forecasting_pipeline(
            training_data: Input,
            test_data: Input,
            automl_config: Input,
            compute_name: str,
        ):
            # training node
            training_node = train_component(
                raw_data=training_data,
                automl_config=automl_config,
                max_concurrency_per_node=max_concurrency_per_node,
                max_nodes=max_nodes,
                retrain_failed_model=retrain_failed_model,
                compute_name=compute_name,
            )
            # inference node
            inference_node = inference_component(
                raw_data=test_data,
                max_nodes=max_nodes,
                max_concurrency_per_node=max_concurrency_per_node,
                optional_train_metadata=training_node.outputs.run_output,
                forecast_mode=forecast_mode,
                forecast_step=h,
                compute_name=compute_name,
            )
            return {"forecast_output": inference_node.outputs.raw_predictions}

        if begin_create_or_update_aml_compute:
            main_logger.info("Begin create or update aml compute")
            ml_client.compute.begin_create_or_update(aml_compute).result()

        cwd = Path.cwd()
        with TemporaryDirectory(dir=cwd) as tmp_dir, TemporaryDirectory(
            dir=cwd
        ) as tmp_dir_test, TemporaryDirectory(dir=cwd) as tmp_dir_config:
            main_logger.info("Transforming datasets to parquet")
            table_data_input = df_to_parquet_azureml_input(df, dir=tmp_dir)
            table_data_input_test = df_to_parquet_azureml_input(
                df_test,
                dir=tmp_dir_test,
            )
            automl_config = config_to_yaml_azureml_input(
                automl_config_dict,
                dir=tmp_dir_config,
            )
            pipeline_job = forecasting_pipeline(
                training_data=table_data_input,
                test_data=table_data_input_test,
                automl_config=automl_config,
                compute_name=aml_compute.name,
            )
            pipeline_job.settings.default_compute = aml_compute.name
            main_logger.info("Begin submitting pipeline job")
            returned_pipeline_job = ml_client.jobs.create_or_update(
                pipeline_job,
                experiment_name=experiment_name,
            )
        return returned_pipeline_job.name

    def get_job(self, job_name: str) -> Job:
        ml_client = self.get_ml_client()
        job = ml_client.jobs.get(job_name)
        return job

    def get_job_status(self, job_name: str) -> str | None:
        job = self.get_job(job_name)
        return job.status

    def get_job_total_time(self, job_name: str) -> float | None:
        job = self.get_job(job_name)
        if job.status == "NotStarted":
            main_logger.info(f"Job {job_name} is not started yet")
            return None
        stages_key = "azureml.pipelines.stages"
        if stages_key not in job.properties:
            main_logger.info(f"Job {job_name} has no stages yet")
            return None
        stages = json.loads(job.properties[stages_key])
        execution_info = stages["Execution"]
        status = execution_info["Status"]
        if status == "Failed":
            raise Exception(f"Job {job_name} failed")
        start_time = str_to_datetime(execution_info["StartTime"])
        if "EndTime" not in execution_info:
            # job still running: report the elapsed time so far
            total_time = pd.Timestamp.now(tz=start_time.tz) - start_time
            main_logger.info(
                f"Job has status {status}, total time so far: {total_time.total_seconds()}"
            )
            return total_time.total_seconds()
        end_time = str_to_datetime(execution_info["EndTime"])
        total_time = end_time - start_time
        return total_time.total_seconds()

    def get_forecast_df(self, job_name: str) -> pd.DataFrame | None:
        job_status = self.get_job_status(job_name)
        if job_status != "Completed":
            main_logger.info(f"Job {job_name} is not completed yet")
            return None
        ml_client = self.get_ml_client()
        cwd = Path.cwd()
        with TemporaryDirectory(dir=cwd) as tmp_dir:
            ml_client.jobs.download(
                job_name,
                download_path=tmp_dir,
                output_name="forecast_output",
            )
            output_path = Path(tmp_dir) / "named-outputs" / "forecast_output"
            forecast_df = pd.read_parquet(output_path)
        return forecast_df