feat: add quantiles to forecast and cross validation methods #241

Closed · wants to merge 20 commits
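For context, the headline change in this PR is a `quantiles` argument on the client's forecast and cross validation methods. Below is a rough usage sketch, assuming the `TimeGPT` class from `nixtlats` and that `quantiles` accepts a list of floats; argument names may differ slightly from the merged API.

```
# Hedged sketch of the feature this PR adds; class and argument names are
# assumed from the nixtlats client of this period, not taken from the diff.
import pandas as pd
from nixtlats import TimeGPT

timegpt = TimeGPT()  # expects TIMEGPT_TOKEN in the environment

df = pd.DataFrame(
    {
        "unique_id": ["series_1"] * 24,
        "ds": pd.date_range("2023-01-01", periods=24, freq="MS"),
        "y": range(24),
    }
)

fcst_df = timegpt.forecast(df=df, h=6, quantiles=[0.1, 0.5, 0.9])
cv_df = timegpt.cross_validation(df=df, h=6, n_windows=1, quantiles=[0.1, 0.5, 0.9])
```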
5 changes: 5 additions & 0 deletions experiments/azure-automl-forecasting/.env.example
@@ -0,0 +1,5 @@
AZURE_SUBSCRIPTION_ID=
AZURE_RESOURCE_GROUP=
AZURE_WORKSPACE_NAME=
TIMEGPT_TOKEN=

39 changes: 39 additions & 0 deletions experiments/azure-automl-forecasting/Makefile
@@ -0,0 +1,39 @@
TS_FILES := Hourly_H.parquet Daily_D.parquet Weekly_W-MON.parquet Monthly_MS.parquet
FILTERED_TS_FILES := $(patsubst %,./data/filtered_datasets/%,$(TS_FILES))

filter_data:
	@for file in $(TS_FILES); do \
		python -m src.utils.filter_data --dataset_path ./data/$$file; \
	done

run_timegpt: .require-dataset_path
	@echo Running TimeGPT with dataset_path=$(dataset_path)
	@python -m src.nixtla_timegpt --dataset_path $(dataset_path)

run_sn: .require-dataset_path
	@echo Running SN with dataset_path=$(dataset_path)
	@python -m src.statsforecast_sn --dataset_path $(dataset_path)

run_automl: .require-dataset_path
	@echo Running AutoML with dataset_path=$(dataset_path)
	@python -m src.azure_automl.forecasting --dataset_path $(dataset_path)

run_methods:
	@for file in $(TS_FILES); do \
		echo "Running methods for $$file"; \
		$(MAKE) run_timegpt dataset_path=./data/filtered_datasets/$$file; \
		$(MAKE) run_sn dataset_path=./data/filtered_datasets/$$file; \
		$(MAKE) run_automl dataset_path=./data/$$file; \
	done

download_automl_forecasts:
	@python -m src.azure_automl.download_forecasts

evaluate_experiments:
	@python -m src.evaluation --datasets_paths "$(shell echo $(FILTERED_TS_FILES) | tr ' ' ',')"

.require-dataset_path:
ifndef dataset_path
	$(error dataset_path is required)
endif

9 changes: 9 additions & 0 deletions experiments/azure-automl-forecasting/README.md
@@ -0,0 +1,9 @@
# Instructions

Create the conda environment and install the dependencies:

```
mamba create -n azure-automl-fcst python=3.10
conda activate azure-automl-fcst
pip install uv
uv pip install -r requirements.txt
```
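
The scripts read their credentials with python-dotenv, so presumably a `.env` populated from `.env.example` is also needed. A small sanity check (the variable names come from `.env.example` above; the snippet itself is illustrative and not part of the PR):

```
# Verify that the variables listed in .env.example are set before running the
# experiments; this helper is illustrative and not part of the PR.
import os
from dotenv import load_dotenv

load_dotenv()
required = [
    "AZURE_SUBSCRIPTION_ID",
    "AZURE_RESOURCE_GROUP",
    "AZURE_WORKSPACE_NAME",
    "TIMEGPT_TOKEN",
]
missing = [name for name in required if not os.environ.get(name)]
print("Missing variables:", ", ".join(missing) or "none")
```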
11 changes: 11 additions & 0 deletions experiments/azure-automl-forecasting/requirements.txt
@@ -0,0 +1,11 @@
azure-ai-ml
azure-identity
azureml-core
fire
mltable
nixtlats
pandas
python-dotenv
rich
statsforecast
utilsforecast
Empty file.
@@ -0,0 +1,253 @@
import json
import logging
import os
import yaml
from tempfile import TemporaryDirectory
from pathlib import Path

import numpy as np
import pandas as pd
from azure.ai.ml import Input
from azure.ai.ml import MLClient
from azure.ai.ml.constants import AssetTypes
from azure.ai.ml.dsl import pipeline
from azure.ai.ml.entities import AmlCompute, Job
from azure.identity import DefaultAzureCredential
from dotenv import load_dotenv

load_dotenv()
logging.basicConfig(level=logging.INFO)
main_logger = logging.getLogger(__name__)

loggers = logging.Logger.manager.loggerDict
for logger_name in loggers:
    if logger_name.startswith("azure"):
        logger = logging.getLogger(logger_name)
        logger.disabled = True
        logger.propagate = False


def str_to_datetime(date_str: str) -> pd.Timestamp:
    return pd.Timestamp(date_str)


def df_to_parquet_azureml_input(df: pd.DataFrame, dir: str) -> Input:
    series_path = Path(dir) / "series.parquet"
    df.to_parquet(series_path, index=False)
    table_data_input = Input(type=AssetTypes.URI_FOLDER, path=dir)
    return table_data_input


def config_to_yaml_azureml_input(config: dict, dir: str) -> Input:
    config_path = Path(dir) / "config.yaml"
    with open(config_path, "w") as f:
        yaml.dump(config, f)
    config = Input(type="uri_file", path=str(config_path))
    return config


class AzureAutoML:
    """
    Before using this class, you need to log in to Azure.
    Use the following command to log in:
    $ az login
    """

    def __init__(
        self,
        subscription_id: str,
        resource_group_name: str,
        workspace_name: str,
    ):
        self.subscription_id = subscription_id
        self.resource_group_name = resource_group_name
        self.workspace_name = workspace_name

    @classmethod
    def from_environment(cls) -> "AzureAutoML":
        return cls(
            subscription_id=os.environ["AZURE_SUBSCRIPTION_ID"],
            resource_group_name=os.environ["AZURE_RESOURCE_GROUP"],
            workspace_name=os.environ["AZURE_WORKSPACE_NAME"],
        )

    def get_ml_client(self, registry_name: str | None = None) -> MLClient:
        kwargs = {}
        if not registry_name:
            kwargs["workspace_name"] = self.workspace_name
        else:
            kwargs["registry_name"] = registry_name
        credential = DefaultAzureCredential(exclude_managed_identity_credential=True)
        ml_client = MLClient(
            credential=credential,
            subscription_id=self.subscription_id,
            resource_group_name=self.resource_group_name,
            **kwargs,
        )
        return ml_client

    def get_train_and_inference_components(self) -> tuple:
        ml_client_registry = self.get_ml_client("azureml")
        train_component = ml_client_registry.components.get(
            name="automl_many_models_training",
            label="latest",
        )
        inference_component = ml_client_registry.components.get(
            name="automl_many_models_inference",
            label="latest",
        )
        return train_component, inference_component

    def forecast(
        self,
        df: pd.DataFrame,
        df_test: pd.DataFrame,
        aml_compute: AmlCompute,
        h: int,
        freq: str,
        id_col: str = "unique_id",
        time_col: str = "ds",
        target_col: str = "y",
        primary_metric: str = "normalized_root_mean_squared_error",
        n_cross_validations: str | int = "auto",
        experiment_name: str | None = None,
        begin_create_or_update_aml_compute: bool = False,
        max_trials: int = 25,
        enable_early_stopping: bool = True,
        max_nodes: int = 1,
        max_concurrency_per_node: int = 1,
        forecast_mode: str = "rolling",
        retrain_failed_model: bool = False,
    ) -> str:
        if experiment_name is None:
            random_id = np.random.randint(10000, 99999)
            experiment_name = f"automl-forecasting-job-{random_id}"
        ml_client = self.get_ml_client()
        train_component, inference_component = self.get_train_and_inference_components()
        automl_config_dict = dict(
            task="forecasting",
            forecast_horizon=h,
            forecast_step=h,
            frequency=freq,
            time_series_id_column_names=id_col,
            partition_column_names=[id_col],
            time_column_name=time_col,
            label_column_name=target_col,
            primary_metric=primary_metric,
            n_cross_validations=n_cross_validations,
            max_trials=max_trials,
            enable_early_stopping=enable_early_stopping,
            track_child_runs=False,
            allow_multi_partitions=False,
            # allowed_training_algorithms=["Naive"],
        )

        @pipeline(description="pipeline for automl forecasting")
        def forecasting_pipeline(
            training_data: Input,
            test_data: Input,
            automl_config: Input,
            compute_name: str,
        ):
            # training node
            training_node = train_component(
                raw_data=training_data,
                automl_config=automl_config,
                max_concurrency_per_node=max_concurrency_per_node,
                max_nodes=max_nodes,
                retrain_failed_model=retrain_failed_model,
                compute_name=compute_name,
            )
            # inference node
            inference_node = inference_component(
                raw_data=test_data,
                max_nodes=max_nodes,
                max_concurrency_per_node=max_concurrency_per_node,
                optional_train_metadata=training_node.outputs.run_output,
                forecast_mode=forecast_mode,
                forecast_step=h,
                compute_name=compute_name,
            )
            return {"forecast_output": inference_node.outputs.raw_predictions}

        if begin_create_or_update_aml_compute:
            main_logger.info("Begin create or update aml compute")
            ml_client.compute.begin_create_or_update(aml_compute).result()

        cwd = Path.cwd()
        with TemporaryDirectory(dir=cwd) as tmp_dir, TemporaryDirectory(
            dir=cwd
        ) as tmp_dir_test, TemporaryDirectory(dir=cwd) as tmp_dir_config:
            main_logger.info("Transforming datasets to parquet")
            table_data_input = df_to_parquet_azureml_input(df, dir=tmp_dir)
            table_data_input_test = df_to_parquet_azureml_input(
                df_test,
                dir=tmp_dir_test,
            )
            automl_config = config_to_yaml_azureml_input(
                automl_config_dict,
                dir=tmp_dir_config,
            )
            pipeline_job = forecasting_pipeline(
                training_data=table_data_input,
                test_data=table_data_input_test,
                automl_config=automl_config,
                compute_name=aml_compute.name,
            )
            pipeline_job.settings.default_compute = aml_compute.name
            main_logger.info("Begin submitting pipeline job")
            returned_pipeline_job = ml_client.jobs.create_or_update(
                pipeline_job,
                experiment_name=experiment_name,
            )
            return returned_pipeline_job.name

    def get_job(self, job_name: str) -> Job:
        ml_client = self.get_ml_client()
        job = ml_client.jobs.get(job_name)
        return job

    def get_job_status(self, job_name: str) -> str | None:
        job = self.get_job(job_name)
        return job.status

    def get_job_total_time(self, job_name: str) -> float | None:
        job = self.get_job(job_name)
        if job.status == "NotStarted":
            main_logger.info(f"Job {job_name} has not started yet")
            return None
        stages_key = "azureml.pipelines.stages"
        if stages_key not in job.properties:
            main_logger.info(f"Job {job_name} has no stages yet")
            return None
        stages = json.loads(job.properties[stages_key])
        execution_info = stages["Execution"]
        status = execution_info["Status"]
        if status == "Failed":
            raise Exception(f"Job {job_name} failed")
        start_time = str_to_datetime(execution_info["StartTime"])
        if "EndTime" not in execution_info:
            total_time = pd.Timestamp.now(tz=start_time.tz) - start_time
            main_logger.info(
                f"Job has status {status}, total time so far: {total_time.total_seconds()}"
            )
            return None
        end_time = str_to_datetime(execution_info["EndTime"])
        total_time = end_time - start_time
        return total_time.total_seconds()

    def get_forecast_df(self, job_name: str) -> pd.DataFrame | None:
        job_status = self.get_job_status(job_name)
        if job_status != "Completed":
            main_logger.info(f"Job {job_name} is not completed yet")
            return None
        ml_client = self.get_ml_client()
        cwd = Path.cwd()
        with TemporaryDirectory(dir=cwd) as tmp_dir:
            ml_client.jobs.download(
                job_name,
                download_path=tmp_dir,
                output_name="forecast_output",
            )
            output_path = Path(tmp_dir) / "named-outputs" / "forecast_output"
            forecast_df = pd.read_parquet(output_path)
        return forecast_df
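
For orientation, a minimal driver for the handler above might look as follows. The module path is inferred from the relative import in the download script, and the compute settings and toy DataFrames are illustrative placeholders, not part of this PR.

```
# Hypothetical usage of the AzureAutoML handler; the compute name/size and the
# toy series are placeholders. Requires `az login` and the AZURE_* variables.
import pandas as pd
from azure.ai.ml.entities import AmlCompute

from src.azure_automl.automl_handler import AzureAutoML  # assumed module path

series = pd.DataFrame(
    {
        "unique_id": ["H1"] * 72,
        "ds": pd.date_range("2024-01-01", periods=72, freq="H"),
        "y": range(72),
    }
)
train_df, test_df = series.iloc[:-24], series.iloc[-24:]

azure_automl = AzureAutoML.from_environment()
compute = AmlCompute(name="automl-cluster", size="Standard_DS3_v2", max_instances=4)
job_name = azure_automl.forecast(
    df=train_df,
    df_test=test_df,
    aml_compute=compute,
    h=24,
    freq="H",
    begin_create_or_update_aml_compute=True,
)
print(azure_automl.get_job_status(job_name))
```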
@@ -0,0 +1,53 @@
import logging
from pathlib import Path

import fire

from .automl_handler import AzureAutoML
from .forecasting import AzureAutoMLJobs
from src.utils.data_handler import ForecastDataset

logging.basicConfig(level=logging.INFO)
main_logger = logging.getLogger(__name__)


def download_forecasts(dir: str = "./results"):
    azure_automl = AzureAutoML.from_environment()
    azure_automl_experiments = AzureAutoMLJobs()
    results_path = Path(dir) / "azure_automl"

    jobs_df = azure_automl_experiments.get_jobs_df()
    jobs_df = jobs_df.sort_values("created_at", ascending=False).drop_duplicates(
        "experiment_name"
    )

    for _, row in jobs_df.iterrows():
        experiment_name = row.experiment_name
        job_name = row.job_name
        main_logger.info(
            f"Downloading forecasts for experiment {experiment_name} and job {job_name}"
        )
        try:
            forecast_df = azure_automl.get_forecast_df(job_name)
            total_time = azure_automl.get_job_total_time(job_name)
        except Exception:
            main_logger.info(
                f"Failed to download forecasts for experiment {experiment_name} and job {job_name}"
            )
            continue
        if forecast_df is None:
            main_logger.info(
                f"Failed to download forecasts for experiment {experiment_name} and job {job_name}, "
                "probably because the job is not finished yet or failed"
            )
            continue
        fcst_dataset = ForecastDataset(forecast_df=forecast_df, total_time=total_time)
        fcst_dataset.save_to_dir(results_path / experiment_name)
        main_logger.info(
            f"Saved forecasts for experiment {experiment_name} and job {job_name}"
        )


if __name__ == "__main__":
    fire.Fire(download_forecasts)