Merge pull request #191 from gisaia/feat/stacsync2
Feat/stacsync2
sylvaingaudan authored Nov 8, 2024
2 parents a474d23 + 677e060 commit 749d2f0
Showing 11 changed files with 276 additions and 23 deletions.
16 changes: 16 additions & 0 deletions Dockerfile-stac-geodes
@@ -0,0 +1,16 @@
FROM python:3.10.13-alpine3.19
RUN apk update && apk add --upgrade libexpat
RUN apk --no-cache add curl
COPY test/requirements.txt /app/requirements.txt

WORKDIR /app

RUN pip3 install -r requirements.txt

COPY stac /app/stac
COPY ./airs /app/airs
COPY ./common /app/common

ENV PYTHONPATH=/app

ENTRYPOINT ["python3.10", "-m", "stac.geodes_sync"]
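For local testing, a minimal build sketch (assuming the repository root as build context; the tag mirrors the image name used in the README below):

```shell
docker build -f Dockerfile-stac-geodes -t gisaia/stac-geodes:latest .
```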
28 changes: 28 additions & 0 deletions README.md
@@ -7,6 +7,7 @@ Functions for ingestion:
- Asynchronously register one archive (`/processes/ingest`) or a directory containing archives (`/processes/directory_ingest`) : ARLAS Processing (APROC)
- List files and archives from a directory: File and Archive Management (FAM)

Note: some STAC synchronisation scripts are provided. See [STAC Synchronisation](#stac-synchronisation).

Functions for download:
- Asynchronously download one or several archives (`/processes/download`) : ARLAS Processing (APROC)
@@ -581,3 +582,30 @@ To run the tests (this will also start the stack):
```shell
./test/tests.sh
```


# STAC Synchronisation

The following synchronisations are available:
- [GEODES](https://geodes.cnes.fr/)


## GEODES

To ingest products from the GEODES catalogue into AIRS, the process needs to access the AIRS service. The simplest way is to run the Docker container on the same network as AIRS. Below is an example:

```shell
docker run --rm \
-v `pwd`:/app/ \
--network arlas-net gisaia/stac-geodes:latest \
add https://geodes-portal.cnes.fr/api/stac/items http://airs-server:8000/airs geodes S2L1C \
--data-type PEPS_S2_L1C \
--data-type MUSCATE_SENTINEL2_SENTINEL2_L2A \
--data-type MUSCATE_Snow_SENTINEL2_L2B-SNOW \
--data-type MUSCATE_WaterQual_SENTINEL2_L2B-WATER \
--data-type MUSCATE_SENTINEL2_SENTINEL2_L3A \
--product-level L1C \
--max 1000
```

To get some help, simply run `docker run --rm --network arlas-net gisaia/stac-geodes:latest add https://geodes-portal.cnes.fr/api/stac/items --help`.
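The image also exposes `show` and `count` subcommands (defined in `stac/geodes_sync.py` below). For instance, a sketch that counts matching products before ingesting them, reusing the hypothetical network and filters from the example above:

```shell
docker run --rm --network arlas-net gisaia/stac-geodes:latest \
  count https://geodes-portal.cnes.fr/api/stac/items \
  --data-type PEPS_S2_L1C \
  --product-level L1C
```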
4 changes: 3 additions & 1 deletion airs/core/exceptions.py
@@ -16,8 +16,10 @@ def __init__(self, items:list[str],reason:str) -> None:
 
 class InternalError(Exception):
     component:str
+    msg: str
     reason:str
 
-    def __init__(self, component:str,reason:str) -> None:
+    def __init__(self, component:str, msg: str, reason:str) -> None:
         self.component=component
         self.reason=reason
+        self.msg = msg
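For reference, a minimal sketch of a caller using the widened constructor (the `upload` helper below is hypothetical; the call pattern mirrors `airs/core/product_registration.py` in this PR):

```python
from airs.core import exceptions


def upload(key: str) -> None:
    try:
        raise IOError("connection reset")  # stand-in for the real S3 call
    except Exception as e:
        msg = "Failed to upload {}".format(key)
        # component, human-readable message, underlying reason
        raise exceptions.InternalError("storage", msg, e)
```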
3 changes: 3 additions & 0 deletions airs/core/models/model.py
@@ -230,8 +230,10 @@ class Properties(BaseModel, extra=Extra.allow):
     programme: str | None = Field(default=None, title="Name of the programme")
     constellation: str | None = Field(default=None, title="Name of the constellation")
     satellite: str | None = Field(default=None, title="Name of the satellite")
+    platform: str | None = Field(default=None, title="Name of the satellite platform")
     instrument: str | None = Field(default=None, title="Name of the instrument")
     sensor: str | None = Field(default=None, title="Name of the sensor")
+    sensor_mode: str | None = Field(default=None, title="Mode of the sensor during acquisition")
     sensor_type: str | None = Field(default=None, title="Type of sensor")
     annotations: str | None = Field(default=None, title="Human annotations for the item")
     gsd: float | None = Field(default=None, title="Ground Sampling Distance (resolution)")
@@ -274,6 +276,7 @@ class Properties(BaseModel, extra=Extra.allow):
     acq__acquisition_mode: str | None = Field(default=None, title="The name of the acquisition mode.")
     acq__acquisition_orbit_direction: str | None = Field(default=None, title="Acquisition orbit direction (ASCENDING or DESCENDING).")
     acq__acquisition_type: str | None = Field(default=None, title="Acquisition type (STRIP)")
+    acq__acquisition_orbit: float | None = Field(default=None, title="Acquisition orbit")
     acq__across_track: float | None = Field(default=None, title="Across track angle")
     acq__along_track: float | None = Field(default=None, title="Along track angle")
     acq__archiving_date: Datetime | None = Field(default=None, title="Archiving date")
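A minimal sketch of the new optional fields (the values are hypothetical); since they all default to `None`, existing payloads keep validating:

```python
from airs.core.models.model import Properties

props = Properties(
    platform="SENTINEL2A",          # fed from spaceborne:satellitePlatform
    sensor_mode="INS-NOBS",         # fed from spaceborne:sensorMode
    acq__acquisition_orbit=1234.0,  # fed from spaceborne:orbitID
)
print(props.platform, props.sensor_mode, props.acq__acquisition_orbit)
```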
27 changes: 16 additions & 11 deletions airs/core/product_registration.py
@@ -129,19 +129,21 @@ def __upload_file(key, file, content_type)->str:
         LOGGER.info("{} uploaded.".format(key))
         return key
     except Exception as e:
-        LOGGER.error("Failed to upload {}".format(key))
+        msg = "Failed to upload {}".format(key)
+        LOGGER.error(msg)
         LOGGER.exception(e)
-        raise exceptions.InternalError("storage", e)
+        raise exceptions.InternalError("storage", msg, e)
 
 def __delete_file(key):
     try:
         LOGGER.info("deleting {} ...".format(key))
         s3.get_client().delete_object(Bucket=Configuration.settings.s3.bucket, Key=key)
         LOGGER.info("{} deleted.".format(key))
     except Exception as e:
-        LOGGER.error("Failed to delete {} ".format(key))
+        msg = "Failed to delete {} ".format(key)
+        LOGGER.error(msg)
         LOGGER.exception(e)
-        raise exceptions.InternalError("storage", e)
+        raise exceptions.InternalError("storage", msg, e)
 
 def __delete_prefix(prefix:str):
     try:
@@ -153,11 +155,12 @@ def __delete_prefix(prefix:str):
                 LOGGER.info("{} deleted.".format(object["Key"]))
         LOGGER.info("{} deleted.".format(prefix))
     except Exception as e:
-        LOGGER.error("Failed to delete {} ".format(prefix))
+        msg = "Failed to delete {} ".format(prefix)
+        LOGGER.error(msg)
         LOGGER.exception(e)
-        raise exceptions.InternalError("storage", e)
+        raise exceptions.InternalError("storage", msg, e)
 
-def __upload_item(key, item:Item)->str:
+def __upload_item(key, item: Item) -> str:
     try:
         LOGGER.info("uploading {} ...".format(key))
         s3.get_client().put_object(
@@ -169,9 +172,10 @@
         LOGGER.info("{} uploaded.".format(key))
         return key
     except Exception as e:
-        LOGGER.error("Failed to upload {} on bucket {}".format(key, Configuration.settings.s3.bucket))
+        msg = "Failed to upload {} on {}".format(key, Configuration.settings.s3.bucket)
+        LOGGER.error(msg)
         LOGGER.exception(e)
-        raise exceptions.InternalError("storage", e)
+        raise exceptions.InternalError("storage", msg, e)
 
 def asset_exists(collection:str, item_id:str, asset_name:str)->bool:
     """ check whether the asset exists or not on the configured S3
@@ -210,8 +214,9 @@ def __fetch_mapping__():
         if r.ok:
             return r.json()["mappings"]
         else:
-            LOGGER.error("Can not fetch the mapping for creating the ARLAS index. Aborting ...")
-            raise exceptions.InternalError("elasticsearch", r.reason)
+            msg = "Can not fetch the mapping for creating the ARLAS index. Aborting ..."
+            LOGGER.error(msg)
+            raise exceptions.InternalError("elasticsearch", msg, r.reason)
     else:
         with open(Configuration.settings.arlaseo_mapping_url) as f:
             return json.load(f)["mappings"]
17 changes: 10 additions & 7 deletions common/exception_handler.py
@@ -20,13 +20,16 @@ def validation_exception_handler(req: Request, exc: RequestValidationError):
     # Format the detail of the error message
     detail = ""
     for error in exc.errors():
-        loc = error["loc"][1]
-        for i in range(2, len(error["loc"])):
-            if isinstance(error["loc"][i], str):
-                loc += f'.{error["loc"][i]}'
-            elif isinstance(error["loc"][i], int):
-                loc += f'[{str(error["loc"][i])}]'
-        detail += f'{loc}: {error["msg"]}\n'
+        if len(error["loc"]) > 1:
+            loc = error["loc"][1]
+            for i in range(2, len(error["loc"])):
+                if isinstance(error["loc"][i], str):
+                    loc += f'.{error["loc"][i]}'
+                elif isinstance(error["loc"][i], int):
+                    loc += f'[{str(error["loc"][i])}]'
+            detail += f'{loc}: {error["msg"]}\n'
+        elif len(error["loc"]) > 0:
+            detail += f'{error["loc"][0]}: {error["msg"]}\n'
     detail = detail[:-1]
 
     return JSONResponse(content=RESTException(
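A standalone sketch of the location-flattening logic (the sample errors are hypothetical), showing the new guard for `loc` tuples with fewer than two elements:

```python
def flatten(errors: list[dict]) -> str:
    detail = ""
    for error in errors:
        if len(error["loc"]) > 1:
            loc = error["loc"][1]
            for part in error["loc"][2:]:
                loc += f".{part}" if isinstance(part, str) else f"[{part}]"
            detail += f'{loc}: {error["msg"]}\n'
        elif len(error["loc"]) > 0:
            detail += f'{error["loc"][0]}: {error["msg"]}\n'
    return detail[:-1]


# ("body",) used to hit an IndexError; it now yields "body: field required".
print(flatten([
    {"loc": ("body", "properties", "gsd"), "msg": "value is not a valid float"},
    {"loc": ("body",), "msg": "field required"},
]))
```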
4 changes: 2 additions & 2 deletions conf/airs.yaml
@@ -19,5 +19,5 @@ s3:
   # if you don't use aws s3, then please prov
   endpoint_url: $AIRS_S3_ENDPOINT_URL
 
-arlaseo_mapping_url: $AIRS_MAPPING_URL|"https://raw.githubusercontent.com/gisaia/ARLAS-EO/v0.0.6/mapping.json"
-arlaseo_collection_url: $AIRS_COLLECTION_URL|"https://raw.githubusercontent.com/gisaia/ARLAS-EO/v0.0.6/collection.json"
+arlaseo_mapping_url: $AIRS_MAPPING_URL|"https://raw.githubusercontent.com/gisaia/ARLAS-EO/v0.0.7/mapping.json"
+arlaseo_collection_url: $AIRS_COLLECTION_URL|"https://raw.githubusercontent.com/gisaia/ARLAS-EO/v0.0.7/collection.json"
4 changes: 2 additions & 2 deletions conf/download_drivers.yaml
@@ -1,6 +1,6 @@
 arlas_url_search: $ARLAS_URL_SEARCH
-arlaseo_mapping_url: $ARLASEO_MAPPING_URL|"https://raw.githubusercontent.com/gisaia/ARLAS-EO/v0.0.6/mapping.json"
-download_mapping_url: $DOWNLOAD_MAPPING_URL|"https://raw.githubusercontent.com/gisaia/ARLAS-EO/v0.0.6/downloads_mapping.json"
+arlaseo_mapping_url: $ARLASEO_MAPPING_URL|"https://raw.githubusercontent.com/gisaia/ARLAS-EO/v0.0.7/mapping.json"
+download_mapping_url: $DOWNLOAD_MAPPING_URL|"https://raw.githubusercontent.com/gisaia/ARLAS-EO/v0.0.7/downloads_mapping.json"
 
 
 clean_outbox_directory: $CLEAN_DOWNLOAD_OUTBOX_DIR|True
Expand Down
4 changes: 4 additions & 0 deletions release/release.sh
@@ -41,6 +41,10 @@ build_and_publish_docker agate
 # DOCKER
 build_and_publish_docker airs
 
+#--------------- GEODES ----------------
+# DOCKER
+build_and_publish_docker stac-geodes
+
 
 # PYTHON PIP
 export PYTHONPATH=`pwd`:`pwd`/extensions:`pwd`/test
191 changes: 191 additions & 0 deletions stac/geodes_sync.py
@@ -0,0 +1,191 @@
import json
import sys

import requests
import typer

from airs.core.models.mapper import item_from_dict, to_json
from airs.core.models.model import Asset, AssetFormat, Item, Role

requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning)
app = typer.Typer(add_completion=False, no_args_is_help=True)


def to_item(feature, extra_params={}) -> Item:
    feature["centroid"] = [
        feature.get("centroid").get("lon"),
        feature.get("centroid").get("lat"),
    ]
    # CLEANING
    feature.pop("stac_version")
    feature.pop("stac_extensions")
    feature.pop("links")

    # TO ITEM
    item = item_from_dict(feature)
    item.collection = extra_params.get("collection")
    item.catalog = extra_params.get("catalog")
    item.properties.programme = "Copernicus"
    item.properties.constellation = "Sentinel-2"
    item.properties.processing__level = feature.get("properties").get("spaceborne:productLevel")
    item.properties.sensor_mode = feature.get("properties").get("spaceborne:sensorMode")
    item.properties.data_type = feature.get("properties").get("dataType")
    item.properties.item_type = feature.get("properties").get("spaceborne:productType")
    item.properties.eo__cloud_cover = feature.get("properties").get("spaceborne:cloudCover")
    item.properties.platform = feature.get("properties").get("spaceborne:satellitePlatform")
    item.properties.sensor = feature.get("properties").get("spaceborne:satelliteSensor")
    item.properties.acq__acquisition_orbit = feature.get("properties").get("spaceborne:orbitID")
    item.properties.acq__acquisition_orbit_direction = feature.get("properties").get("spaceborne:orbitDirection")

    # none of the GEODES assets are stored (managed) by AIRS
    for name, asset in item.assets.items():
        asset: Asset = asset
        asset.airs__managed = False

    # ASSETS: rename assets after their roles; xml "data" assets become metadata
    assets = {}
    md_count = 1
    for name, asset in item.assets.items():
        name = None
        if asset.roles and len(asset.roles) > 0:
            name = "-".join(asset.roles)
        if name == Role.data.value:
            if asset.type == "application/xml":
                name = Role.metadata.value + "-" + str(md_count)
                md_count = md_count + 1
                asset.asset_format = AssetFormat.xml.value
                asset.roles = [Role.metadata.value]
            if asset.type == "application/zip":
                name = Role.data.value
                asset.asset_format = AssetFormat.zip.value
                asset.roles = [Role.data.value]
        if name:
            asset.name = name
            assets[name] = asset
    item.assets = assets

    # LOCATIONS: flatten continent/country/region names
    item.properties.locations = []
    for continent in item.properties.model_extra.get("spaceborne__political", {}).get("continents", []):
        item.properties.locations.append(continent.get("name"))
        for country in continent.get("countries", []):
            item.properties.locations.append(country.get("name"))
            for region in country.get("regions", []):
                item.properties.locations.append(region.get("name"))
    if item.properties.model_extra.get("spaceborne__political"):
        item.properties.model_extra.pop("spaceborne__political")
    if item.properties.model_extra.get("spaceborne__references"):
        item.properties.model_extra.pop("spaceborne__references")
    item.properties.model_extra.clear()
    return item


def search(stac_url: str, start_date: int, end_date: int, data_type: list[str], product_level: list[str], bbox: list[float], max_hits: int, just_count: bool = False, process_function=None, extra_params={}):
    data = {
        "sortBy": [{"direction": "desc", "field": "temporal:startDate"}],
        "limit": 50,
        "page": 1,
        "query": {}
    }
    if data_type:
        data["query"]["dataType"] = {"in": data_type}
    if product_level:
        data["query"]["spaceborne:productLevel"] = {"in": product_level}
    if start_date:
        data["query"]["temporal:endDate"] = {"gte": start_date}
    if end_date:
        data["query"]["temporal:startDate"] = {"lte": end_date}
    if bbox:
        data["bbox"] = bbox
    headers = {"content-type": "application/json"}
    page = 1
    count = 0
    to_do = max_hits
    while page:
        data["page"] = page
        r = requests.post(url=stac_url, headers=headers, data=json.dumps(data), verify=False)
        if r.ok:
            doc = r.json()
            to_do = min(doc.get("context", {}).get("matched", 0), max_hits)
            if just_count:
                print("{} items found.".format(doc.get("context", {}).get("matched", 0)))
                return
            for feature in doc.get("features", []):
                item = to_item(feature, extra_params)
                if process_function:
                    process_function(item)
                count = count + 1
                if count >= max_hits:
                    return
            # paginate as long as the response advertises a "next" link
            pages = list(filter(lambda link: link.get("rel") == "next", doc.get("links", [])))
            if len(pages) > 0:
                page = page + 1
            else:
                page = None
            print("{} of {}".format(count, to_do))
        else:
            page = None
            print("Failed to fetch items from {}: {} ({})".format(stac_url, r.status_code, r.content), file=sys.stderr)


def add_to_airs(airs_url: str, collection: str, item: Item):
    r = requests.post(url="/".join([airs_url, "collections", collection, "items"]), data=to_json(item), headers={"Content-Type": "application/json"}, verify=False)
    if r.status_code >= 200 and r.status_code < 300:
        print("{} added".format(item.id))
    else:
        print("ERROR: Failed to add {}: {} ({})".format(item.id, r.status_code, r.content), file=sys.stderr)


@app.command(help="Add STAC features to ARLAS AIRS")
def add(
stac_url: str = typer.Argument(help="STAC URL (e.g. https://geodes-portal.cnes.fr/api/stac/)"),
airs_url: str = typer.Argument(help="AIRS URL (e.g. https://localhost/airs/)"),

collection: str = typer.Argument(help="Name of the ARLAS Collection)"),
catalog: str = typer.Argument(help="Name of the catalog within the collection)"),
data_type: list[str] = typer.Option(help="Data type ()", default=None),
product_level: list[str] = typer.Option(help="Product levels", default=None),
start_date: str = typer.Option(help="Start date for the STAC search", default=None),
end_date: str = typer.Option(help="End date for the STAC search", default=None),
bbox: list[float] = typer.Option(help="BBOX (lon_min lat_min lon max lat_max)", default=None),
max: int = typer.Option(help="Max number of feature to process", default=1000)
):
try:
search(stac_url, start_date, end_date, data_type, product_level, bbox, max, process_function=lambda i: add_to_airs(airs_url=airs_url, collection=collection, item=i), extra_params={"collection": collection, "catalog": catalog})
except Exception as e:
print("ERROR: Failled to add items: {}".format(e), file=sys.stderr)


@app.command(help="Show STAC features")
def show(
stac_url: str = typer.Argument(help="STAC URL (e.g. https://geodes-portal.cnes.fr/api/stac/items)"),
data_type: list[str] = typer.Option(help="Data type ()", default=None),
product_level: list[str] = typer.Option(help="Product levels", default=None),
start_date: str = typer.Option(help="Start date for the STAC search", default=None),
end_date: str = typer.Option(help="End date for the STAC search", default=None),
bbox: list[float] = typer.Option(help="BBOX (lon_min lat_min lon max lat_max)", default=None),
max: int = typer.Option(help="Max number of feature to process", default=1000)
):
search(stac_url, start_date, end_date, data_type, product_level, bbox, max, process_function=lambda i: print(to_json(i)))


@app.command(help="Count STAC features")
def count(
stac_url: str = typer.Argument(help="STAC URL (e.g. https://geodes-portal.cnes.fr/api/stac/items)"),
data_type: list[str] = typer.Option(help="Data type ()", default=None),
product_level: list[str] = typer.Option(help="Product levels", default=None),
start_date: str = typer.Option(help="Start date for the STAC search", default=None),
end_date: str = typer.Option(help="End date for the STAC search", default=None),
bbox: list[float] = typer.Option(help="BBOX (lon_min lat_min lon max lat_max)", default=None),
max: int = typer.Option(help="Max number of feature to process", default=1000)
):
search(stac_url, start_date, end_date, data_type, product_level, bbox, max, just_count=True)


def main():
    app()


if __name__ == "__main__":
    main()
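Outside Docker, the same CLI can be exercised directly, assuming the repository root is on `PYTHONPATH` as the Dockerfile sets it:

```shell
PYTHONPATH=. python3 -m stac.geodes_sync count https://geodes-portal.cnes.fr/api/stac/items --data-type PEPS_S2_L1C
```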
