Skip to content

Commit

Permalink
WIP begin implementing use of harmony-py
Browse files Browse the repository at this point in the history
Subset orders can now be submitted and the job completes. Some next steps:

* Support downloads from harmony
* Support non-subset downloads. This might be a little more complicated, as it
does not appear that the harmony API can support non-subset orders (it always
uses spatial/temporal constraints to do subsetting,  not just filtering). So we
may need to use `earthaccess` to download our granule list instead.
* Remove code supporting variable subsetting. This is not supported by Harmony,
but could be in the future.
  • Loading branch information
trey-stafford committed Nov 12, 2024
1 parent e7ff043 commit d80a4ce
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 199 deletions.
28 changes: 14 additions & 14 deletions icepyx/core/APIformatting.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
"""Generate and format information for submitting to API (CMR and NSIDC)."""

import datetime as dt
from typing import Any, Generic, Literal, Optional, TypeVar, Union, overload
from typing import Any, Generic, Literal, Optional, TypeVar, overload

from icepyx.core.exceptions import ExhaustiveTypeGuardException, TypeGuardException
from icepyx.core.types import (
from icepyx.core.harmony import HarmonyTemporal
from icepyx.core.types.api import (
CMRParams,
EGIParamsSubset,
EGIRequiredParams,
)

# ----------------------------------------------------------------------
Expand Down Expand Up @@ -38,18 +37,17 @@ def _fmt_temporal(start, end, key):
assert isinstance(start, dt.datetime)
assert isinstance(end, dt.datetime)

if key == "temporal":
if key == "temporal": # search option.
fmt_timerange = (

Check failure on line 41 in icepyx/core/APIformatting.py

View workflow job for this annotation

GitHub Actions / test

Type "str" is not assignable to declared type "HarmonyTemporal"   "str" is not assignable to "HarmonyTemporal" (reportAssignmentType)
start.strftime("%Y-%m-%dT%H:%M:%SZ")
+ ","
+ end.strftime("%Y-%m-%dT%H:%M:%SZ")
)
elif key == "time":
fmt_timerange = (
start.strftime("%Y-%m-%dT%H:%M:%S")
+ ","
+ end.strftime("%Y-%m-%dT%H:%M:%S")
)
elif key == "time": # subsetting option.
# Format for harmony. This will do subsetting.
# TODO: change `key` to something more clear. `temporal` is the key
# passed into Harmony, so this is very confusing!
fmt_timerange: HarmonyTemporal = {"start": start, "stop": end}
else:
raise ValueError("An invalid time key was submitted for formatting.")

Expand Down Expand Up @@ -212,20 +210,22 @@ def __get__(
self,
instance: 'Parameters[Literal["required"]]',
owner: Any,
) -> EGIRequiredParams: ...
): # -> EGIRequiredParams: ...
...

@overload
def __get__(
self,
instance: 'Parameters[Literal["subset"]]',
owner: Any,
) -> EGIParamsSubset: ...
): # -> EGIParamsSubset: ...
...

