-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
a941571
commit 2f85e8c
Showing
8 changed files
with
232 additions
and
50 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
from typing import Iterable | ||
|
||
import httpx | ||
from pystac import Collection, Item | ||
|
||
DEFAULT_HEADERS = {"Content-Type": "application/json"} | ||
|
||
|
||
def load_stac_api_items( | ||
url: str, | ||
items: Iterable[Item], | ||
headers: dict[str, str] | None = None, | ||
verify: bool = True, | ||
update: bool = False, | ||
skip_existing: bool = False, | ||
) -> Iterable[Item]: | ||
"""Load multiple items into a STAC API | ||
Args: | ||
url (str): STAC API url | ||
items (Iterable[Item]): A collection of STAC Items | ||
headers (dict[str, str] | None, optional): Headers to add to the request. Defaults to None. | ||
verify (bool, optional): Verify SSL request. Defaults to True. | ||
update (bool, optional): Update STAC Item with new content. Defaults to False. | ||
skip_existing (bool, optional): Skip Item if exists. Defaults to False. | ||
""" | ||
if not headers: | ||
headers = DEFAULT_HEADERS | ||
|
||
for item in items: | ||
collection_id = item.collection_id | ||
items_endpoint = f"{url}/collections/{collection_id}/items" | ||
response = httpx.post( | ||
items_endpoint, | ||
json=item.to_dict(), | ||
headers=headers, | ||
verify=verify, | ||
) | ||
if response.status_code == 409: | ||
if update: | ||
item_endpoint = f"{items_endpoint}/{item.id}" | ||
response = httpx.put( | ||
item_endpoint, json=item.to_dict(), headers=headers, verify=verify | ||
) | ||
if skip_existing: | ||
continue | ||
response.raise_for_status() | ||
yield item | ||
|
||
|
||
def load_stac_api_collections( | ||
url: str, | ||
collections: Iterable[Collection], | ||
headers: dict[str, str] | None = None, | ||
verify: bool = True, | ||
update: bool = False, | ||
skip_existing: bool = False, | ||
) -> Iterable[Collection]: | ||
"""Load multiple collections to a stac API | ||
Args: | ||
url (str): STAC API URL | ||
collections (Iterable[Collection]): A collection of STAC Collections | ||
headers (dict[str, str] | None, optional): Additional headers to send. Defaults to None. | ||
verify (bool, optional): Verify TLS request. Defaults to True. | ||
update (bool, optional): Update the destination Collections. Defaults to False. | ||
skip_existing (bool, optional): Skip existing Collections. Defaults to False. | ||
Returns: | ||
Iterable[Collection]: | ||
""" | ||
|
||
if not headers: | ||
headers = DEFAULT_HEADERS | ||
|
||
collections_endpoint = f"{url}/collections" | ||
for collection in collections: | ||
response = httpx.post( | ||
collections_endpoint, | ||
json=collection.to_dict(), | ||
headers=headers, | ||
verify=verify, | ||
) | ||
if response.status_code == 409: | ||
if update: | ||
collection_endpoint = f"{collections_endpoint}/{collection.id}" | ||
response = httpx.put( | ||
collection_endpoint, | ||
json=collection.to_dict(), | ||
headers=headers, | ||
verify=verify, | ||
) | ||
if skip_existing: | ||
continue | ||
|
||
response.raise_for_status() | ||
yield collection |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,15 +1,60 @@ | ||
import json | ||
from pathlib import Path | ||
|
||
import pytest | ||
import respx | ||
from httpx import Response | ||
from pytest_mock import MockerFixture | ||
from test_data import STAC_COLLECTIONS, STAC_ITEMS | ||
|
||
|
||
@pytest.fixture() | ||
def mock_stac_collections(mocker: MockerFixture): | ||
def mock_extract_stac_collections(mocker: MockerFixture): | ||
client_mock = mocker.patch("eodm.extract.pystac_client.Client") | ||
client_mock.open().get_collections().__iter__.return_value = STAC_COLLECTIONS | ||
|
||
|
||
@pytest.fixture() | ||
def mock_stac_items(mocker: MockerFixture): | ||
def mock_extract_stac_items(mocker: MockerFixture): | ||
client_mock = mocker.patch("eodm.extract.pystac_client.Client") | ||
client_mock.open().search().item_collection().__iter__.return_value = STAC_ITEMS | ||
|
||
|
||
@pytest.fixture() | ||
def stac_collections(tmp_path: Path): | ||
collection_json_path = tmp_path / "collections" | ||
|
||
with collection_json_path.open("a", encoding="utf-8", newline="\n") as f: | ||
for collection in STAC_COLLECTIONS: | ||
f.write(json.dumps(collection.to_dict())) | ||
f.write("\n") | ||
|
||
return str(collection_json_path) | ||
|
||
|
||
@pytest.fixture() | ||
def stac_items(tmp_path): | ||
item_json_path = tmp_path / "items" | ||
|
||
with item_json_path.open("a", encoding="utf-8", newline="\n") as f: | ||
for item in STAC_ITEMS: | ||
f.write(json.dumps(item.to_dict())) | ||
f.write("\n") | ||
|
||
return str(item_json_path) | ||
|
||
|
||
@pytest.fixture() | ||
def mock_post_stac_api_items_endpoint(respx_mock: respx.MockRouter): | ||
post_mock = respx_mock.post( | ||
"https://example.com/stac-api/collections/sentinel-2-l2a/items" | ||
).mock(return_value=Response(204)) | ||
return post_mock | ||
|
||
|
||
@pytest.fixture() | ||
def mock_post_stac_api_collections_endpoint(respx_mock: respx.MockRouter): | ||
post_mock = respx_mock.post("https://example.com/stac-api/collections").mock( | ||
return_value=Response(204) | ||
) | ||
return post_mock |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters