diff --git a/.github/workflows/build-production.yml b/.github/workflows/build-production.yml index 608c92b..a47b3ef 100644 --- a/.github/workflows/build-production.yml +++ b/.github/workflows/build-production.yml @@ -63,6 +63,7 @@ jobs: push: true build-args: | REGISTRY=${{ vars.DOCKER_REGISTRY }} + GEODDS_UTILS_PAT=${{ secrets.GEODDS_UTILS_PAT }} tags: | ${{ vars.DOCKER_REGISTRY }}/geolake-api:${{ env.RELEASE_TAG }} ${{ vars.DOCKER_REGISTRY }}/geolake-api:latest @@ -74,6 +75,19 @@ jobs: push: true build-args: | REGISTRY=${{ vars.DOCKER_REGISTRY }} + GEODDS_UTILS_PAT=${{ secrets.GEODDS_UTILS_PAT }} tags: | ${{ vars.DOCKER_REGISTRY }}/geolake-executor:${{ env.RELEASE_TAG }} - ${{ vars.DOCKER_REGISTRY }}/geolake-executor:latest \ No newline at end of file + ${{ vars.DOCKER_REGISTRY }}/geolake-executor:latest + - name: Build and push broker component + uses: docker/build-push-action@v4 + with: + context: ./broker + file: ./broker/Dockerfile + push: true + build-args: | + REGISTRY=${{ vars.DOCKER_REGISTRY }} + GEODDS_UTILS_PAT=${{ secrets.GEODDS_UTILS_PAT }} + tags: | + ${{ vars.DOCKER_REGISTRY }}/geolake-broker:${{ env.RELEASE_TAG }} + ${{ vars.DOCKER_REGISTRY }}/geolake-broker:latest \ No newline at end of file diff --git a/.github/workflows/build-staging.yml b/.github/workflows/build-staging.yml index 7c16ff2..1d43755 100644 --- a/.github/workflows/build-staging.yml +++ b/.github/workflows/build-staging.yml @@ -61,6 +61,7 @@ jobs: push: true build-args: | REGISTRY=${{ vars.DOCKER_REGISTRY }} + GEODDS_UTILS_PAT=${{ secrets.GEODDS_UTILS_PAT }} tags: | ${{ vars.DOCKER_REGISTRY }}/geolake-api:${{ env.TAG }} ${{ vars.DOCKER_REGISTRY }}/geolake-api:latest @@ -72,6 +73,19 @@ jobs: push: true build-args: | REGISTRY=${{ vars.DOCKER_REGISTRY }} + GEODDS_UTILS_PAT=${{ secrets.GEODDS_UTILS_PAT }} tags: | ${{ vars.DOCKER_REGISTRY }}/geolake-executor:${{ env.TAG }} - ${{ vars.DOCKER_REGISTRY }}/geolake-executor:latest \ No newline at end of file + ${{ vars.DOCKER_REGISTRY }}/geolake-executor:latest + - name: Build and push broker component + uses: docker/build-push-action@v4 + with: + context: ./broker + file: ./broker/Dockerfile + push: true + build-args: | + REGISTRY=${{ vars.DOCKER_REGISTRY }} + GEODDS_UTILS_PAT=${{ secrets.GEODDS_UTILS_PAT }} + tags: | + ${{ vars.DOCKER_REGISTRY }}/geolake-broker:${{ env.TAG }} + ${{ vars.DOCKER_REGISTRY }}/geolake-broker:latest \ No newline at end of file diff --git a/api/Dockerfile b/api/Dockerfile index a2cfea0..f6d0cfc 100644 --- a/api/Dockerfile +++ b/api/Dockerfile @@ -1,9 +1,11 @@ ARG REGISTRY=rg.nl-ams.scw.cloud/geodds-production ARG TAG=latest +ARG GEODDS_UTILS_PAT FROM $REGISTRY/geolake-datastore:$TAG WORKDIR /app COPY requirements.txt /code/requirements.txt RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt +RUN pip install git+https://opengeokube:${GEODDS_UTILS_PAT}@github.com/opengeokube/geodds-utils.git COPY app /app EXPOSE 80 CMD ["uvicorn", "app.main:app", "--proxy-headers", "--host", "0.0.0.0", "--port", "80"] diff --git a/api/app/auth/backend.py b/api/app/auth/backend.py index c172b58..da58fd9 100644 --- a/api/app/auth/backend.py +++ b/api/app/auth/backend.py @@ -1,4 +1,5 @@ """The module contains authentication backend""" + from uuid import UUID from starlette.authentication import ( diff --git a/api/app/auth/manager.py b/api/app/auth/manager.py index 02bf686..98f6108 100644 --- a/api/app/auth/manager.py +++ b/api/app/auth/manager.py @@ -1,4 +1,5 @@ """Module with access/authentication functions""" + from typing import Optional from utils.api_logging import get_dds_logger diff --git a/api/app/auth/models.py b/api/app/auth/models.py index bff896f..01f07c2 100644 --- a/api/app/auth/models.py +++ b/api/app/auth/models.py @@ -1,4 +1,5 @@ """The module contains models related to the authentication and authorization""" + from starlette.authentication import SimpleUser diff --git a/api/app/callbacks/on_startup.py b/api/app/callbacks/on_startup.py index ec883d3..738bc7e 100644 --- a/api/app/callbacks/on_startup.py +++ b/api/app/callbacks/on_startup.py @@ -1,4 +1,5 @@ """Module with functions call during API server startup""" + from utils.api_logging import get_dds_logger from datastore.datastore import Datastore diff --git a/api/app/decorators_factory.py b/api/app/decorators_factory.py index d2e4b39..4e02e8e 100644 --- a/api/app/decorators_factory.py +++ b/api/app/decorators_factory.py @@ -1,4 +1,5 @@ """Modules with utils for creating decorators""" + from inspect import Signature diff --git a/api/app/endpoint_handlers/dataset.py b/api/app/endpoint_handlers/dataset.py index c03a54b..d6f974f 100644 --- a/api/app/endpoint_handlers/dataset.py +++ b/api/app/endpoint_handlers/dataset.py @@ -1,13 +1,15 @@ """Modules realizing logic for dataset-related endpoints""" -import os -import pika + import json from typing import Optional from fastapi.responses import FileResponse +from geodds_utils.units import make_bytes_readable_dict +from geodds_utils.workflow import log_execution_time + from dbmanager.dbmanager import DBManager, RequestStatus -from intake_geokube.queries.geoquery import GeoQuery +from intake_geokube.queries.geoquery import GeoQuery, Workflow from intake_geokube.queries.workflow import Workflow from datastore.datastore import Datastore, DEFAULT_MAX_REQUEST_SIZE_GB from datastore import exception as datastore_exception @@ -26,7 +28,24 @@ log = get_dds_logger(__name__) data_store = Datastore() -MESSAGE_SEPARATOR = os.environ["MESSAGE_SEPARATOR"] +PENDING_QUEUE: str = "pending" + + +def convert_to_workflow( + dataset_id: str, product_id: str, geoquery: GeoQuery +) -> Workflow: + raw_task = { + "id": "geoquery", + "op": "subset", + "use": [], + "args": { + "dataset_id": dataset_id, + "product_id": product_id, + "query": geoquery.dict(), + }, + } + return TaskList.parse([raw_task]) + def _is_etimate_enabled(dataset_id, product_id): if dataset_id in ("sentinel-2",): @@ -260,7 +279,9 @@ def async_query( """ log.debug("geoquery: %s", query) if _is_etimate_enabled(dataset_id, product_id): - estimated_size = estimate(dataset_id, product_id, query, "GB").get("value") + estimated_size = estimate(dataset_id, product_id, query, "GB").get( + "value" + ) allowed_size = data_store.product_metadata(dataset_id, product_id).get( "maximum_query_size_gb", DEFAULT_MAX_REQUEST_SIZE_GB ) @@ -275,36 +296,17 @@ def async_query( raise exc.EmptyDatasetError( dataset_id=dataset_id, product_id=product_id ) - broker_conn = pika.BlockingConnection( - pika.ConnectionParameters( - host=os.getenv("BROKER_SERVICE_HOST", "broker") - ) - ) - broker_channel = broker_conn.channel() request_id = DBManager().create_request( user_id=user_id, dataset=dataset_id, product=product_id, - query=json.dumps(query.model_dump_original()), - ) - - # TODO: find a separator; for the moment use "\" - message = MESSAGE_SEPARATOR.join( - [str(request_id), "query", dataset_id, product_id, query.json()] - ) - - broker_channel.basic_publish( - exchange="", - routing_key="query_queue", - body=message, - properties=pika.BasicProperties( - delivery_mode=2, # make message persistent - ), + query=convert_to_workflow(dataset_id, product_id, query).json(), + status=RequestStatus.PENDING, ) - broker_conn.close() return request_id + @log_execution_time(log) @assert_product_exists def sync_query( @@ -343,29 +345,32 @@ def sync_query( if estimated size is zero """ - + import time + request_id = async_query(user_id, dataset_id, product_id, query) status, _ = DBManager().get_request_status_and_reason(request_id) log.debug("sync query: status: %s", status) - while status in (RequestStatus.RUNNING, RequestStatus.QUEUED, - RequestStatus.PENDING): + while status in ( + RequestStatus.RUNNING, + RequestStatus.QUEUED, + RequestStatus.PENDING, + ): time.sleep(1) status, _ = DBManager().get_request_status_and_reason(request_id) log.debug("sync query: status: %s", status) - + if status is RequestStatus.DONE: download_details = DBManager().get_download_details_for_request_id( - request_id + request_id ) return FileResponse( path=download_details.location_path, filename=download_details.location_path.split(os.sep)[-1], ) raise exc.ProductRetrievingError( - dataset_id=dataset_id, - product_id=product_id, - status=status.name) + dataset_id=dataset_id, product_id=product_id, status=status.name + ) @log_execution_time(log) @@ -400,31 +405,12 @@ def run_workflow( """ log.debug("geoquery: %s", workflow) - broker_conn = pika.BlockingConnection( - pika.ConnectionParameters( - host=os.getenv("BROKER_SERVICE_HOST", "broker") - ) - ) - broker_channel = broker_conn.channel() + request_id = DBManager().create_request( user_id=user_id, dataset=workflow.dataset_id, product=workflow.product_id, query=workflow.json(), + status=RequestStatus.PENDING, ) - - # TODO: find a separator; for the moment use "\" - message = MESSAGE_SEPARATOR.join( - [str(request_id), "workflow", workflow.json()] - ) - - broker_channel.basic_publish( - exchange="", - routing_key="query_queue", - body=message, - properties=pika.BasicProperties( - delivery_mode=2, # make message persistent - ), - ) - broker_conn.close() return request_id diff --git a/api/app/endpoint_handlers/file.py b/api/app/endpoint_handlers/file.py index 04cf562..e98d6e0 100644 --- a/api/app/endpoint_handlers/file.py +++ b/api/app/endpoint_handlers/file.py @@ -1,11 +1,12 @@ """Module with functions to handle file related endpoints""" + import os from fastapi.responses import FileResponse from dbmanager.dbmanager import DBManager, RequestStatus +from geodds_utils.workflow import log_execution_time from utils.api_logging import get_dds_logger -from utils.metrics import log_execution_time import exceptions as exc log = get_dds_logger(__name__) @@ -41,11 +42,8 @@ def download_request_result(request_id: int): "preparing downloads for request id: %s", request_id, ) - ( - request_status, - _, - ) = DBManager().get_request_status_and_reason(request_id=request_id) - if request_status is not RequestStatus.DONE: + request = DBManager().get_request(request_id=request_id) + if request.status is not RequestStatus.DONE: log.debug( "request with id: '%s' does not exist or it is not finished yet!", request_id, diff --git a/api/app/endpoint_handlers/request.py b/api/app/endpoint_handlers/request.py index 93a0636..492bbc1 100644 --- a/api/app/endpoint_handlers/request.py +++ b/api/app/endpoint_handlers/request.py @@ -1,8 +1,10 @@ """Modules with functions realizing logic for requests-related endpoints""" + from dbmanager.dbmanager import DBManager +from geodds_utils.workflow import log_execution_time from utils.api_logging import get_dds_logger -from utils.metrics import log_execution_time + import exceptions as exc log = get_dds_logger(__name__) @@ -26,7 +28,7 @@ def get_requests(user_id: str): requests : list List of all requests done by the user """ - return DBManager().get_requests_for_user_id(user_id=user_id) + return DBManager().get_request(user_id=user_id) @log_execution_time(log) @@ -51,15 +53,14 @@ def get_request_status(user_id: str, request_id: int): Tuple of status and fail reason. """ # NOTE: maybe verification should be added if user checks only him\her requests - try: - status, reason = DBManager().get_request_status_and_reason(request_id) - except IndexError as err: + request = DBManager().get_request(request_id=request_id) + if not request: log.error( "request with id: '%s' was not found!", request_id, ) - raise exc.RequestNotFound(request_id=request_id) from err - return {"status": status.name, "fail_reason": reason} + raise exc.RequestNotFound(request_id=request_id) + return {"status": request.status.name, "fail_reason": request.fail_reason} @log_execution_time(log) @@ -88,8 +89,9 @@ def get_request_resulting_size(request_id: int): if request := DBManager().get_request_details(request_id): size = request.download.size_bytes if not size or size == 0: - raise exc.EmptyDatasetError(dataset_id=request.dataset, - product_id=request.product) + raise exc.EmptyDatasetError( + dataset_id=request.dataset, product_id=request.product + ) return size log.info( "request with id '%s' could not be found", @@ -128,17 +130,14 @@ def get_request_uri(request_id: int): ) raise exc.RequestNotFound(request_id=request_id) from err if download_details is None: - ( - request_status, - _, - ) = DBManager().get_request_status_and_reason(request_id) + request = DBManager().get_request(request_id=request_id) log.info( "download URI not found for request id: '%s'." " Request status is '%s'", request_id, - request_status, + request.status, ) raise exc.RequestStatusNotDone( - request_id=request_id, request_status=request_status + request_id=request_id, request_status=request.status ) return download_details.download_uri diff --git a/api/app/exceptions.py b/api/app/exceptions.py index af4d072..60791b7 100644 --- a/api/app/exceptions.py +++ b/api/app/exceptions.py @@ -1,4 +1,5 @@ """Module with DDS exceptions definitions""" + from typing import Optional from fastapi import HTTPException @@ -181,15 +182,17 @@ def __init__(self, dataset_id, product_id): ) super().__init__(self.msg) + class ProductRetrievingError(BaseDDSException): """Retrieving of the product failed.""" - msg: str = "Retrieving of the product '{dataset_id}.{product_id}' failed with the status {status}" + msg: str = ( + "Retrieving of the product '{dataset_id}.{product_id}' failed with the" + " status {status}" + ) def __init__(self, dataset_id, product_id, status): self.msg = self.msg.format( - dataset_id=dataset_id, - product_id=product_id, - status=status + dataset_id=dataset_id, product_id=product_id, status=status ) - super().__init__(self.msg) \ No newline at end of file + super().__init__(self.msg) diff --git a/api/app/main.py b/api/app/main.py index 2084394..488c68d 100644 --- a/api/app/main.py +++ b/api/app/main.py @@ -1,4 +1,5 @@ """Main module with dekube-dds API endpoints defined""" + __version__ = "2.0" import os from typing import Optional @@ -34,21 +35,40 @@ from const import venv, tags from auth import scopes + def map_to_geoquery( - variables: list[str], - format: str, - bbox: str | None = None, # minx, miny, maxx, maxy (minlon, minlat, maxlon, maxlat) - time: datetime | None = None, - **format_kwargs + variables: list[str], + format: str, + bbox: ( + str | None + ) = None, # minx, miny, maxx, maxy (minlon, minlat, maxlon, maxlat) + time: datetime | None = None, + **format_kwargs, ) -> GeoQuery: - bbox_ = [float(x) for x in bbox.split(',')] - area = { 'west': bbox_[0], 'south': bbox_[1], 'east': bbox_[2], 'north': bbox_[3], } - time_ = { 'year': time.year, 'month': time.month, 'day': time.day, 'hour': time.hour} - query = GeoQuery(variable=variables, time=time_, area=area, - format_args=format_kwargs, format=format) + bbox_ = [float(x) for x in bbox.split(",")] + area = { + "west": bbox_[0], + "south": bbox_[1], + "east": bbox_[2], + "north": bbox_[3], + } + time_ = { + "year": time.year, + "month": time.month, + "day": time.day, + "hour": time.hour, + } + query = GeoQuery( + variable=variables, + time=time_, + area=area, + format_args=format_kwargs, + format=format, + ) return query + logger = get_dds_logger(__name__) # ======== JSON encoders extension ========= # @@ -172,6 +192,7 @@ async def get_product_details( except exc.BaseDDSException as err: raise err.wrap_around_http_exception() from err + @app.get("/datasets/{dataset_id}/{product_id}/map", tags=[tags.DATASET]) @timer( app.state.api_request_duration_seconds, @@ -181,22 +202,24 @@ async def get_map( request: Request, dataset_id: str, product_id: str, -# OGC WMS parameters + # OGC WMS parameters width: int, height: int, layers: str | None = None, - format: str | None = 'png', + format: str | None = "png", time: datetime | None = None, - transparent: bool | None = 'true', - bgcolor: str | None = 'FFFFFF', - bbox: str | None = None, # minx, miny, maxx, maxy (minlon, minlat, maxlon, maxlat) - crs: str | None = None, -# OGC map parameters + transparent: bool | None = "true", + bgcolor: str | None = "FFFFFF", + bbox: ( + str | None + ) = None, # minx, miny, maxx, maxy (minlon, minlat, maxlon, maxlat) + crs: str | None = None, + # OGC map parameters # subset: str | None = None, # subset_crs: str | None = Query(..., alias="subset-crs"), # bbox_crs: str | None = Query(..., alias="bbox-crs"), ): - + app.state.api_http_requests_total.inc( {"route": "GET /datasets/{dataset_id}/{product_id}/map"} ) @@ -209,39 +232,54 @@ async def get_map( # vertical: Optional[Union[float, List[float], Dict[str, float]]] # filters: Optional[Dict] # format: Optional[str] - query = map_to_geoquery(variables=layers, bbox=bbox, time=time, - format="png", width=width, height=height, - transparent=transparent, bgcolor=bgcolor) + query = map_to_geoquery( + variables=layers, + bbox=bbox, + time=time, + format="png", + width=width, + height=height, + transparent=transparent, + bgcolor=bgcolor, + ) try: return dataset_handler.sync_query( user_id=request.user.id, dataset_id=dataset_id, product_id=product_id, - query=query + query=query, ) except exc.BaseDDSException as err: raise err.wrap_around_http_exception() from err -@app.get("/datasets/{dataset_id}/{product_id}/items/{feature_id}", tags=[tags.DATASET]) + +@app.get( + "/datasets/{dataset_id}/{product_id}/items/{feature_id}", + tags=[tags.DATASET], +) @timer( app.state.api_request_duration_seconds, - labels={"route": "GET /datasets/{dataset_id}/{product_id}/items/{feature_id}"}, + labels={ + "route": "GET /datasets/{dataset_id}/{product_id}/items/{feature_id}" + }, ) async def get_feature( request: Request, dataset_id: str, product_id: str, feature_id: str, -# OGC feature parameters + # OGC feature parameters time: datetime | None = None, - bbox: str | None = None, # minx, miny, maxx, maxy (minlon, minlat, maxlon, maxlat) - crs: str | None = None, -# OGC map parameters + bbox: ( + str | None + ) = None, # minx, miny, maxx, maxy (minlon, minlat, maxlon, maxlat) + crs: str | None = None, + # OGC map parameters # subset: str | None = None, # subset_crs: str | None = Query(..., alias="subset-crs"), # bbox_crs: str | None = Query(..., alias="bbox-crs"), ): - + app.state.api_http_requests_total.inc( {"route": "GET /datasets/{dataset_id}/{product_id}/items/{feature_id}"} ) @@ -255,18 +293,20 @@ async def get_feature( # filters: Optional[Dict] # format: Optional[str] - query = map_to_geoquery(variables=[feature_id], bbox=bbox, time=time, - format="geojson") + query = map_to_geoquery( + variables=[feature_id], bbox=bbox, time=time, format="geojson" + ) try: return dataset_handler.sync_query( user_id=request.user.id, dataset_id=dataset_id, product_id=product_id, - query=query + query=query, ) except exc.BaseDDSException as err: raise err.wrap_around_http_exception() from err + @app.get("/datasets/{dataset_id}/{product_id}/metadata", tags=[tags.DATASET]) @timer( app.state.api_request_duration_seconds, diff --git a/api/app/validation.py b/api/app/validation.py index 51bdbc1..2990144 100644 --- a/api/app/validation.py +++ b/api/app/validation.py @@ -10,8 +10,7 @@ def assert_product_exists(func): - """Decorator for convenient checking if product is defined in the catalog - """ + """Decorator for convenient checking if product is defined in the catalog""" sig = signature(func) assert_parameters_are_defined( sig, required_parameters=[("dataset_id", str), ("product_id", str)] diff --git a/broker/Dockerfile b/broker/Dockerfile new file mode 100644 index 0000000..7d69419 --- /dev/null +++ b/broker/Dockerfile @@ -0,0 +1,8 @@ +FROM rg.nl-ams.scw.cloud/geokube-production/geodds-datastore:latest +ARG GEODDS_UTILS_PAT +WORKDIR /app +COPY requirements.txt /code/requirements.txt +RUN pip install --no-cache-dir -r /code/requirements.txt +RUN pip install git+https://opengeokube:${GEODDS_UTILS_PAT}@github.com/opengeokube/geodds-utils.git +COPY app /app +CMD [ "python", "main.py" ] diff --git a/broker/app/__init__.py b/broker/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/broker/app/main.py b/broker/app/main.py new file mode 100644 index 0000000..403e3cf --- /dev/null +++ b/broker/app/main.py @@ -0,0 +1,124 @@ +import os +import logging +from datetime import datetime + +from dbmanager.dbmanager import DBManager, RequestStatus, Request + +from geodds_utils.meta import LoggableMeta +from geodds_utils.messaging import Publisher +from geodds_utils.workflow import run_in_interval + + +class Broker(metaclass=LoggableMeta): + _LOG = logging.getLogger("geokube.Broker") + QUEUE_NAME: str = "execute" + + def __init__(self): + self._LOG.debug( + "setting up requests broker...", extra={"track_id": "n.a."} + ) + self._request_limit: int = int(os.environ["RUNNING_REQUEST_LIMIT"]) + self._db = DBManager() + self._LOG.debug( + "request broker is ready. request limit set to %d", + self._request_limit, + extra={"track_id": "n.a."}, + ) + + @property + def _eligible_statuses(self) -> tuple[Request]: + return ( + RequestStatus.QUEUED, + RequestStatus.RUNNING, + ) + + @staticmethod + def on_error_callback(err: Exception) -> None: + Broker._LOG.error( + "error occured during broker processing: %s", + err, + extra={"track_id": "n.a."}, + ) + raise err + + @staticmethod + def pre() -> None: + Broker._LOG.debug( + "getting eligible requests for processing...", + extra={"track_id": "n.a."}, + ) + + @staticmethod + def post() -> None: + Broker._LOG.debug( + "messages emitted sucesfully", extra={"track_id": "n.a."} + ) + + def is_request_schedulable(self, request: Request) -> bool: + if ( + user_requests_nbr := len( + self._db.get_request( + user_id=request.user_id, status=self._eligible_statuses + ) + ) + ) < self._request_limit: + return True + self._LOG.debug( + "user %s has too many (%d) requests in eligible status. maximum" + " allowed number is %d", + request.user_id, + user_requests_nbr, + self._request_limit, + extra={"track_id": request.request_id}, + ) + return False + + def _process_single_schedulable_request( + self, publisher: Publisher, request: Request + ) -> None: + timestamp_id: str = str(datetime.utcnow().isoformat()) + publisher.publish(str(request.request_id), timestamp_id) + self._db.update_request( + request_id=request.request_id, + status=RequestStatus.QUEUED, + lock=True, + ) + + @run_in_interval( + every=int(os.environ["REQUEST_STATUS_CHECK_EVERY"]), + retries=-1, + pre=pre, + post=post, + on_error=on_error_callback, + ) + def emit_permitted_messages_in_interval(self): + self._LOG.debug( + "obtaining pending request from the DB...", + extra={"track_id": "n.a."}, + ) + pending_requests: list[Request] = self._db.get_request( + status=RequestStatus.PENDING, sort=True + ) + self._LOG.debug( + "found %d pending requests", + len(pending_requests), + extra={"track_id": "n.a."}, + ) + publisher = Publisher(queue=self.QUEUE_NAME, use_venv_host=True) + emitted_msg_counter: int = 0 + for request in pending_requests: + if not self.is_request_schedulable(request): + continue + self._process_single_schedulable_request( + publisher=publisher, request=request + ) + emitted_msg_counter += 1 + self._LOG.debug( + "%d requests published to the queue", + emitted_msg_counter, + extra={"track_id": "n.a."}, + ) + + +if __name__ == "__main__": + Broker().emit_permitted_messages_in_interval() diff --git a/broker/requirements.txt b/broker/requirements.txt new file mode 100644 index 0000000..1fbe63b --- /dev/null +++ b/broker/requirements.txt @@ -0,0 +1,3 @@ +pika +sqlalchemy +pydantic \ No newline at end of file diff --git a/datastore/datastore/datastore.py b/datastore/datastore/datastore.py index ca402fe..e4a50b3 100644 --- a/datastore/datastore/datastore.py +++ b/datastore/datastore/datastore.py @@ -1,4 +1,5 @@ """Module for catalog management classes and functions""" + from __future__ import annotations import os @@ -102,7 +103,10 @@ def _load_cache(self, datasets: list[str] | None = None): catalog_entry = self.catalog(CACHE_DIR=self.cache_dir)[ dataset_id ][product_id] - if hasattr(catalog_entry, "metadata_caching") and not catalog_entry.metadata_caching: + if ( + hasattr(catalog_entry, "metadata_caching") + and not catalog_entry.metadata_caching + ): self._LOG.info( "`metadata_caching` for product %s.%s set to `False`", dataset_id, @@ -110,16 +114,14 @@ def _load_cache(self, datasets: list[str] | None = None): ) continue try: - self.cache[dataset_id][ - product_id - ] = catalog_entry.read() + self.cache[dataset_id][product_id] = catalog_entry.read() except ValueError: self._LOG.error( "failed to load cache for `%s.%s`", dataset_id, product_id, exc_info=True, - ) + ) except NotImplementedError: pass @@ -392,8 +394,9 @@ def estimate( # NOTE: we always use catalog directly and single product cache self._LOG.debug("loading product...") # NOTE: for estimation we use cached products - kube = self.get_cached_product_or_read(dataset_id, product_id, - query=query) + kube = self.get_cached_product_or_read( + dataset_id, product_id, query=query + ) return Datastore._process_query(kube, geoquery, False).nbytes @log_execution_time(_LOG) @@ -425,7 +428,9 @@ def _process_query(kube, query: GeoQuery, compute: None | bool = False): try: kube = kube.filter(**query.filters) except ValueError as err: - Datastore._LOG.warning("could not filter by one of the key: %s", err) + Datastore._LOG.warning( + "could not filter by one of the key: %s", err + ) if isinstance(kube, Delayed) and compute: kube = kube.compute() if query.variable: diff --git a/datastore/datastore/util.py b/datastore/datastore/util.py index 4122d57..11c69dd 100644 --- a/datastore/datastore/util.py +++ b/datastore/datastore/util.py @@ -1,4 +1,5 @@ """Utils module""" + from functools import wraps import datetime import logging diff --git a/datastore/dbmanager/dbmanager.py b/datastore/dbmanager/dbmanager.py index d4ff293..cf99268 100644 --- a/datastore/dbmanager/dbmanager.py +++ b/datastore/dbmanager/dbmanager.py @@ -197,14 +197,12 @@ def add_user( user_id=user_id, api_key=api_key, contact_name=contact_name ) if roles_names: - user.roles.extend( - [ - session.query(Role) - .where(Role.role_name == role_name) - .all()[0] # NOTE: role_name is unique in the database - for role_name in roles_names - ] - ) + user.roles.extend([ + session.query(Role) + .where(Role.role_name == role_name) + .all()[0] # NOTE: role_name is unique in the database + for role_name in roles_names + ]) session.add(user) session.commit() return user diff --git a/drivers/intake_geokube/sentinel/driver.py b/drivers/intake_geokube/sentinel/driver.py index 4895103..cd85716 100644 --- a/drivers/intake_geokube/sentinel/driver.py +++ b/drivers/intake_geokube/sentinel/driver.py @@ -62,8 +62,10 @@ def _bounding_box_to_polygon( def _timecombo_to_day_range(combo: TimeComboDict) -> tuple[str, str]: - return (f"{combo['year']}-{combo['month']}-{combo['day']}T00:00:00", - f"{combo['year']}-{combo['month']}-{combo['day']}T23:59:59") + return ( + f"{combo['year']}-{combo['month']}-{combo['day']}T00:00:00", + f"{combo['year']}-{combo['month']}-{combo['day']}T23:59:59", + ) def _location_to_valid_point( @@ -156,7 +158,10 @@ def preprocess_sentinel(dset: xr.Dataset) -> xr.Dataset: "latitude": (("x", "y"), lat_vals), "longitude": (("x", "y"), lon_vals), }).rename({"band_data": _get_field_name_from_path(source_path)}) - expanded_timedim_dataarrays = {var_name: dset[var_name].expand_dims('time') for var_name in dset.data_vars} + expanded_timedim_dataarrays = { + var_name: dset[var_name].expand_dims("time") + for var_name in dset.data_vars + } dset = dset.update(expanded_timedim_dataarrays) return dset @@ -257,10 +262,18 @@ def _build_odata_from_geoquery(self, query: GeoQuery) -> ODataRequest: builder = self._force_sentinel_type(builder) if query.time: if isinstance(query.time, dict): - timecombo_start, timecombo_end = _timecombo_to_day_range(query.time) - self.log.debug("filtering by timecombo: [%s, %s] ", timecombo_start, timecombo_end) + timecombo_start, timecombo_end = _timecombo_to_day_range( + query.time + ) + self.log.debug( + "filtering by timecombo: [%s, %s] ", + timecombo_start, + timecombo_end, + ) builder = builder.filter_date( - _SentinelKeys.SENSING_TIME, ge=timecombo_start, le=timecombo_end + _SentinelKeys.SENSING_TIME, + ge=timecombo_start, + le=timecombo_end, ) elif isinstance(query.time, slice): self.log.debug("filtering by slice: %s", query.time) diff --git a/executor/Dockerfile b/executor/Dockerfile index db3cebb..ebf7678 100644 --- a/executor/Dockerfile +++ b/executor/Dockerfile @@ -2,11 +2,13 @@ ARG REGISTRY=rg.nl-ams.scw.cloud/geodds-production ARG TAG=latest ARG SENTINEL_USERNAME=... ARG SENTINEL_PASSWORD=... +ARG GEODDS_UTILS_PAT FROM $REGISTRY/geolake-datastore:$TAG WORKDIR /app ENV SENTINEL_USERNAME=$SENTINEL_USERNAME ENV SENTINEL_PASSWORD=$SENTINEL_PASSWORD COPY requirements.txt /code/requirements.txt RUN pip install --no-cache-dir -r /code/requirements.txt +RUN pip install -force-reinstall git+https://opengeokube:${GEODDS_UTILS_PAT}@github.com/opengeokube/geodds-utils.git COPY app /app CMD [ "python", "main.py" ] diff --git a/executor/app/main.py b/executor/app/main.py index 35b90fe..fe8ab4a 100644 --- a/executor/app/main.py +++ b/executor/app/main.py @@ -95,14 +95,12 @@ def persist_datacube( else: var_names = list(kube.fields.keys()) if len(kube) == 1: - path = "_".join( - [ - var_names[0], - message.dataset_id, - message.product_id, - message.request_id, - ] - ) + path = "_".join([ + var_names[0], + message.dataset_id, + message.product_id, + message.request_id, + ]) else: path = "_".join( [message.dataset_id, message.product_id, message.request_id] @@ -139,7 +137,9 @@ def persist_dataset( def _get_attr_comb(dataframe_item, attrs): return "_".join([dataframe_item[attr_name] for attr_name in attrs]) - def _persist_single_datacube(dataframe_item, base_path, format, format_args=None): + def _persist_single_datacube( + dataframe_item, base_path, format, format_args=None + ): if not format_args: format_args = {} dcube = dataframe_item[dset.DATACUBE_COL] @@ -153,24 +153,20 @@ def _persist_single_datacube(dataframe_item, base_path, format, format_args=None attr_str = _get_attr_comb(dataframe_item, dset._Dataset__attrs) var_names = list(dcube.fields.keys()) if len(dcube) == 1: - path = "_".join( - [ - var_names[0], - message.dataset_id, - message.product_id, - attr_str, - message.request_id, - ] - ) + path = "_".join([ + var_names[0], + message.dataset_id, + message.product_id, + attr_str, + message.request_id, + ]) else: - path = "_".join( - [ - message.dataset_id, - message.product_id, - attr_str, - message.request_id, - ] - ) + path = "_".join([ + message.dataset_id, + message.product_id, + attr_str, + message.request_id, + ]) match format: case "netcdf": full_path = os.path.join(base_path, f"{path}.nc") @@ -194,7 +190,11 @@ def _persist_single_datacube(dataframe_item, base_path, format, format_args=None else: format = "netcdf" datacubes_paths = dset.data.apply( - _persist_single_datacube, base_path=base_path, format=format, format_args=format_args, axis=1 + _persist_single_datacube, + base_path=base_path, + format=format, + format_args=format_args, + axis=1, ) paths = datacubes_paths[~datacubes_paths.isna()] if len(paths) == 0: diff --git a/executor/app/messaging.py b/executor/app/messaging.py index 37ce25a..a30f9a1 100644 --- a/executor/app/messaging.py +++ b/executor/app/messaging.py @@ -1,17 +1,11 @@ -import os import logging -from enum import Enum + +from dbmanager.dbmanager import Request +from geodds_utils.queries import GeoQuery, TaskList from intake_geokube.queries.geoquery import GeoQuery from intake_geokube.queries.workflow import Workflow -MESSAGE_SEPARATOR = os.environ["MESSAGE_SEPARATOR"] - - -class MessageType(Enum): - QUERY = "query" - WORKFLOW = "workflow" - class Message: _LOG = logging.getLogger("geokube.Message") @@ -19,27 +13,11 @@ class Message: request_id: int dataset_id: str = "" product_id: str = "" - type: MessageType content: GeoQuery | Workflow - def __init__(self, load: bytes) -> None: - self.request_id, msg_type, *query = load.decode().split( - MESSAGE_SEPARATOR - ) - match MessageType(msg_type): - case MessageType.QUERY: - self._LOG.debug("processing content of `query` type") - assert len(query) == 3, "improper content for query message" - self.dataset_id, self.product_id, self.content = query - self.content: GeoQuery = GeoQuery.parse(self.content) - self.type = MessageType.QUERY - case MessageType.WORKFLOW: - self._LOG.debug("processing content of `workflow` type") - assert len(query) == 1, "improper content for workflow message" - self.content: Workflow = Workflow.parse(query[0]) - self.dataset_id = self.content.dataset_id - self.product_id = self.content.product_id - self.type = MessageType.WORKFLOW - case _: - self._LOG.error("type `%s` is not supported", msg_type) - raise ValueError(f"type `{msg_type}` is not supported!") + def __init__(self, request: Request) -> None: + self.request_id = request.request_id + self._LOG.debug("processing workflow content") + self.content: TaskList = TaskList.parse(request.query) + self.dataset_id = self.content.dataset_id + self.product_id = self.content.product_id diff --git a/executor/app/meta.py b/executor/app/meta.py index 739ef62..30c8f74 100644 --- a/executor/app/meta.py +++ b/executor/app/meta.py @@ -1,4 +1,5 @@ """Module with `LoggableMeta` metaclass""" + import os import logging diff --git a/executor/app/naming.py b/executor/app/naming.py new file mode 100644 index 0000000..d37536a --- /dev/null +++ b/executor/app/naming.py @@ -0,0 +1,178 @@ +import os +import datetime +from zipfile import ZipFile + +import numpy as np +from dask.delayed import Delayed +from geokube.core.datacube import DataCube +from geokube.core.dataset import Dataset + +from geodds_utils.queries import GeoQuery + +from messaging import Message + + +def get_file_name_for_climate_downscaled(kube: DataCube, message: Message): + query: GeoQuery = GeoQuery.parse(message.content) + is_time_range = False + if query.time: + is_time_range = "start" in query.time or "stop" in query.time + var_names = list(kube.fields.keys()) + if len(kube) == 1: + if is_time_range: + FILENAME_TEMPLATE = "{ncvar_name}_VHR-PRO_IT2km_CMCC-CM_{product_id}_CCLM5-0-9_1hr_{start_date}_{end_date}_{request_id}" + ncvar_name = kube.fields[var_names[0]].ncvar + return FILENAME_TEMPLATE.format( + product_id=message.product_id, + request_id=message.request_id, + ncvar_name=ncvar_name, + start_date=np.datetime_as_string( + kube.time.values[0], unit="D" + ), + end_date=np.datetime_as_string(kube.time.values[-1], unit="D"), + ) + else: + FILENAME_TEMPLATE = "{ncvar_name}_VHR-PRO_IT2km_CMCC-CM_{product_id}_CCLM5-0-9_1hr_{request_id}" + ncvar_name = kube.fields[var_names[0]].ncvar + return FILENAME_TEMPLATE.format( + product_id=message.product_id, + request_id=message.request_id, + ncvar_name=ncvar_name, + ) + else: + if is_time_range: + FILENAME_TEMPLATE = "VHR-PRO_IT2km_CMCC-CM_{product_id}_CCLM5-0-9_1hr_{start_date}_{end_date}_{request_id}" + return FILENAME_TEMPLATE.format( + product_id=message.product_id, + request_id=message.request_id, + start_date=np.datetime_as_string( + kube.time.values[0], unit="D" + ), + end_date=np.datetime_as_string(kube.time.values[-1], unit="D"), + ) + else: + FILENAME_TEMPLATE = ( + "VHR-PRO_IT2km_CMCC-CM_{product_id}_CCLM5-0-9_1hr_{request_id}" + ) + return FILENAME_TEMPLATE.format( + product_id=message.product_id, + request_id=message.request_id, + ) + + +def rcp85_filename_condition(kube: DataCube, message: Message) -> bool: + return ( + message.dataset_id == "climate-projections-rcp85-downscaled-over-italy" + ) + + +def get_history_message(): + return ( + f"Generated by CMCC DDS version 0.9.0 {str(datetime.datetime.now())}" + ) + + +def persist_datacube( + kube: DataCube, + message: Message, + base_path: str | os.PathLike, +) -> str | os.PathLike: + if rcp85_filename_condition(kube, message): + path = get_file_name_for_climate_downscaled(kube, message) + else: + var_names = list(kube.fields.keys()) + if len(kube) == 1: + path = "_".join([ + var_names[0], + message.dataset_id, + message.product_id, + str(message.request_id), + ]) + else: + path = "_".join([ + message.dataset_id, + message.product_id, + str(message.request_id), + ]) + kube._properties["history"] = get_history_message() + if isinstance(message.content, GeoQuery): + format = message.content.format + else: + format = "netcdf" + match format: + case "netcdf": + full_path = os.path.join(base_path, f"{path}.nc") + kube.to_netcdf(full_path) + case "geojson": + full_path = os.path.join(base_path, f"{path}.json") + kube.to_geojson(full_path) + case _: + raise ValueError(f"format `{format}` is not supported") + return full_path + + +def persist_dataset( + dset: Dataset, + message: Message, + base_path: str | os.PathLike, +): + def _get_attr_comb(dataframe_item, attrs): + return "_".join([dataframe_item[attr_name] for attr_name in attrs]) + + def _persist_single_datacube(dataframe_item, base_path, format): + dcube = dataframe_item[dset.DATACUBE_COL] + if isinstance(dcube, Delayed): + dcube = dcube.compute() + if len(dcube) == 0: + return None + for field in dcube.fields.values(): + if 0 in field.shape: + return None + attr_str = _get_attr_comb(dataframe_item, dset._Dataset__attrs) + var_names = list(dcube.fields.keys()) + if len(dcube) == 1: + path = "_".join([ + var_names[0], + message.dataset_id, + message.product_id, + attr_str, + str(message.request_id), + ]) + else: + path = "_".join([ + message.dataset_id, + message.product_id, + attr_str, + str(message.request_id), + ]) + match format: + case "netcdf": + full_path = os.path.join(base_path, f"{path}.nc") + dcube.to_netcdf(full_path) + case "geojson": + full_path = os.path.join(base_path, f"{path}.json") + dcube.to_geojson(full_path) + return full_path + + if isinstance(message.content, GeoQuery): + format = message.content.format + else: + format = "netcdf" + datacubes_paths = dset.data.apply( + _persist_single_datacube, base_path=base_path, format=format, axis=1 + ) + paths = datacubes_paths[~datacubes_paths.isna()] + if len(paths) == 0: + return None + elif len(paths) == 1: + return paths.iloc[0] + zip_name = "_".join( + [message.dataset_id, message.product_id, str(message.request_id)] + ) + path = os.path.join(base_path, f"{zip_name}.zip") + with ZipFile(path, "w") as archive: + for file in paths: + archive.write(file, arcname=os.path.basename(file)) + for file in paths: + os.remove(file) + return path