def __get__(
self,
instance: "Parameters",
owner: Any,
) -> Union[CMRParams, EGIRequiredParams, EGIParamsSubset]:
) -> CMRParams:
"""
Returns the dictionary of formatted keys associated with the
parameter object.
Expand Down
227 changes: 65 additions & 162 deletions icepyx/core/granules.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,24 @@
import re
import time
from typing import Union
from xml.etree import ElementTree as ET
import zipfile

import numpy as np
import requests
from requests.compat import unquote

import icepyx.core.APIformatting as apifmt
from icepyx.core.auth import EarthdataAuthMixin
from icepyx.core.cmr import CMR_PROVIDER
from icepyx.core.cmr import CMR_PROVIDER, get_concept_id
import icepyx.core.exceptions
from icepyx.core.types import (
from icepyx.core.harmony import HarmonyApi
from icepyx.core.types.api import (
CMRParams,
EGIRequiredParamsDownload,
EGIRequiredParamsSearch,
)
from icepyx.core.urls import DOWNLOAD_BASE_URL, GRANULE_SEARCH_BASE_URL, ORDER_BASE_URL
from icepyx.uat import EDL_ACCESS_TOKEN
from icepyx.core.urls import DOWNLOAD_BASE_URL, GRANULE_SEARCH_BASE_URL

# TODO: mix this into existing classes rather than declaring as a global
# variable.
HARMONY_API = HarmonyApi()


def info(grans: list[dict]) -> dict[str, Union[int, float]]:
Expand Down Expand Up @@ -191,7 +191,6 @@ def __init__(
def get_avail(
self,
CMRparams: CMRParams,
reqparams: EGIRequiredParamsSearch,
cloud: bool = False,
):
"""
Expand Down Expand Up @@ -222,24 +221,20 @@ def get_avail(
query.Query.avail_granules
"""

assert (
CMRparams is not None and reqparams is not None
), "Missing required input parameter dictionaries"
assert CMRparams is not None, "Missing required input parameter dictionary"

# if not hasattr(self, 'avail'):
self.avail = []

headers = {
"Accept": "application/json",
"Client-Id": "icepyx",
"Authorization": f"Bearer {EDL_ACCESS_TOKEN}",
}
# note we should also check for errors whenever we ping NSIDC-API -
# make a function to check for errors

params = apifmt.combine_params(
CMRparams,
{k: reqparams[k] for k in ["short_name", "version", "page_size"]},
{"provider": CMR_PROVIDER},
)

Expand Down Expand Up @@ -292,7 +287,7 @@ def get_avail(
def place_order(
self,
CMRparams: CMRParams,
reqparams: EGIRequiredParamsDownload,
reqparams, # : EGIRequiredParamsDownload,
subsetparams,
verbose,
subset=True,
Expand Down Expand Up @@ -337,169 +332,77 @@ def place_order(
--------
query.Query.order_granules
"""
raise icepyx.core.exceptions.RefactoringException
# raise icepyx.core.exceptions.RefactoringException

self.get_avail(CMRparams, reqparams)

# TODO: the harmony API may not support non-subsetting. So we may need
# to provide a list of granules for harmony to download, or use a
# different API.
if subset is False:
request_params = apifmt.combine_params(
CMRparams, reqparams, {"agent": "NO"}
)
else:
request_params = apifmt.combine_params(CMRparams, reqparams, subsetparams)

order_fn = ".order_restart"
concept_id = get_concept_id(
product=request_params["short_name"], version=request_params["version"]
)

total_pages = int(np.ceil(len(self.avail) / reqparams["page_size"]))
print(
"Total number of data order requests is ",
total_pages,
" for ",
len(self.avail),
" granules.",
# TODO: At this point, the request parameters have been formatted into
# strings. `harmony-py` expects python objects (e.g., `dt.datetime` for
# temporal values)

# Place the order.
# TODO: there are probably other options we want to more generically
# expose here. E.g., instead of just accepting a `bounding_box` of a
# particular flavor, we want to be able to pass in a polygon?
job_id = HARMONY_API.place_order(
concept_id=concept_id,
# TODO: why the double-nested bbox dict here?
bounding_box=subsetparams["bbox"]["bbox"],
temporal=subsetparams["time"],
)

if reqparams["page_num"] > 0:
pagenums = [reqparams["page_num"]]
else:
pagenums = range(1, total_pages + 1)
# TODO/Question: should this be changed from a list to a single value?
# There will only be one harmony job per request (I think)
self.orderIDs = [job_id]
order_fn = ".order_restart"
with open(order_fn, "w") as fid:
json.dump({"orderIDs": self.orderIDs}, fid)

for page_num in pagenums:
print("order ID: ", job_id)
status = HARMONY_API.check_order_status(job_id)
print("Initial status of your harmony order request: ", status["status"])
# TODO: confirm which status responses we might expect. "running",
# "paused", or "canceled" are documented here:
# https://harmony.earthdata.nasa.gov/docs#getting-job-status
# I have also seen `running` and `running_with_errors`.
while status["status"].startswith("running"):
print(
"Data request ",
page_num,
" of ",
total_pages,
" is submitting to NSIDC",
"Your harmony order status is still ",
status["status"],
". Please continue waiting... this may take a few moments.",
)
breakpoint()
request_params.update({"page_num": page_num})

request = self.session.get(ORDER_BASE_URL, params=request_params)

# DevGoal: use the request response/number to do some error handling/
# give the user better messaging for failures
# print(request.content)
# root = ET.fromstring(request.content)
# print([subset_agent.attrib for subset_agent in root.iter('SubsetAgent')])

if verbose is True:
print("Request HTTP response: ", request.status_code)
# print('Order request URL: ', request.url)

# Raise bad request: Loop will stop for bad response code.
request.raise_for_status()
esir_root = ET.fromstring(request.content)
if verbose is True:
print("Order request URL: ", unquote(request.url))
print(
"Order request response XML content: ",
request.content.decode("utf-8"),
)

# Look up order ID
orderlist = []
for order in esir_root.findall("./order/"):
# if verbose is True:
# print(order)
orderlist.append(order.text)
orderID = orderlist[0]
print("order ID: ", orderID)

# Create status URL
statusURL = f"{ORDER_BASE_URL}/{orderID}"
if verbose is True:
print("status URL: ", statusURL)

# Find order status
request_response = self.session.get(statusURL)
if verbose is True:
print(
"HTTP response from order response URL: ",
request_response.status_code,
)

# Raise bad request: Loop will stop for bad response code.
request_response.raise_for_status()
request_root = ET.fromstring(request_response.content)
statuslist = []
for status in request_root.findall("./requestStatus/"):
statuslist.append(status.text)
status = statuslist[0]
print("Initial status of your order request at NSIDC is: ", status)

loop_root = None
# If status is already finished without going into pending/processing
if status.startswith("complete"):
loop_response = self.session.get(statusURL)
loop_root = ET.fromstring(loop_response.content)

# Continue loop while request is still processing
while status == "pending" or status == "processing":
print(
"Your order status is still ",
status,
" at NSIDC. Please continue waiting... this may take a few moments.",
)
# print('Status is not complete. Trying again')
time.sleep(10)
loop_response = self.session.get(statusURL)

# Raise bad request: Loop will stop for bad response code.
loop_response.raise_for_status()
loop_root = ET.fromstring(loop_response.content)

# find status
statuslist = []
for status in loop_root.findall("./requestStatus/"):
statuslist.append(status.text)
status = statuslist[0]
# print('Retry request status is: ', status)
if status == "pending" or status == "processing":
continue

if not isinstance(loop_root, ET.Element):
# The typechecker needs help knowing that at this point loop_root is
# set, as it can't tell that the conditionals above are supposed to be
# exhaustive.
raise icepyx.core.exceptions.ExhaustiveTypeGuardException

# Order can either complete, complete_with_errors, or fail:
# Provide complete_with_errors error message:
if status == "complete_with_errors" or status == "failed":
messagelist = []
for message in loop_root.findall("./processInfo/"):
messagelist.append(message.text)
print("Your order is: ", status)
print("NSIDC provided these error messages:")
pprint.pprint(messagelist)

if status == "complete" or status == "complete_with_errors":
print("Your order is:", status)
messagelist = []
for message in loop_root.findall("./processInfo/info"):
messagelist.append(message.text)
if messagelist != []:
print("NSIDC returned these messages")
pprint.pprint(messagelist)
if not hasattr(self, "orderIDs"):
self.orderIDs = []

self.orderIDs.append(orderID)
else:
print("Request failed.")

# DevGoal: save orderIDs more frequently than just at the end for large orders
# (e.g. for len(reqparams['page_num']) > 5 or 10 or something)
# Save orderIDs to file to avoid resubmitting order in case kernel breaks down.
# save orderIDs for every 5 orders when more than 10 orders are submitted.
if reqparams["page_num"] >= 10:
with open(order_fn, "w") as fid:
json.dump({"orderIDs": self.orderIDs}, fid)

# --- Output the final orderIDs
with open(order_fn, "w") as fid:
json.dump({"orderIDs": self.orderIDs}, fid)
# Requesting the status too often can result in a 500 error.
time.sleep(5)
status = HARMONY_API.check_order_status(job_id)

if status["status"] == "complete_with_errors" or status["status"] == "failed":
print("Your order is: ", status["status"])
print("Harmony provided these error messages:")
pprint.pprint(status["errors"])

# TODO: consider always printing the status message. There's no need for
# this check, and the message is relevant regardless of if there are
# errors or not. We could check for a failure status instead.
if status["status"] == "complete" or status["status"] == "complete_with_errors":
print("Your order is:", status["status"])
print("Harmony returned this message:")
pprint.pprint(status["message"])
else:
print(f"Request failed with status {status['status']}.")

return self.orderIDs

Expand Down
Loading

0 comments on commit d80a4ce

Please sign in to comment.