From 1d0c9d3c9d3c318c1b1a6c16c6ec5b50aebf71d4 Mon Sep 17 00:00:00 2001 From: msweier Date: Thu, 12 Dec 2024 15:26:31 -0600 Subject: [PATCH] aprfc qte product --- dags/cumulus/aprfc_qte_01h.py | 126 ++++++++++++++++++++++++++++++++++ plugins/helpers/cumulus.py | 1 + 2 files changed, 127 insertions(+) create mode 100644 dags/cumulus/aprfc_qte_01h.py diff --git a/dags/cumulus/aprfc_qte_01h.py b/dags/cumulus/aprfc_qte_01h.py new file mode 100644 index 0000000..9ceef63 --- /dev/null +++ b/dags/cumulus/aprfc_qte_01h.py @@ -0,0 +1,126 @@ +""" +Acquire and Process APRFC QTE 01h + +Returns +------- +Airflow DAG + Directed Acyclic Graph +""" + +from datetime import datetime, timedelta +import json +from string import Template +from airflow.decorators import dag, task +from airflow.operators.python import get_current_context +from airflow.utils.task_group import TaskGroup +from helpers.downloads import trigger_download + +import helpers.cumulus as cumulus + +default_args = { + "owner": "airflow", + "depends_on_past": False, + "start_date": (datetime.utcnow() - timedelta(hours=48)).replace(minute=0, second=0, microsecond=0), + "catchup_by_default": False, + "email_on_failure": False, + "email_on_retry": False, + "retries": 5, + "retry_delay": timedelta(minutes=30), +} + + +@dag( + default_args=default_args, + tags=["cumulus", "AIRTEMP", "QTE", "APRFC"], + schedule="45 * * * *", + max_active_runs=2, + max_active_tasks=4, +) +def cumulus_aprfc_qte_01h(): + """ + # APRFC hourly estimated temps + + This pipeline handles download, processing, and derivative product creation for APRFC hourly estimated temps + Raw data downloaded to S3 and notifies the Cumulus API of new product(s) + + URLs: + - BASE - https://nomads.ncep.noaa.gov/pub/data/nccf/com/urma/prod/akurma.YYYYMMDD/ + + Filename/Dir Pattern: + + URL Dir - https://nomads.ncep.noaa.gov/pub/data/nccf/com/urma/prod/akurma.YYYYMMDD/ + Files matching akurma.tHHz.2dvaranl_ndfd_3p0.grb2 - 1 hour\n + """ + s3_bucket = cumulus.S3_BUCKET + key_prefix = cumulus.S3_ACQUIRABLE_PREFIX + + + + URL_ROOT = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/urma/prod/" + PRODUCT_SLUG = "aprfc-qte-01h" + + filename_template = Template( + "akurma.t${hr_}z.2dvaranl_ndfd_3p0.grb2 " + ) + + url_suffix_template = Template( + "akurma.${date_}" + ) + + + @task() + def download_raw_qte(): + logical_date = get_current_context()["logical_date"] + date_only = logical_date.strftime("%Y%m%d") + + url_suffix = url_suffix_template.substitute( + date_=date_only, + ) + + filename = filename_template.substitute( + hr_=logical_date.strftime("%H"), + ) + + file_dir = f"{URL_ROOT}{url_suffix}" + + + s3_filename = f'{date_only}_{filename}' + s3_key = f"{key_prefix}/{PRODUCT_SLUG}/{s3_filename}" + + + + + print(f"Downloading file: {filename}") + + trigger_download( + url=f"{file_dir}/{filename}", s3_bucket=s3_bucket, s3_key=s3_key + ) + return json.dumps( + { + "execution": logical_date.isoformat(), + "s3_key": s3_key, + "filename": s3_filename, + } + ) + + + + + + + @task() + def notify_cumulus(payload): + payload = json.loads(payload) + print("Notifying Cumulus: " + payload["filename"]) + cumulus.notify_acquirablefile( + acquirable_id=cumulus.acquirables[PRODUCT_SLUG], + datetime=payload["execution"], + s3_key=payload["s3_key"], + ) + + notify_cumulus(download_raw_qte()) + + + + +aprfc_qte_dag = cumulus_aprfc_qte_01h() diff --git a/plugins/helpers/cumulus.py b/plugins/helpers/cumulus.py index 7deac66..6f182f2 100644 --- a/plugins/helpers/cumulus.py +++ b/plugins/helpers/cumulus.py @@ -14,6 +14,7 @@ "abrfc-qpf-06h": "b1a4754c-5971-11ee-8c99-0242ac120002", "aprfc-qpe-06h": "1f67d822-7cbc-11ee-b962-0242ac120002", "aprfc-qpf-06h": "a64cb16f-01a8-45c0-a069-9afda805d3a7", + "aprfc-qte-01h": "7f8b2d6a-1f3e-11ee-be56-0242ac120002", "cnrfc-qpe-06h": "34a89c35-090d-46e8-964a-c621403301b9", "cnrfc-qpf-06h": "c22785cd-400e-4664-aef8-426734825c2c", "cnrfc-nbm-qpf-06h": "40cfce36-cfad-4a10-8b2d-eb8862378ca5",