diff --git a/.github/workflows/build_on_pull_request.yaml b/.github/workflows/build_on_pull_request.yaml index f32be5a..92f1dca 100644 --- a/.github/workflows/build_on_pull_request.yaml +++ b/.github/workflows/build_on_pull_request.yaml @@ -20,9 +20,11 @@ jobs: build --user - name: Build a binary wheel and a source for drivers - run: python3 -m build ./drivers + run: python3 -m build ./drivers - name: Set Docker image tag name run: echo "TAG=$(date +'%Y.%m.%d.%H.%M')" >> $GITHUB_ENV + - name: TAG ECHO + run: echo ${{ env.TAG }} - name: Login to Scaleway Container Registry uses: docker/login-action@v2 with: @@ -30,48 +32,56 @@ jobs: password: ${{ secrets.DOCKER_PASSWORD }} registry: ${{ vars.DOCKER_REGISTRY }} - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v2 + uses: docker/setup-buildx-action@v2 - name: Build and push drivers - uses: docker/build-push-action@v4 + uses: docker/build-push-action@v5 with: context: ./drivers file: ./drivers/Dockerfile push: true + cache-from: type=gha + cache-to: type=gha,mode=max build-args: | REGISTRY=${{ vars.GEOKUBE_REGISTRY }} tags: | ${{ vars.DOCKER_REGISTRY }}/geolake-drivers:${{ env.TAG }} - ${{ vars.DOCKER_REGISTRY }}/geolake-drivers:latest + ${{ vars.DOCKER_REGISTRY }}/geolake-drivers:latest - name: Build and push datastore component - uses: docker/build-push-action@v4 + uses: docker/build-push-action@v5 with: context: ./datastore file: ./datastore/Dockerfile push: true build-args: | REGISTRY=${{ vars.DOCKER_REGISTRY }} + cache-from: type=gha + cache-to: type=gha,mode=max tags: | ${{ vars.DOCKER_REGISTRY }}/geolake-datastore:${{ env.TAG }} - ${{ vars.DOCKER_REGISTRY }}/geolake-datastore:latest + ${{ vars.DOCKER_REGISTRY }}/geolake-datastore:latest - name: Build and push api component - uses: docker/build-push-action@v4 + uses: docker/build-push-action@v5 with: context: ./api file: ./api/Dockerfile push: true build-args: | REGISTRY=${{ vars.DOCKER_REGISTRY }} + cache-from: type=gha + cache-to: type=gha,mode=max tags: | ${{ vars.DOCKER_REGISTRY }}/geolake-api:${{ env.TAG }} - ${{ vars.DOCKER_REGISTRY }}/geolake-api:latest + ${{ vars.DOCKER_REGISTRY }}/geolake-api:latest - name: Build and push executor component - uses: docker/build-push-action@v4 + uses: docker/build-push-action@v5 with: context: ./executor file: ./executor/Dockerfile push: true build-args: | REGISTRY=${{ vars.DOCKER_REGISTRY }} + cache-from: type=gha + cache-to: type=gha,mode=max tags: | ${{ vars.DOCKER_REGISTRY }}/geolake-executor:${{ env.TAG }} - ${{ vars.DOCKER_REGISTRY }}/geolake-executor:latest \ No newline at end of file + ${{ vars.DOCKER_REGISTRY }}/geolake-executor:latest \ No newline at end of file diff --git a/.github/workflows/build_on_release.yaml b/.github/workflows/build_on_release.yaml index 4a8da66..4c7d8be 100644 --- a/.github/workflows/build_on_release.yaml +++ b/.github/workflows/build_on_release.yaml @@ -32,17 +32,20 @@ jobs: - name: Set up Docker Buildx uses: docker/setup-buildx-action@v2 - name: Build and push drivers - uses: docker/build-push-action@v4 + uses: docker/build-push-action@v5 with: context: ./drivers file: ./drivers/Dockerfile push: true build-args: | REGISTRY=${{ vars.GEOKUBE_REGISTRY }} + TAG=v0.2a6 + cache-from: type=gha + cache-to: type=gha,mode=max tags: | ${{ vars.GEOLAKE_REGISTRY }}/geolake-drivers:${{ env.RELEASE_TAG }} - name: Build and push datastore component - uses: docker/build-push-action@v4 + uses: docker/build-push-action@v5 with: context: ./datastore file: ./datastore/Dockerfile @@ -50,10 +53,12 @@ jobs: build-args: 
| REGISTRY=${{ vars.GEOLAKE_REGISTRY }} TAG=${{ env.RELEASE_TAG }} + cache-from: type=gha + cache-to: type=gha,mode=max tags: | ${{ vars.GEOLAKE_REGISTRY }}/geolake-datastore:${{ env.RELEASE_TAG }} - name: Build and push api component - uses: docker/build-push-action@v4 + uses: docker/build-push-action@v5 with: context: ./api file: ./api/Dockerfile @@ -61,10 +66,12 @@ jobs: build-args: | REGISTRY=${{ vars.GEOLAKE_REGISTRY }} TAG=${{ env.RELEASE_TAG }} + cache-from: type=gha + cache-to: type=gha,mode=max tags: | ${{ vars.GEOLAKE_REGISTRY }}/geolake-api:${{ env.RELEASE_TAG }} - name: Build and push executor component - uses: docker/build-push-action@v4 + uses: docker/build-push-action@v5 with: context: ./executor file: ./executor/Dockerfile @@ -72,5 +79,7 @@ jobs: build-args: | REGISTRY=${{ vars.GEOLAKE_REGISTRY }} TAG=${{ env.RELEASE_TAG }} + cache-from: type=gha + cache-to: type=gha,mode=max tags: | ${{ vars.GEOLAKE_REGISTRY }}/geolake-executor:${{ env.RELEASE_TAG }} \ No newline at end of file diff --git a/.gitignore b/.gitignore index 44f8be3..edf0d98 100644 --- a/.gitignore +++ b/.gitignore @@ -112,5 +112,7 @@ venv.bak/ _catalogs/ _old/ -# Netcdf files -*.nc +.DS_Store +db_init +*.zarr +*.nc \ No newline at end of file diff --git a/CITATION.cff b/CITATION.cff new file mode 100644 index 0000000..02b3311 --- /dev/null +++ b/CITATION.cff @@ -0,0 +1,51 @@ +# This CITATION.cff file was generated with cffinit. +# Visit https://bit.ly/cffinit to generate yours today! + +cff-version: 1.2.0 +title: geolake +message: >- + If you use this software, please cite it using the + metadata from this file. +type: software +authors: + - given-names: Marco + family-names: Mancini + orcid: 'https://orcid.org/0000-0002-9150-943X' + - given-names: Jakub + family-names: Walczak + orcid: 'https://orcid.org/0000-0002-5632-9484' + - given-names: Mirko + family-names: Stojiljković + - given-names: Valentina + family-names: Scardigno + orcid: 'https://orcid.org/0000-0002-0123-5368' +identifiers: + - type: doi + value: 10.5281/zenodo.10598417 +repository-code: 'https://github.com/CMCC-Foundation/geolake' +abstract: >+ + geolake is an open source framework for management, + storage, and analytics of Earth Science data. geolake + implements the concept of a data lake as a central + location that holds a large amount of data in its native + and raw format. geolake does not impose any schema when + ingesting the data, however it provides a unified Data + Model and API for geoscientific datasets. The data is kept + in the original format and storage, and the in-memory data + structure is built on-the-fly for the processing analysis. + + The system has been designed using a cloud-native + architecture, based on containerized microservices, that + facilitates the development, deployment and maintenance of + the system itself. It has been implemented by integrating + different open source frameworks, tools and libraries and + can be easily deployed using the Kubernetes platform and + related tools such as kubectl. 
+ +keywords: + - python framework + - earth science + - data analytics +license: Apache-2.0 +version: 0.1.0 +date-released: '2024-01-29' diff --git a/README.md b/README.md index e1490aa..4d775a5 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,5 @@ +[![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.10598417.svg)](https://doi.org/10.5281/zenodo.10598417) + # geolake ## Description diff --git a/api/Dockerfile b/api/Dockerfile index c038cb3..ad73842 100644 --- a/api/Dockerfile +++ b/api/Dockerfile @@ -1,9 +1,19 @@ ARG REGISTRY=rg.fr-par.scw.cloud/geolake ARG TAG=latest FROM $REGISTRY/geolake-datastore:$TAG + +RUN apt update && apt install -y cron curl + WORKDIR /app COPY requirements.txt /code/requirements.txt RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt COPY app /app EXPOSE 80 + +COPY ./healtcheck.* /opt/ + +RUN chmod +x /opt/healtcheck.sh +RUN crontab -u root /opt/healtcheck.cron + CMD ["uvicorn", "app.main:app", "--proxy-headers", "--host", "0.0.0.0", "--port", "80"] + diff --git a/api/app/main.py b/api/app/main.py index 79797bb..893a907 100644 --- a/api/app/main.py +++ b/api/app/main.py @@ -1,11 +1,13 @@ -"""Main module with dekube-dds API endpoints defined""" -__version__ = "2.0" +"""Main module with geolake API endpoints defined""" +__version__ = "0.1.0" import os -from typing import Optional - +import re +from typing import Optional, Dict from datetime import datetime -from fastapi import FastAPI, HTTPException, Request, status, Query +from oai_dcat.metadata_provider import BASE_URL +from oai_dcat.oai_utils import serialize_and_concatenate_graphs, convert_to_dcat_ap_it +from fastapi import FastAPI, HTTPException, Request, status, Query, Response from fastapi.middleware.cors import CORSMiddleware from starlette.middleware.authentication import AuthenticationMiddleware from starlette.authentication import requires @@ -18,7 +20,6 @@ ) from aioprometheus.asgi.starlette import metrics -from geoquery.geoquery import GeoQuery from geoquery.task import TaskList from geoquery.geoquery import GeoQuery @@ -34,20 +35,32 @@ from encoders import extend_json_encoders from const import venv, tags from auth import scopes +from oai_dcat import oai_server def map_to_geoquery( variables: list[str], format: str, bbox: str | None = None, # minx, miny, maxx, maxy (minlon, minlat, maxlon, maxlat) time: datetime | None = None, + filters: Optional[Dict] = None, **format_kwargs ) -> GeoQuery: - bbox_ = [float(x) for x in bbox.split(',')] - area = { 'west': bbox_[0], 'south': bbox_[1], 'east': bbox_[2], 'north': bbox_[3], } - time_ = { 'year': time.year, 'month': time.month, 'day': time.day, 'hour': time.hour} - query = GeoQuery(variable=variables, time=time_, area=area, - format_args=format_kwargs, format=format) + if bbox: + bbox_ = [float(x) for x in bbox.split(',')] + area = { 'west': bbox_[0], 'south': bbox_[1], 'east': bbox_[2], 'north': bbox_[3], } + else: + area = None + if time: + time_ = { 'year': time.year, 'month': time.month, 'day': time.day, 'hour': time.hour} + else: + time_ = None + if filters: + query = GeoQuery(variable=variables, time=time_, area=area, filters=filters, + format_args=format_kwargs, format=format) + else: + query = GeoQuery(variable=variables, time=time_, area=area, + format_args=format_kwargs, format=format) return query logger = get_dds_logger(__name__) @@ -56,12 +69,12 @@ def map_to_geoquery( extend_json_encoders() app = FastAPI( - title="geokube-dds API", - description="REST API for geokube-dds", + title="geolake API", + description="REST API for 
geolake", version=__version__, contact={ - "name": "geokube Contributors", - "email": "geokube@googlegroups.com", + "name": "geolake Contributors", + "email": "geolake@googlegroups.com", }, license_info={ "name": "Apache 2.0", @@ -108,9 +121,9 @@ def map_to_geoquery( # ======== Endpoints definitions ========= # @app.get("/", tags=[tags.BASIC]) -async def dds_info(): - """Return current version of the DDS API""" - return f"DDS API {__version__}" +async def geolake_info(): + """Return current version of the geolake API""" + return f"geolake API {__version__}" @app.get("/datasets", tags=[tags.DATASET]) @@ -185,13 +198,17 @@ async def get_map( # OGC WMS parameters width: int, height: int, + dpi: int | None = 100, layers: str | None = None, format: str | None = 'png', time: datetime | None = None, transparent: bool | None = 'true', bgcolor: str | None = 'FFFFFF', + cmap: str | None = 'RdBu_r', bbox: str | None = None, # minx, miny, maxx, maxy (minlon, minlat, maxlon, maxlat) - crs: str | None = None, + crs: str | None = None, + vmin: float | None = None, + vmax: float | None = None # OGC map parameters # subset: str | None = None, # subset_crs: str | None = Query(..., alias="subset-crs"), @@ -210,9 +227,89 @@ async def get_map( # vertical: Optional[Union[float, List[float], Dict[str, float]]] # filters: Optional[Dict] # format: Optional[str] - query = map_to_geoquery(variables=layers, bbox=bbox, time=time, - format="png", width=width, height=height, - transparent=transparent, bgcolor=bgcolor) + + query = map_to_geoquery(variables=layers, bbox=bbox, time=time, + format="png", width=width, height=height, + transparent=transparent, bgcolor=bgcolor, + dpi=dpi, cmap=cmap, projection=crs, + vmin=vmin, vmax=vmax) + try: + return dataset_handler.sync_query( + user_id=request.user.id, + dataset_id=dataset_id, + product_id=product_id, + query=query + ) + except exc.BaseDDSException as err: + raise err.wrap_around_http_exception() from err + +@app.get("/datasets/{dataset_id}/{product_id}/{filters:path}/map", tags=[tags.DATASET]) +@timer( + app.state.api_request_duration_seconds, + labels={"route": "GET /datasets/{dataset_id}/{product_id}"}, +) +async def get_map_with_filters( + request: Request, + dataset_id: str, + product_id: str, + filters: str, +# OGC WMS parameters + width: int, + height: int, + dpi: int | None = 100, + layers: str | None = None, + format: str | None = 'png', + time: datetime | None = None, + transparent: bool | None = 'true', + bgcolor: str | None = 'FFFFFF', + cmap: str | None = 'RdBu_r', + bbox: str | None = None, # minx, miny, maxx, maxy (minlon, minlat, maxlon, maxlat) + crs: str | None = None, + vmin: float | None = None, + vmax: float | None = None +# OGC map parameters + # subset: str | None = None, + # subset_crs: str | None = Query(..., alias="subset-crs"), + # bbox_crs: str | None = Query(..., alias="bbox-crs"), +): + filters_vals = filters.split("/") + + if dataset_id in ['rs-indices', 'pasture']: + filters_dict = {'pasture': filters_vals[0]} + + else: + try: + product_info = dataset_handler.get_product_details( + user_roles_names=request.auth.scopes, + dataset_id=dataset_id, + product_id=product_id, + ) + except exc.BaseDDSException as err: + raise err.wrap_around_http_exception() from err + + filters_keys = product_info['metadata']['filters'] + filters_dict = {} + for i in range(0, len(filters_vals)): + filters_dict[filters_keys[i]['name']] = filters_vals[i] + + app.state.api_http_requests_total.inc( + {"route": "GET /datasets/{dataset_id}/{product_id}/map"} + ) + # 
query should be the OGC query + # map OGC parameters to GeoQuery + # variable: Optional[Union[str, List[str]]] + # time: Optional[Union[Dict[str, str], Dict[str, List[str]]]] + # area: Optional[Dict[str, float]] + # location: Optional[Dict[str, Union[float, List[float]]]] + # vertical: Optional[Union[float, List[float], Dict[str, float]]] + # filters: Optional[Dict] + # format: Optional[str] + + query = map_to_geoquery(variables=layers, bbox=bbox, time=time, filters=filters_dict, + format="png", width=width, height=height, + transparent=transparent, bgcolor=bgcolor, + dpi=dpi, cmap=cmap, projection=crs, vmin=vmin, vmax=vmax) + try: return dataset_handler.sync_query( user_id=request.user.id, @@ -267,7 +364,73 @@ async def get_feature( ) except exc.BaseDDSException as err: raise err.wrap_around_http_exception() from err + +@app.get("/datasets/{dataset_id}/{product_id}/{filters:path}/items/{feature_id}", tags=[tags.DATASET]) +@timer( + app.state.api_request_duration_seconds, + labels={"route": "GET /datasets/{dataset_id}/{product_id}/items/{feature_id}"}, +) +async def get_feature_with_filters( + request: Request, + dataset_id: str, + product_id: str, + feature_id: str, + filters: str, +# OGC feature parameters + time: datetime | None = None, + bbox: str | None = None, # minx, miny, maxx, maxy (minlon, minlat, maxlon, maxlat) + crs: str | None = None, +# OGC map parameters + # subset: str | None = None, + # subset_crs: str | None = Query(..., alias="subset-crs"), + # bbox_crs: str | None = Query(..., alias="bbox-crs"), +): + filters_vals = filters.split("/") + + if dataset_id in ['rs-indices', 'pasture']: + filters_dict = {'pasture': filters_vals[0]} + + else: + try: + product_info = dataset_handler.get_product_details( + user_roles_names=request.auth.scopes, + dataset_id=dataset_id, + product_id=product_id, + ) + except exc.BaseDDSException as err: + raise err.wrap_around_http_exception() from err + + filters_keys = product_info['metadata']['filters'] + filters_dict = {} + for i in range(0, len(filters_vals)): + filters_dict[filters_keys[i]['name']] = filters_vals[i] + + app.state.api_http_requests_total.inc( + {"route": "GET /datasets/{dataset_id}/{product_id}/items/{feature_id}"} + ) + # query should be the OGC query + # feature OGC parameters to GeoQuery + # variable: Optional[Union[str, List[str]]] + # time: Optional[Union[Dict[str, str], Dict[str, List[str]]]] + # area: Optional[Dict[str, float]] + # location: Optional[Dict[str, Union[float, List[float]]]] + # vertical: Optional[Union[float, List[float], Dict[str, float]]] + # filters: Optional[Dict] + # format: Optional[str] + query = map_to_geoquery(variables=[feature_id], bbox=bbox, time=time, filters=filters_dict, + format="geojson") + try: + return dataset_handler.sync_query( + user_id=request.user.id, + dataset_id=dataset_id, + product_id=product_id, + query=query + ) + except exc.BaseDDSException as err: + raise err.wrap_around_http_exception() from err + + @app.get("/datasets/{dataset_id}/{product_id}/metadata", tags=[tags.DATASET]) @timer( app.state.api_request_duration_seconds, @@ -467,3 +630,53 @@ async def download_request_result( raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="File was not found!" 
) from err + +# Define OAI-PMH endpoint route +@app.get("/oai/{dataset_id}") +@app.post("/oai/{dataset_id}") +async def oai(request: Request, dataset_id: str): + params = dict(request.query_params) + + # Add dataset_id to the parameters as "set_", which is a parameter from the OAI-PMH protocol + params['set'] = dataset_id + # params['scopes'] = request.auth.scopes + + # Making sure it uses the dcat_ap metadata prefix + if 'metadataPrefix' not in params: + params['metadataPrefix'] = 'dcat_ap' + + # handleRequest points the request to the appropriate method in metadata_provider.py + response = oai_server.oai_server.handleRequest(params) + logger.debug(f"OAI-PMH Response: {response}") + # Replace date in datestamp by empty string + response = re.sub(b'<datestamp>.*</datestamp>', b'<datestamp></datestamp>', response) + return Response(content=response, media_type="text/xml") + +# Define an endpoint for getting all the datasets +@app.get("/oai") +@app.post("/oai") +async def oai_all_datasets(request: Request): + params = dict(request.query_params) + + # Making sure it uses the dcat_ap metadata prefix + if 'metadataPrefix' not in params: + params['metadataPrefix'] = 'dcat_ap' + + # handleRequest points the request to the appropriate method in metadata_provider.py + response = oai_server.oai_server.handleRequest(params) + logger.debug(f"OAI-PMH Response: {response}") + # Replace date in datestamp by empty string + response = re.sub(b'<datestamp>.*</datestamp>', b'<datestamp></datestamp>', response) + return Response(content=response, media_type="text/xml") + +# Endpoint for generating DCAT-AP IT catalog +@app.get("/dcatapit") +async def dcatapit(request: Request): + data = dataset_handler.get_datasets( + user_roles_names=request.auth.scopes + ) + catalog_graph, datasets_graph, distributions_graph, vcard_graph = convert_to_dcat_ap_it(data, BASE_URL) + # response = dcatapit_graph.serialize(format='pretty-xml') + response = serialize_and_concatenate_graphs(catalog_graph, datasets_graph, distributions_graph, vcard_graph) + + return Response(content=response, media_type="application/rdf+xml") \ No newline at end of file diff --git a/api/app/oai_dcat/__init__.py b/api/app/oai_dcat/__init__.py new file mode 100644 index 0000000..3654022 --- /dev/null +++ b/api/app/oai_dcat/__init__.py @@ -0,0 +1,3 @@ +# from . import metadata_provider as metadata_provider +from . import oai_server as oai_server +# from . import oai_utils as oai_utils diff --git a/api/app/oai_dcat/metadata_provider.py b/api/app/oai_dcat/metadata_provider.py new file mode 100644 index 0000000..6f140ff --- /dev/null +++ b/api/app/oai_dcat/metadata_provider.py @@ -0,0 +1,101 @@ +from oaipmh.common import Identify, Metadata, Header +from datetime import datetime +from lxml import etree +from lxml.etree import Element +import json +from .oai_utils import convert_to_dcat_ap +import logging + +import exceptions as exc +from endpoint_handlers import dataset_handler + +BASE_URL = "https://sebastien-datalake.cmcc.it/api/v2/datasets" + +# Logging config +logging.basicConfig(level=logging.DEBUG) + +# Each method in this class is a verb from the OAI-PMH protocol. 
Only listRecords is used by the data.europa harvester +class MyMetadataProvider: + # Method to list records, only method used by data.europa harvester + def listRecords(self, metadataPrefix='dcat_ap', from_=None, until=None, set=None): + logging.debug("Fetching data from API") + # Fetch data from the dataset endpoint + ''' + data = main.fetch_data( + f"{BASE_URL}{set}" + ) + ''' + if set: + dataset_url = f"{BASE_URL}/{set}" + + try: + data = dataset_handler.get_product_details( + user_roles_names=['public'], + dataset_id=set, + ) + except exc.BaseDDSException as err: + raise err.wrap_around_http_exception() from err + + else: + dataset_url = BASE_URL + data = dataset_handler.get_datasets(user_roles_names=['public']) + + logging.debug(f"Fetched data: {data}") + + # Convert to RDF graph with proper DCAT-AP fields (URL is being used to fill the accessURL field) + rdf_graph = convert_to_dcat_ap(data, f"{BASE_URL}{set}") + + # Serialize the RDF graph into a string, 'pretty-xml' format makes it more readable + rdf_string = rdf_graph.serialize(format='pretty-xml') + logging.debug(f"RDF string: {rdf_string}") + + # Create a header (mandatory for OAI-PMH) + header_element = Element("header") + header = Header(deleted=False, element=header_element, identifier="", datestamp=datetime.utcnow(), setspec=[]) + + # Create metadata element and fill it with the RDF/XML string + metadata_element = Element("metadata") + metadata = Metadata(element=metadata_element, map={"rdf": rdf_string}) + + + return [(header, metadata, [])], None + + # The remaining methods are only present because they are mandatory for the OAI-PMH protocol + + # Minimal implementation for identify + def identify(self): + return Identify( + repositoryName='Sebastien DataLake', # Name of the repository + baseURL='', # Base URL of the OAI-PMH endpoint + protocolVersion='2.0', # OAI-PMH protocol version + adminEmails=['sebastien_support@cmcc.it'], # List of admin email addresses + earliestDatestamp=datetime(2024, 1, 1), # Earliest datestamp for records + deletedRecord='no', # Policy on deleted records + granularity='YYYY-MM-DDThh:mm:ssZ', # Date granularity + compression=['identity'], # Supported compression methods + ) + + # Minimal implementation for getRecord + def getRecord(self, identifier, metadataPrefix='oai_dc'): + # Create a header + header = Header(identifier=identifier, datestamp=datetime.now(), setspec=[]) + + # Create metadata element (empty in this example) + metadata = Metadata(element=Element("record"), map={}) + + return header, metadata, [] + + # Minimal implementation for listIdentifiers + def listIdentifiers(self, metadataPrefix='oai_dc', from_=None, until=None, set_=None): + # Create a header + header = Header(identifier="id1", datestamp=datetime.now(), setspec=[]) + + return [header] + + # Minimal implementation for listMetadataFormats + def listMetadataFormats(self, identifier=None): + return [('oai_dc', 'http://www.openarchives.org/OAI/2.0/oai_dc.xsd', 'http://www.openarchives.org/OAI/2.0/oai_dc/')] + + # Minimal implementation for listSets + def listSets(self): + return [] diff --git a/api/app/oai_dcat/oai_server.py b/api/app/oai_dcat/oai_server.py new file mode 100644 index 0000000..c2ba898 --- /dev/null +++ b/api/app/oai_dcat/oai_server.py @@ -0,0 +1,34 @@ +from oaipmh.server import ServerBase, oai_dc_writer +from oaipmh.metadata import MetadataRegistry, oai_dc_reader +from . 
import metadata_provider +from lxml.etree import fromstring, tostring +import logging + +# Function to write metadata in dcat_ap format (RDF/XML), otherwise it would use the default format (oai_dc) +def dcat_ap_writer(metadata_element, metadata): + rdf_string = metadata["rdf"] + rdf_element = fromstring(bytes(rdf_string, encoding='utf-8')) + + for child in rdf_element: + metadata_element.append(child) + logging.debug(f"Metadata Element: {tostring(metadata_element, pretty_print=True).decode('utf-8')}") + + +# Create reader for dcat_ap metadata +def dcat_ap_reader(element): + rdf_string = tostring(element, encoding='unicode') + return {"rdf": rdf_string} + +# Server class, it defines writers and readers, as well as the metadata provider (metadata_provider.py) +class MyServer(ServerBase): + def __init__(self): + metadata_registry = MetadataRegistry() + metadata_registry.registerWriter("oai_dc", oai_dc_writer) + metadata_registry.registerReader("oai_dc", oai_dc_reader) + metadata_registry.registerWriter("dcat_ap", dcat_ap_writer) + metadata_registry.registerReader("dcat_ap", dcat_ap_reader) + server = metadata_provider.MyMetadataProvider() + super(MyServer, self).__init__(server, metadata_registry) + + +oai_server = MyServer() diff --git a/api/app/oai_dcat/oai_utils.py b/api/app/oai_dcat/oai_utils.py new file mode 100644 index 0000000..b45714f --- /dev/null +++ b/api/app/oai_dcat/oai_utils.py @@ -0,0 +1,499 @@ +import re +from rdflib import Graph, Literal, Namespace, RDF, URIRef, BNode +from rdflib.namespace import DCAT, DCTERMS, FOAF, RDF, XSD +import logging +from datetime import datetime + +# Dictionary with accrualPeriodicity values for somw known datasets +ACCRUAL_PERIODICITY = { + "blue-tongue": "AS_NEEDED", + "iot-animal": "HOURLY", + "pasture": "BIWEEKLY", + "pi": "DAILY", + "pi-long-term": "AS_NEEDED", + "thi": "DAILY", + "iot-environmental" : "IRREG" +} + +# Logging config +logging.basicConfig(level=logging.DEBUG) + +# Namespaces for DCAT-AP, to be binded to the RDF graph +DCAT = Namespace("http://www.w3.org/ns/dcat#") +DCT = Namespace("http://purl.org/dc/terms/") +FOAF = Namespace("http://xmlns.com/foaf/0.1/") +VCARD = Namespace("http://www.w3.org/2006/vcard/ns#") +EDP = Namespace("https://europeandataportal.eu/voc#") +SPDX = Namespace("http://spdx.org/rdf/terms#") +ADMS = Namespace("http://www.w3.org/ns/adms#") +DQV = Namespace("http://www.w3.org/ns/dqv#") +SKOS = Namespace("http://www.w3.org/2004/02/skos/core#") +SCHEMA = Namespace("http://schema.org/") +# Namespace for DCAT-AP IT +DCATAPIT = Namespace("http://dati.gov.it/onto/dcatapit#") + + +# Define classes for DCAT-AP entities (Dataset, Distribution and ContactPoint) + +class ContactPoint: + def __init__(self, name=None, email=None, webpage=None): + self.name = name + self.email = email + self.webpage = webpage + + +class Distribution: + def __init__(self, access_url=None, description=None, download_url=None, + media_type=None, format=None, rights=None, license=None, identifier=None): + self.access_url = access_url + self.description = description + self.download_url = download_url + self.media_type = media_type + self.format = format + self.rights = rights + self.license = license + self.identifier = identifier + + # Build the RDF graph for the distribution + def to_graph(self, g): + distribution = URIRef(self.uri) + g.add((distribution, RDF.type, DCAT.Distribution)) + if self.access_url: + g.add((distribution, DCAT.accessURL, URIRef(self.access_url))) + if self.description: + g.add((distribution, DCTERMS.description, 
Literal(self.description))) + if self.download_url: + g.add((distribution, DCAT.downloadURL, URIRef(self.download_url))) + if self.media_type: + g.add((distribution, DCTERMS.mediaType, URIRef(self.media_type))) + if self.format: + g.add((distribution, DCTERMS.format, URIRef(self.format))) + if self.rights: + rights_bnode = BNode() + g.add((distribution, DCTERMS.rights, rights_bnode)) + g.add((rights_bnode, RDF.type, DCTERMS.RightsStatement)) + g.add((rights_bnode, DCTERMS.rights, URIRef(self.rights))) + if self.license: + license_bnode = BNode() + g.add((distribution, DCTERMS.license, license_bnode)) + g.add((license_bnode, RDF.type, DCTERMS.LicenseDocument)) + g.add((license_bnode, DCTERMS.license, URIRef(self.license))) + if self.identifier: + g.add((distribution, DCTERMS.identifier, Literal(self.identifier))) + return g + + +class DatasetDCAT: + def __init__(self, uri, title=None, description=None, issued=None, identifier=None, contact_point=None): + self.uri = uri + self.title = title + self.description = description + self.issued = issued + self.identifier = identifier + self.contact_point = contact_point + self.distributions = [] + + def add_distribution(self, distribution): + self.distributions.append(distribution) + + # Build the RDF graph for the dataset + def to_graph(self, g): + dataset = URIRef(self.uri) + g.add((dataset, RDF.type, DCAT.Dataset)) + logging.debug(f"Adding to graph {g.identifier}: {dataset} a type {DCAT.Dataset}") + + if self.title: + g.add((dataset, DCTERMS.title, Literal(self.title))) + if self.description: + g.add((dataset, DCTERMS.description, Literal(self.description))) + if self.issued: + g.add((dataset, DCTERMS.issued, Literal(self.issued, datatype=DCTERMS.W3CDTF))) + if self.identifier: + g.add((dataset, DCTERMS.identifier, Literal(self.identifier))) + + if self.contact_point: + contact_bnode = BNode() + g.add((dataset, DCAT.contactPoint, contact_bnode)) + g.add((contact_bnode, RDF.type, VCARD.Kind)) + if self.contact_point.name: + g.add((contact_bnode, VCARD.fn, Literal(self.contact_point.name))) + if self.contact_point.email: + g.add((contact_bnode, VCARD.hasEmail, URIRef(f"mailto:{self.contact_point.email}"))) + if self.contact_point.webpage: + g.add((contact_bnode, VCARD.hasURL, URIRef(self.contact_point.webpage))) + + for dist in self.distributions: + distribution_bnode = BNode() + g.add((dataset, DCAT.distribution, distribution_bnode)) + g.add((distribution_bnode, RDF.type, DCAT.Distribution)) + if dist.access_url: + g.add((distribution_bnode, DCAT.accessURL, URIRef(dist.access_url))) + if dist.description: + g.add((distribution_bnode, DCTERMS.description, Literal(dist.description))) + if dist.download_url: + g.add((distribution_bnode, DCAT.downloadURL, URIRef(dist.download_url))) + if dist.media_type: + g.add((distribution_bnode, DCTERMS.mediaType, URIRef(dist.media_type))) + if dist.format: + g.add((distribution_bnode, DCTERMS.format, URIRef(dist.format))) + if dist.rights: + rights_bnode = BNode() + g.add((distribution_bnode, DCTERMS.rights, rights_bnode)) + g.add((rights_bnode, RDF.type, DCTERMS.RightsStatement)) + g.add((rights_bnode, DCTERMS.rights, URIRef(dist.rights))) + if dist.license: + license_bnode = BNode() + g.add((distribution_bnode, DCTERMS.license, license_bnode)) + g.add((license_bnode, RDF.type, DCTERMS.LicenseDocument)) + g.add((license_bnode, DCTERMS.license, URIRef(dist.license))) + if dist.identifier: + g.add((distribution_bnode, DCTERMS.identifier, Literal(dist.identifier))) + + return g + + +# Define classes for DCAT-AP IT 
entities (Catalog, Dataset, Distribution, and ContactPoint) + +class ContactPointIT: + def __init__(self, name=None, email=None, webpage=None): + self.name = name + self.email = email + self.webpage = webpage + + def to_graph(self, g, parent): + contact_bnode = BNode() + g.add((parent, DCAT.contactPoint, contact_bnode)) + g.add((contact_bnode, RDF.type, VCARD.Kind)) + if self.name: + g.add((contact_bnode, VCARD.fn, Literal(self.name))) + if self.email: + g.add((contact_bnode, VCARD.hasEmail, URIRef(f"mailto:{self.email}"))) + if self.webpage: + g.add((contact_bnode, VCARD.hasURL, URIRef(self.webpage))) + + +class DistributionIT: + def __init__(self, uri, access_url=None, description=None, download_url=None, + media_type=None, format=None, rights=None, license=None, identifier=None): + self.uri = uri + self.access_url = access_url + self.description = description + self.download_url = download_url + self.media_type = media_type + self.format = format + self.rights = rights + self.license = license + self.identifier = identifier + + def to_graph(self, g): + distribution = URIRef(self.uri) + g.add((distribution, RDF.type, DCATAPIT.Distribution)) + if self.access_url: + g.add((distribution, DCAT.accessURL, URIRef(self.access_url))) + if self.description: + g.add((distribution, DCTERMS.description, Literal(self.description))) + if self.download_url: + g.add((distribution, DCAT.downloadURL, URIRef(self.download_url))) + if self.media_type: + g.add((distribution, DCTERMS.mediaType, URIRef(self.media_type))) + if self.format: + g.add((distribution, DCTERMS.format, URIRef(self.format))) + if self.rights: + rights_bnode = BNode() + g.add((distribution, DCTERMS.rights, rights_bnode)) + g.add((rights_bnode, RDF.type, DCTERMS.RightsStatement)) + g.add((rights_bnode, DCTERMS.rights, URIRef(self.rights))) + if self.license: + license_bnode = BNode() + g.add((distribution, DCTERMS.license, license_bnode)) + g.add((license_bnode, RDF.type, DCTERMS.LicenseDocument)) + g.add((license_bnode, DCTERMS.license, URIRef(self.license))) + if self.identifier: + g.add((distribution, DCTERMS.identifier, Literal(self.identifier))) + + +class DatasetDCATAPIT: + def __init__(self, uri, title=None, description=None, issued=None, identifier=None, contact_point=None): + self.uri = uri + self.title = title + self.description = description + self.issued = issued + self.identifier = identifier + self.contact_point = contact_point + self.distributions = [] + + def add_distribution(self, distribution): + self.distributions.append(distribution) + + def to_graph(self, g): + dataset = URIRef(self.uri) + g.add((dataset, RDF.type, DCATAPIT.Dataset)) + if self.title: + g.add((dataset, DCTERMS.title, Literal(self.title))) + if self.description: + g.add((dataset, DCTERMS.description, Literal(self.description))) + if self.issued: + g.add((dataset, DCTERMS.issued, Literal(self.issued, datatype=DCTERMS.W3CDTF))) + if self.identifier: + g.add((dataset, DCTERMS.identifier, Literal(self.identifier))) + + if self.contact_point: + self.contact_point.to_graph(g, dataset) + + for dist in self.distributions: + distribution_uri = URIRef(dist.uri) + g.add((dataset, DCAT.distribution, distribution_uri)) + + return g + + +class CatalogIT: + def __init__(self, uri, title, description, modified, publisher_name, publisher_identifier, publisher_homepage, + publisher_email, dataset_uris=None): + self.uri = uri + self.title = title + self.description = description + self.modified = modified + self.publisher_name = publisher_name + self.publisher_identifier 
= publisher_identifier + self.publisher_homepage = publisher_homepage + self.publisher_email = publisher_email + self.dataset_uris = dataset_uris if dataset_uris is not None else [] + + def add_dataset(self, dataset_uri): + self.dataset_uris.append(dataset_uri) + + def to_graph(self, g): + catalog = URIRef(self.uri) + g.add((catalog, RDF.type, DCATAPIT.Catalog)) + g.add((catalog, DCTERMS.title, Literal(self.title))) + g.add((catalog, DCTERMS.description, Literal(self.description))) + g.add((catalog, DCTERMS.modified, Literal(self.modified, datatype=DCTERMS.W3CDTF))) + + catalog_publisher_node = BNode() + g.add((catalog, DCTERMS.publisher, catalog_publisher_node)) + g.add((catalog_publisher_node, RDF.type, FOAF.Agent)) + g.add((catalog_publisher_node, RDF.type, DCATAPIT.Agent)) + g.add((catalog_publisher_node, FOAF.name, Literal(self.publisher_name))) + g.add((catalog_publisher_node, DCTERMS.identifier, Literal(self.publisher_identifier))) + g.add((catalog_publisher_node, FOAF.homepage, URIRef(self.publisher_homepage))) + g.add((catalog_publisher_node, FOAF.mbox, URIRef(f"mailto:{self.publisher_email}"))) + + for dataset_uri in self.dataset_uris: + g.add((catalog, DCAT.dataset, URIRef(dataset_uri))) + + return g + + # Function to convert to DCAT-AP IT format + + +def convert_to_dcat_ap_it(data, url): + # Create separate graphs + catalog_graph = Graph() + datasets_graph = Graph() + distributions_graph = Graph() + vcard_graph = Graph() + + # Bind namespaces to all graphs + for g in [catalog_graph, datasets_graph, distributions_graph, vcard_graph]: + g.bind("dcatapit", DCATAPIT) + g.bind("foaf", FOAF) + g.bind("dcat", DCAT) + g.bind("dct", DCT) + g.bind("vcard", VCARD) + g.bind("rdf", RDF) + + # Contact point URI + contact_point_uri = URIRef("https://www.cmcc.it") + + # Create catalog + catalog_uri = URIRef(url) + catalog_graph.add((catalog_uri, RDF.type, DCATAPIT.Catalog)) + catalog_graph.add((catalog_uri, RDF.type, DCAT.Catalog)) + catalog_graph.add((catalog_uri, DCTERMS.title, Literal("Sebastien Catalog"))) + catalog_graph.add((catalog_uri, DCTERMS.description, Literal("A catalog of Sebastien datasets"))) + catalog_graph.add((catalog_uri, FOAF.homepage, Literal(url))) + catalog_graph.add( + (catalog_uri, DCTERMS.language, Literal("http://publications.europa.eu/resource/authority/language/ITA"))) + catalog_graph.add((catalog_uri, DCTERMS.modified, Literal(datetime.now(), datatype=XSD.dateTime))) + + # Add publisher information + publisher = BNode() + catalog_graph.add((catalog_uri, DCTERMS.publisher, publisher)) + catalog_graph.add((publisher, RDF.type, FOAF.Agent)) + catalog_graph.add((publisher, RDF.type, DCATAPIT.Agent)) + catalog_graph.add((publisher, FOAF.name, Literal("CMCC Foundation"))) + catalog_graph.add((publisher, DCTERMS.identifier, Literal("XW88C90Q"))) + catalog_graph.add((publisher, FOAF.homepage, URIRef("https://www.cmcc.it"))) + catalog_graph.add((publisher, FOAF.mbox, URIRef("mailto:dds-support@cmcc.it"))) + + for i, dataset in enumerate(data, 1): + if "dataset" not in dataset: + dataset = {"dataset": dataset} + dataset_id = dataset.get("dataset", {}).get("metadata", {}).get("id") + dataset_uri = URIRef(f'{url}/{i}') + + # Add dataset reference to catalog + catalog_graph.add((catalog_uri, DCAT.dataset, dataset_uri)) + + # Create dataset + datasets_graph.add((dataset_uri, RDF.type, DCATAPIT.Dataset)) + datasets_graph.add((dataset_uri, RDF.type, DCAT.Dataset)) + datasets_graph.add( + (dataset_uri, DCTERMS.title, Literal(dataset.get("dataset", {}).get("metadata", 
{}).get("label")))) + datasets_graph.add((dataset_uri, DCTERMS.description, + Literal(dataset.get("dataset", {}).get("metadata", {}).get("description")))) + datasets_graph.add((dataset_uri, DCTERMS.issued, Literal( + datetime.strptime(str(dataset.get("dataset", {}).get("metadata", {}).get("publication_date")), '%Y-%m-%d'), + datatype=XSD.dateTime))) + datasets_graph.add((dataset_uri, DCTERMS.identifier, Literal(f"XW88C90Q:{dataset_id}"))) + datasets_graph.add( + (dataset_uri, DCTERMS.language, Literal("http://publications.europa.eu/resource/authority/language/ITA"))) + # Add dct:modified, dcat:theme, dct:rightsHolder and dct:accrualPeriodicity + datasets_graph.add((dataset_uri, DCTERMS.modified, Literal(datetime.now(), datatype=XSD.dateTime))) + datasets_graph.add( + (dataset_uri, DCAT.theme, URIRef("http://publications.europa.eu/resource/authority/data-theme/AGRI"))) + datasets_graph.add((dataset_uri, DCTERMS.accrualPeriodicity, URIRef( + f"http://publications.europa.eu/resource/authority/frequency/{ACCRUAL_PERIODICITY.get(dataset_id)}"))) + # Add publisher info on dataset + publisher_dataset = BNode() + datasets_graph.add((dataset_uri, DCTERMS.publisher, publisher_dataset)) + datasets_graph.add((publisher_dataset, RDF.type, FOAF.Agent)) + datasets_graph.add((publisher_dataset, RDF.type, DCATAPIT.Agent)) + datasets_graph.add((publisher_dataset, FOAF.name, Literal("CMCC Foundation"))) + datasets_graph.add((publisher_dataset, DCTERMS.identifier, Literal("XW88C90Q"))) + # Add rights holder BNode + rights_holder_uri = BNode() + datasets_graph.add((dataset_uri, DCTERMS.rightsHolder, rights_holder_uri)) + datasets_graph.add((rights_holder_uri, RDF.type, DCATAPIT.Agent)) + datasets_graph.add((rights_holder_uri, RDF.type, FOAF.Agent)) + datasets_graph.add((rights_holder_uri, DCTERMS.identifier, Literal("XW88C90Q"))) + datasets_graph.add((rights_holder_uri, FOAF.name, Literal("CMCC Foundation"))) + + # Add contact point + contact = dataset.get("dataset", {}).get("metadata", {}).get("contact") + datasets_graph.add((dataset_uri, DCAT.contactPoint, contact_point_uri)) + + # Create distribution + # products = dataset.get("dataset", {}).get("metadata", {}).get("products", {}).get("monthly", {}) + distribution_uri = URIRef(f'{url}/{dataset_id}') + datasets_graph.add((dataset_uri, DCAT.distribution, distribution_uri)) + distributions_graph.add((distribution_uri, RDF.type, DCAT.Distribution)) + distributions_graph.add((distribution_uri, DCAT.accessURL, distribution_uri)) + distributions_graph.add((distribution_uri, DCTERMS.title, + Literal(dataset.get("dataset", {}).get("metadata", {}).get("description")))) + distributions_graph.add((distribution_uri, DCTERMS.description, + Literal(dataset.get("dataset", {}).get("metadata", {}).get("description")))) + license_uri = URIRef("https://w3id.org/italia/controlled-vocabulary/licences/A21_CCBY40") + license_document = BNode() + distributions_graph.add((distribution_uri, DCTERMS.license, license_document)) + distributions_graph.add((license_document, RDF.type, DCATAPIT.LicenseDocument)) + distributions_graph.add( + (license_document, DCTERMS.type, URIRef("http://purl.org/adms/licencetype/Attribution"))) + distributions_graph.add( + (license_document, FOAF.name, Literal("Creative Commons Attribuzione 4.0 Internazionale (CC BY 4.0)"))) + distributions_graph.add((distribution_uri, DCTERMS.format, + URIRef("http://publications.europa.eu/resource/authority/file-type/JSON"))) + distributions_graph.add((distribution_uri, RDF.type, DCATAPIT.Distribution)) + + # Create 
vcard:Organization node + contact = dataset.get("dataset", {}).get("metadata", {}).get("contact") + vcard_graph.add((contact_point_uri, RDF.type, VCARD.Organization)) + vcard_graph.add((contact_point_uri, RDF.type, URIRef("http://dati.gov.it/onto/dcatapit#Organization"))) + vcard_graph.add((contact_point_uri, RDF.type, URIRef("http://xmlns.com/foaf/0.1/Organization"))) + vcard_graph.add((contact_point_uri, RDF.type, URIRef("http://www.w3.org/2006/vcard/ns#Kind"))) + vcard_graph.add((contact_point_uri, VCARD.fn, Literal(contact.get("name")))) + vcard_graph.add((contact_point_uri, VCARD.hasEmail, URIRef(f"mailto:{contact.get('email')}"))) + vcard_graph.add((contact_point_uri, VCARD.hasURL, URIRef(contact.get("webpage")))) + + return catalog_graph, datasets_graph, distributions_graph, vcard_graph + + +def serialize_and_concatenate_graphs(catalog_graph, datasets_graph, distributions_graph, vcard_graph): + # Serialize each graph to a string + catalog_str = catalog_graph.serialize(format='pretty-xml') + datasets_str = datasets_graph.serialize(format='pretty-xml') + distributions_str = distributions_graph.serialize(format='pretty-xml') + vcard_str = vcard_graph.serialize(format='pretty-xml') + + # Remove XML headers and opening tags from datasets and distributions and vcard strings + datasets_str = re.sub(r'<\?xml[^>]+\?>', '', datasets_str) + datasets_str = re.sub(r']*>', '', datasets_str, count=1).rsplit('', 1)[0] + distributions_str = re.sub(r'<\?xml[^>]+\?>', '', distributions_str) + distributions_str = re.sub(r']*>', '', distributions_str, count=1).rsplit('', 1)[0] + vcard_str = re.sub(r'<\?xml[^>]+\?>', '', vcard_str) + vcard_str = re.sub(r']*>', '', vcard_str, count=1).rsplit('', 1)[0] + + # Concatenate the strings + final_str = catalog_str.rsplit('', 1)[0] + datasets_str + distributions_str + vcard_str + '' + + # Manually add the vcard namespace declaration + final_str = final_str.replace( + '- + Remote Sensing Indices derived from SENTINEL S2A data + contact: + name: Data Deliver System Support Team + email: dds-support@cmcc.it + webpage: https://www.cmcc.it/research-organization/research-divisions/advanced-scientific-computing-division#1553329820238-2055494b-9aa6 + label: Remote Sensing Indices from Sentinel S2A + image: null + doi: null + update_frequency: null + license: null + publication_date: 2023-11-22 + related_data: null + +sources: + 10m: + description: Remote Sensing Indices at 10m + metadata: + role: public + filters: + - name: pasture + user_defined: T + label: Pasture + driver: geokube_netcdf + args: + path: '{{ CATALOG_DIR }}/datasets/RS_indices/*/10m/*.nc' + pattern: '{{ CATALOG_DIR }}/datasets/RS_indices/{pasture}/10m/{}.nc' + field_id: '{standard_name}' + mapping: + NDVI: {'name': 'NDVI', 'description': 'Normalized Difference Vegetation Index'} + NDWI: {'name': 'NDWI', 'description': 'Normalized Difference Water Index'} + GLI: {'name': 'GLI', 'description': 'Green Leaf Index'} + GCI: {'name': 'GCI', 'description': 'Green Chlorophyll Index'} + RGR: {'name': 'RGR', 'description': 'Red-Green Ratio'} + metadata_caching: false + metadata_cache_path: '{{ CACHE_DIR }}/s2-indices-10m.cache' + xarray_kwargs: + parallel: true + decode_coords: 'all' \ No newline at end of file diff --git a/catalog/cache.py b/catalog/cache.py new file mode 100644 index 0000000..15bca03 --- /dev/null +++ b/catalog/cache.py @@ -0,0 +1,22 @@ +import argparse +import intake + +parser = argparse.ArgumentParser( + prog="Cache generator", + description="The script generating cache for the catalog", +) 
+parser.add_argument( + "--cachedir", + type=str, + help="Directory where the cache should be saved. Default: .cache", + default=".cache", +) + +if __name__ == "__main__": + args = parser.parse_args() + catalog = intake.open_catalog("catalog.yaml") + for ds in list(catalog): + for p in list(catalog[ds]): + print(f"dataset: {ds} product: {p}:") + catalog = catalog(CACHE_DIR=args.cachedir) + kube = catalog[ds][p].read() \ No newline at end of file diff --git a/catalog/catalog.yaml b/catalog/catalog.yaml new file mode 100644 index 0000000..19e5913 --- /dev/null +++ b/catalog/catalog.yaml @@ -0,0 +1,23 @@ +metadata: + version: 0.1 + parameters: + CACHE_DIR: + type: str + description: folder to store metadata cache files + default: .cache + +sources: + era5-downscaled: + driver: yaml_file_cat + args: + path: '{{ CATALOG_DIR }}/era5_downscaled.yaml' + + thi: + driver: yaml_file_cat + args: + path: '{{ CATALOG_DIR }}/thi.yaml' + + rs-indices: + driver: yaml_file_cat + args: + path: '{{ CATALOG_DIR }}/RS_indices.yaml' diff --git a/catalog/datasets/RS_indices/Donnola/10m/regular.nc b/catalog/datasets/RS_indices/Donnola/10m/regular.nc new file mode 100644 index 0000000..18e011a Binary files /dev/null and b/catalog/datasets/RS_indices/Donnola/10m/regular.nc differ diff --git a/catalog/datasets/THI/20240101.nc b/catalog/datasets/THI/20240101.nc new file mode 100644 index 0000000..f764727 Binary files /dev/null and b/catalog/datasets/THI/20240101.nc differ diff --git a/catalog/era5_downscaled.yaml b/catalog/era5_downscaled.yaml new file mode 100644 index 0000000..672500c --- /dev/null +++ b/catalog/era5_downscaled.yaml @@ -0,0 +1,31 @@ +metadata: + description: >- + This dataset is related to ERA5 downscaled over Italy at 2km. + contact: + name: Data Deliver System Support Team + email: dds-support@cmcc.it + webpage: https://www.cmcc.it/research-organization/research-divisions/advanced-scientific-computing-division#1553329820238-2055494b-9aa6 + label: Remote Sensing Indices from Sentinel S2A + image: null + doi: null + update_frequency: null + license: null + publication_date: 2023-11-22 + related_data: null + +sources: + hourly: + description: ERA5 downscaled at 2km over italy hourly. + driver: geokube_netcdf + args: + path: '{{ CATALOG_DIR }}/datasets/era5_downscaled.nc' + metadata_caching: false + metadata_cache_path: '{{ CACHE_DIR }}/era5_downscaled.cache' + + zarr: + description: ERA5 downscaled at 2km over italy hourly. 
+ driver: geokube_netcdf + args: + path: '{{ CATALOG_DIR }}/datasets/era5_downscaled.zarr' + metadata_caching: false + metadata_cache_path: '{{ CACHE_DIR }}/era5_downscaled.cache' \ No newline at end of file diff --git a/catalog/thi.yaml b/catalog/thi.yaml new file mode 100644 index 0000000..6bc9c97 --- /dev/null +++ b/catalog/thi.yaml @@ -0,0 +1,37 @@ +metadata: + description: >- + Thermohygrometric Indices derived from MISTRAL COSMO-2I data + contact: + name: Data Deliver System Support Team + email: dds-support@cmcc.it + webpage: https://www.cmcc.it/research-organization/research-divisions/advanced-scientific-computing-division#1553329820238-2055494b-9aa6 + label: Thermohygrometric Indices over Italy + image: null + doi: null + update_frequency: null + license: null + publication_date: 2023-06-19 + related_data: null + +sources: + hourly: + description: Hourly Thermohygrometric Indices + metadata: + role: public + filters: + - name: date + user_defined: T + label: Date + driver: geokube_netcdf + args: + path: '{{ CATALOG_DIR }}/datasets/THI/*.nc' + pattern: '{{ CATALOG_DIR }}/datasets/THI/{date}.nc' + field_id: '{standard_name}' + mapping: + THI_ext: {'name': 'external_thermohygrometric_index', 'description': 'External Thermohygrometric Index'} + THI_int: {'name': 'internal_thermohygrometric_index', 'description': 'Internal Thermohygrometric Index'} + metadata_caching: false + metadata_cache_path: '{{ CACHE_DIR }}/thi-hourly.cache' + xarray_kwargs: + parallel: true + decode_coords: 'all' \ No newline at end of file diff --git a/datastore/Dockerfile b/datastore/Dockerfile index 5dc465a..7e051cc 100644 --- a/datastore/Dockerfile +++ b/datastore/Dockerfile @@ -1,10 +1,9 @@ ARG REGISTRY=rg.fr-par.scw.cloud/geolake ARG TAG=latest FROM $REGISTRY/geolake-drivers:$TAG -RUN conda install -c conda-forge --yes --freeze-installed psycopg2 \ - && conda clean -afy + COPY requirements.txt /app/requirements.txt -RUN pip install --no-cache-dir -r /app/requirements.txt +RUN pip install --no-cache-dir -r /app/requirements.txt COPY ./datastore /app/datastore COPY ./workflow /app/workflow COPY ./dbmanager /app/dbmanager @@ -13,3 +12,4 @@ COPY ./utils /app/utils COPY ./tests /app/tests COPY ./wait-for-it.sh / +WORKDIR / \ No newline at end of file diff --git a/datastore/datastore/datastore.py b/datastore/datastore/datastore.py index b6052e5..3c84bfe 100644 --- a/datastore/datastore/datastore.py +++ b/datastore/datastore/datastore.py @@ -50,7 +50,7 @@ def __init__(self) -> None: @log_execution_time(_LOG) def get_cached_product_or_read( - self, dataset_id: str, product_id: str, query: GeoQuery | None = None + self, dataset_id: str, product_id: str, ) -> DataCube | Dataset: """Get product from the cache instead of loading files indicated in the catalog if `metadata_caching` set to `True`. 
@@ -81,7 +81,7 @@ def get_cached_product_or_read( ) return self.catalog(CACHE_DIR=self.cache_dir)[dataset_id][ product_id - ].get(geoquery=query, compute=False).read_chunked() + ].read_chunked() return self.cache[dataset_id][product_id] @log_execution_time(_LOG) @@ -358,9 +358,9 @@ def query( self._LOG.debug("loading product...") kube = self.catalog(CACHE_DIR=self.cache_dir)[dataset_id][ product_id - ].get(geoquery=geoquery, compute=compute).process_with_query() + ].read_chunked() self._LOG.debug("original kube len: %s", len(kube)) - return kube + return Datastore._process_query(kube, geoquery, compute) @log_execution_time(_LOG) def estimate( @@ -391,8 +391,7 @@ def estimate( # NOTE: we always use catalog directly and single product cache self._LOG.debug("loading product...") # NOTE: for estimation we use cached products - kube = self.get_cached_product_or_read(dataset_id, product_id, - query=query) + kube = self.get_cached_product_or_read(dataset_id, product_id) self._LOG.debug("original kube len: %s", len(kube)) return Datastore._process_query(kube, geoquery, False).nbytes @@ -440,9 +439,33 @@ def _process_query(kube, query: GeoQuery, compute: None | bool = False): kube = kube.locations(**query.location) if query.time: Datastore._LOG.debug("subsetting by time...") - kube = kube.sel(time=query.time) + kube = kube.sel( + **{ + "time": Datastore._maybe_convert_dict_slice_to_slice( + query.time + ) + } + ) if query.vertical: Datastore._LOG.debug("subsetting by vertical...") - method = None if isinstance(query.vertical, slice) else "nearest" - kube = kube.sel(vertical=query.vertical, method=method) + if isinstance( + vertical := Datastore._maybe_convert_dict_slice_to_slice( + query.vertical + ), + slice, + ): + method = None + else: + method = "nearest" + kube = kube.sel(vertical=vertical, method=method) return kube.compute() if compute else kube + + @staticmethod + def _maybe_convert_dict_slice_to_slice(dict_vals): + if "start" in dict_vals or "stop" in dict_vals: + return slice( + dict_vals.get("start"), + dict_vals.get("stop"), + dict_vals.get("step"), + ) + return dict_vals \ No newline at end of file diff --git a/datastore/requirements.txt b/datastore/requirements.txt index 449eb47..0279878 100644 --- a/datastore/requirements.txt +++ b/datastore/requirements.txt @@ -1,2 +1,3 @@ networkx -pydantic<2.0.0 \ No newline at end of file +pydantic<2.0.0 +psycopg2-binary \ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml index 995b7ca..c510542 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,19 +1,18 @@ version: "3" services: api: - build: - context: ./ - dockerfile: ./api/Dockerfile + image: local/geolake-api:latest ports: - "8080:80" depends_on: + - datastore - broker - db links: - broker - db environment: - CATALOG_PATH: /code/app/resources/catalogs/catalog.yaml + CATALOG_PATH: /catalog/catalog.yaml POSTGRES_DB: dds POSTGRES_USER: dds POSTGRES_PASSWORD: dds @@ -21,11 +20,9 @@ services: POSTGRES_PORT: 5432 volumes: - downloads:/downloads:ro - command: ["./wait-for-it.sh", "broker:5672", "--", "uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "80"] + command: ["./../wait-for-it.sh", "broker:5672", "--", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "80"] executor: - build: - context: ./ - dockerfile: ./executor/Dockerfile + image: local/geolake-executor:latest depends_on: - broker - db @@ -37,14 +34,20 @@ services: environment: EXECUTOR_TYPES: query,info,estimate CATALOG_PATH: /code/app/resources/catalogs/catalog.yaml - POSTGRES_DB: 
dds - POSTGRES_USER: dds - POSTGRES_PASSWORD: dds + POSTGRES_DB: geolake + POSTGRES_USER: geolake + POSTGRES_PASSWORD: geolake POSTGRES_HOST: db POSTGRES_PORT: 5432 volumes: - downloads:/downloads:rw - command: ["./wait-for-it.sh", "broker:5672", "--", "python", "./app/main.py"] + command: ["./../wait-for-it.sh", "broker:5672", "--", "python", "./app/main.py"] + datastore: + image: local/geolake-datastore:latest + depends_on: + - drivers + drivers: + image: local/geolake-drivers:latest broker: image: rabbitmq:3 db: @@ -55,31 +58,12 @@ services: ports: - 5432:5432 environment: - POSTGRES_DB: dds - POSTGRES_USER: dds - POSTGRES_PASSWORD: dds + POSTGRES_DB: geolake + POSTGRES_USER: geolake + POSTGRES_PASSWORD: geolake POSTGRES_HOST: db POSTGRES_PORT: 5432 - web: - build: - context: ./ - dockerfile: ./web/Dockerfile - ports: - - "8080:80" - depends_on: - - db - links: - - db - environment: - CATALOG_PATH: /code/app/resources/catalogs/catalog.yaml - POSTGRES_DB: dds - POSTGRES_USER: dds - POSTGRES_PASSWORD: dds - POSTGRES_HOST: db - POSTGRES_PORT: 5432 - volumes: - - downloads:/downloads:ro - command: ["./wait-for-it.sh", "broker:5672", "--", "uvicorn", "web.main:app", "--host", "0.0.0.0", "--port", "80"] volumes: - downloads: \ No newline at end of file + downloads: + catalog: \ No newline at end of file diff --git a/drivers/Dockerfile b/drivers/Dockerfile index 6d5ffad..57874a2 100644 --- a/drivers/Dockerfile +++ b/drivers/Dockerfile @@ -1,8 +1,9 @@ ARG REGISTRY=rg.fr-par.scw.cloud/geokube -ARG TAG=v0.2.6b1 +#ARG TAG=v0.2.6b2 +ARG TAG=latest FROM $REGISTRY/geokube:$TAG -RUN conda install -c conda-forge --yes --freeze-installed intake=0.6.6 -RUN conda clean -afy + COPY dist/intake_geokube-0.1a0-py3-none-any.whl / + RUN pip install /intake_geokube-0.1a0-py3-none-any.whl RUN rm /intake_geokube-0.1a0-py3-none-any.whl diff --git a/drivers/intake_geokube/afm.py b/drivers/intake_geokube/afm.py new file mode 100644 index 0000000..62cdc21 --- /dev/null +++ b/drivers/intake_geokube/afm.py @@ -0,0 +1,99 @@ +"""geokube driver for intake.""" + +from typing import Mapping, Optional +import geokube +import numpy as np +import xarray as xr +from .base import GeokubeSource +from geokube import open_datacube, open_dataset +from geokube.core.datacube import DataCube + +_PROJECTION = {"grid_mapping_name": "latitude_longitude"} + +def postprocess_afm(ds: xr.Dataset, **post_process_chunks): + if isinstance(ds, geokube.core.datacube.DataCube): + ds = ds.to_xarray() + latitude = ds['lat'].values + longitude = ds['lon'].values + # ds = ds.expand_dims(dim={"latitude": latitude, "longitude": longitude}, axis=(1,0)) + ds = ds.drop('lat') + ds = ds.drop('lon') + ds = ds.drop('certainty') + deduplicated = ds.expand_dims(dim={"latitude": latitude, "longitude": longitude}, axis=(1, 0)) + # print(deduplicated.dims) + for dim in deduplicated.dims: + indexes = {dim: ~deduplicated.get_index(dim).duplicated(keep='first')} + deduplicated = deduplicated.isel(indexes) + return DataCube.from_xarray( + deduplicated.sortby('time').sortby('latitude').sortby('longitude').chunk(post_process_chunks)) + +def add_projection(dset: xr.Dataset, **kwargs) -> xr.Dataset: + """Add projection information to the dataset""" + coords = dset.coords + coords["crs"] = xr.DataArray(data=np.array(1), attrs=_PROJECTION) + for var in dset.data_vars.values(): + enc = var.encoding + enc["grid_mapping"] = "crs" + return dset + + +class CMCCAFMSource(GeokubeSource): + name = "cmcc_afm_geokube" + + def __init__( + self, + path: str, + pattern: str = None, + 
field_id: str = None, + delay_read_cubes: bool = False, + metadata_caching: bool = False, + metadata_cache_path: str = None, + storage_options: dict = None, + xarray_kwargs: dict = None, + metadata=None, + mapping: Optional[Mapping[str, Mapping[str, str]]] = None, + load_files_on_persistance: Optional[bool] = True, + postprocess_chunk: Optional = None + ): + self._kube = None + self.path = path + self.pattern = pattern + self.field_id = field_id + self.delay_read_cubes = delay_read_cubes + self.metadata_caching = metadata_caching + self.metadata_cache_path = metadata_cache_path + self.storage_options = storage_options + self.mapping = mapping + self.xarray_kwargs = {} if xarray_kwargs is None else xarray_kwargs + self.load_files_on_persistance = load_files_on_persistance + self.postprocess_chunk = postprocess_chunk + # self.preprocess = preprocess_afm + super(CMCCAFMSource, self).__init__(metadata=metadata) + + def _open_dataset(self): + if self.pattern is None: + self._kube =\ + postprocess_afm( + open_datacube( + path=self.path, + id_pattern=self.field_id, + metadata_caching=self.metadata_caching, + metadata_cache_path=self.metadata_cache_path, + mapping=self.mapping, + **self.xarray_kwargs, + # preprocess=self.preprocess + ), + **self.postprocess_chunk + ).resample('maximum', frequency='1H') + else: + self._kube = open_dataset( + path=self.path, + pattern=self.pattern, + id_pattern=self.field_id, + metadata_caching=self.metadata_caching, + metadata_cache_path=self.metadata_cache_path, + mapping=self.mapping, + **self.xarray_kwargs, + # preprocess=self.preprocess + ).apply(postprocess_afm,**self.postprocess_chunk).resample('maximum', frequency='1H') + return self._kube diff --git a/executor/Dockerfile b/executor/Dockerfile index 3888c93..e3404e5 100644 --- a/executor/Dockerfile +++ b/executor/Dockerfile @@ -1,8 +1,18 @@ ARG REGISTRY=rg.fr-par.scw.cloud/geolake ARG TAG=latest FROM $REGISTRY/geolake-datastore:$TAG + +RUN apt update && apt install -y cron curl + + WORKDIR /app COPY requirements.txt /code/requirements.txt RUN pip install --no-cache-dir -r /code/requirements.txt COPY app /app + +COPY ./healtcheck.* /opt/ + +RUN chmod +x /opt/healtcheck.sh +RUN crontab -u root /opt/healtcheck.cron + CMD [ "python", "main.py" ] diff --git a/executor/app/main.py b/executor/app/main.py index 71e5b46..6dc3566 100644 --- a/executor/app/main.py +++ b/executor/app/main.py @@ -126,6 +126,9 @@ def persist_datacube( case "jpeg": full_path = os.path.join(base_path, f"{path}.jpg") kube.to_image(full_path, **format_args) + case "csv": + full_path = os.path.join(base_path, f"{path}.csv") + kube.to_csv(full_path) case _: raise ValueError(f"format `{format}` is not supported") return full_path @@ -184,6 +187,9 @@ def _persist_single_datacube(dataframe_item, base_path, format, format_args=None case "jpeg": full_path = os.path.join(base_path, f"{path}.jpg") dcube.to_image(full_path, **format_args) + case "csv": + full_path = os.path.join(base_path, f"{path}.csv") + dcube.to_csv(full_path) return full_path if isinstance(message.content, GeoQuery): @@ -247,7 +253,7 @@ def process(message: Message, compute: bool): class Executor(metaclass=LoggableMeta): _LOG = logging.getLogger("geokube.Executor") - def __init__(self, broker, store_path): + def __init__(self, broker, store_path, dask_cluster_opts): self._store = store_path broker_conn = pika.BlockingConnection( pika.ConnectionParameters(host=broker, heartbeat=10), @@ -255,18 +261,12 @@ def __init__(self, broker, store_path): self._conn = broker_conn self._channel 
= broker_conn.channel() self._db = DBManager() + self.dask_cluster_opts = dask_cluster_opts def create_dask_cluster(self, dask_cluster_opts: dict = None): if dask_cluster_opts is None: - dask_cluster_opts = {} - dask_cluster_opts["scheduler_port"] = int( - os.getenv("DASK_SCHEDULER_PORT", 8188) - ) - dask_cluster_opts["processes"] = True - port = int(os.getenv("DASK_DASHBOARD_PORT", 8787)) - dask_cluster_opts["dashboard_address"] = f":{port}" - dask_cluster_opts["n_workers"] = None - dask_cluster_opts["memory_limit"] = "auto" + dask_cluster_opts = self.dask_cluster_opts + self._worker_id = self._db.create_worker( status="enabled", dask_scheduler_port=dask_cluster_opts["scheduler_port"], @@ -278,10 +278,11 @@ def create_dask_cluster(self, dask_cluster_opts: dict = None): extra={"track_id": self._worker_id}, ) dask_cluster = LocalCluster( - n_workers=dask_cluster_opts["n_workers"], + n_workers=dask_cluster_opts['n_workers'], scheduler_port=dask_cluster_opts["scheduler_port"], dashboard_address=dask_cluster_opts["dashboard_address"], memory_limit=dask_cluster_opts["memory_limit"], + threads_per_worker=dask_cluster_opts['thread_per_worker'], ) self._LOG.info( "creating Dask Client...", extra={"track_id": self._worker_id} @@ -372,7 +373,7 @@ def retry_until_timeout( ) status = RequestStatus.FAILED fail_reason = f"{type(e).__name__}: {str(e)}" - return (location_path, status, fail_reason) + return location_path, status, fail_reason def handle_message(self, connection, channel, delivery_tag, body): message: Message = Message(body) @@ -398,6 +399,9 @@ def handle_message(self, connection, channel, delivery_tag, body): message=message, compute=False, ) + + #future = asyncio.run(process(message,compute=False)) + location_path, status, fail_reason = self.retry_until_timeout( future, message=message, @@ -463,7 +467,19 @@ def get_size(self, location_path): executor_types = os.getenv("EXECUTOR_TYPES", "query").split(",") store_path = os.getenv("STORE_PATH", ".") - executor = Executor(broker=broker, store_path=store_path) + dask_cluster_opts = {} + dask_cluster_opts["scheduler_port"] = int( + os.getenv("DASK_SCHEDULER_PORT", 8188) + ) + dask_cluster_opts["processes"] = True + port = int(os.getenv("DASK_DASHBOARD_PORT", 8787)) + dask_cluster_opts["dashboard_address"] = f":{port}" + dask_cluster_opts["n_workers"] = int(os.getenv("DASK_N_WORKERS", 1)) + dask_cluster_opts["memory_limit"] = os.getenv("DASK_MEMORY_LIMIT", "auto") + dask_cluster_opts['thread_per_worker'] = int(os.getenv("DASK_THREADS_PER_WORKER", 8)) + + + executor = Executor(broker=broker, store_path=store_path, dask_cluster_opts=dask_cluster_opts) print("channel subscribe") for etype in executor_types: if etype == "query": diff --git a/executor/healtcheck.cron b/executor/healtcheck.cron new file mode 100644 index 0000000..e36bf23 --- /dev/null +++ b/executor/healtcheck.cron @@ -0,0 +1 @@ +*/10 * * * * bash -c '/opt/healtcheck.sh' diff --git a/executor/healtcheck.sh b/executor/healtcheck.sh new file mode 100644 index 0000000..4d8e479 --- /dev/null +++ b/executor/healtcheck.sh @@ -0,0 +1 @@ +curl https://hc-ping.com/$HEALTCHECKS \ No newline at end of file
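Note on the datastore changes above: `Datastore._process_query` now converts `{"start": ..., "stop": ..., "step": ...}` mappings coming from a `GeoQuery` into Python `slice` objects before calling `kube.sel(...)`, and uses `method="nearest"` only when the vertical selector is not such a range. Below is a minimal, self-contained sketch of that conversion logic; the sample time range and vertical level are illustrative values, not taken from the diff, and the `isinstance` guard is an extra defensive check that the helper in the diff does not perform.

# Sketch of the dict-to-slice conversion introduced in
# datastore/datastore/datastore.py (illustrative values only).

def maybe_convert_dict_slice_to_slice(dict_vals):
    """Return a slice when the mapping describes a range; otherwise pass the value through."""
    if isinstance(dict_vals, dict) and ("start" in dict_vals or "stop" in dict_vals):
        return slice(
            dict_vals.get("start"),
            dict_vals.get("stop"),
            dict_vals.get("step"),
        )
    return dict_vals


if __name__ == "__main__":
    # A GeoQuery time range expressed as a dict becomes a slice for kube.sel(time=...)
    print(maybe_convert_dict_slice_to_slice({"start": "2024-01-01", "stop": "2024-01-31"}))
    # -> slice('2024-01-01', '2024-01-31', None)

    # A single vertical level passes through unchanged, so the caller selects it with method="nearest"
    print(maybe_convert_dict_slice_to_slice(850.0))
    # -> 850.0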