From 67ba3ab512b0de11f632a380b344ed1f7ba15552 Mon Sep 17 00:00:00 2001 From: QuCMGisaia Date: Tue, 7 Jan 2025 18:17:20 +0100 Subject: [PATCH] (wip)feat: unify data access --- extensions/aproc/proc/access/manager.py | 112 ++++++++++++++++++ extensions/aproc/proc/access/storages/file.py | 7 ++ extensions/aproc/proc/access/storages/gs.py | 40 +++++++ extensions/aproc/proc/access/storages/http.py | 10 ++ .../aproc/proc/access/storages/https.py | 10 ++ .../proc/download/drivers/download_driver.py | 3 +- .../proc/download/drivers/impl/image_file.py | 13 +- .../proc/download/drivers/impl/met_file.py | 6 +- .../aproc/proc/download/drivers/impl/utils.py | 1 - .../aproc/proc/enrich/enrich_process.py | 4 +- extensions/aproc/proc/s3_configuration.py | 56 ++------- 11 files changed, 200 insertions(+), 62 deletions(-) create mode 100644 extensions/aproc/proc/access/manager.py create mode 100644 extensions/aproc/proc/access/storages/file.py create mode 100644 extensions/aproc/proc/access/storages/gs.py create mode 100644 extensions/aproc/proc/access/storages/http.py create mode 100644 extensions/aproc/proc/access/storages/https.py diff --git a/extensions/aproc/proc/access/manager.py b/extensions/aproc/proc/access/manager.py new file mode 100644 index 0000000..0ed5d13 --- /dev/null +++ b/extensions/aproc/proc/access/manager.py @@ -0,0 +1,112 @@ +from pathlib import Path +from typing import Annotated, Union +from urllib.parse import urlparse + +from pydantic import BaseModel, Field + +from aproc.core.logger import Logger +from extensions.aproc.proc.access.storages.file import FileStorage +from extensions.aproc.proc.access.storages.gs import GoogleStorage +from extensions.aproc.proc.access.storages.http import HttpStorage +from extensions.aproc.proc.access.storages.https import HttpsStorage + +AnyStorage = Annotated[Union[FileStorage, GoogleStorage, HttpStorage, HttpsStorage], Field(discriminator="type")] + + +class AccessManager(BaseModel): + storage: AnyStorage | None = Field(None) + logger = Logger.logger + + def __resolve_storage(self, href: str) -> AnyStorage: + """ + Based on the defined storages (TODO), returns the one matching the input href + """ + storage_type = urlparse(href).scheme + netloc = urlparse(href).netloc + + if not storage_type or storage_type == "file": + if self.storage.type is None: + return self.storage + return FileStorage() + + if storage_type == "gs": + if self.storage.type == "gs" and netloc == self.storage.bucket: + return self.storage + return GoogleStorage(bucket=netloc) + + if storage_type == "http": + if self.storage.type == "http" and netloc == self.storage.domain: + return self.storage + return HttpStorage(domain=netloc) + + if storage_type == "https": + if self.storage.type == "https" and netloc == self.storage.domain: + return self.storage + return HttpsStorage(domain=netloc) + + raise NotImplementedError(f"Storage '{storage_type}' not compatible") + + def __get_storage_parameters(self, href: str): + storage = self.__resolve_storage(href) + + if storage.type == "gs": + from google.cloud.storage import Client + from google.oauth2 import service_account + + if storage.api_key is None: + self.logger.warning("No api_key is configured for this Google Storage. Using anonymous credentials") + client = Client.create_anonymous_client() + else: + credentials = service_account.Credentials.from_service_account_info(storage.api_key) + client = Client("APROC", credentials=credentials) + + return {"client": client} + + if storage.type == "http" or storage.type == "https": + return {"headers": storage.headers} + + return {} + + def pull(self): + """ + Pulls a file from a storage to write it in the local storage. + If the input storage is local, then it is a copy. Otherwise it is a download. + """ + ... + + # Will return a yield + def stream(self): + """ + Reads the content of a file in a storage without downloading it. + """ + ... + + def exists(self, href: str) -> bool: + """ + Whether the file exists + """ + storage = self.__resolve_storage(href) + + if storage.type is None: + return Path(href).exists() + + if storage.type == "gs": + from google.cloud.storage import Client + + client: Client = self.__get_storage_parameters(href)["client"] + bucket = client.get_bucket(storage.bucket) + return bucket.blob(href).exists() + + if storage.type == "https" or storage.type == "http": + import requests + + r = requests.head(href, headers=self.__get_storage_parameters(href), verify=False) + return r.status_code < 400 + + raise ValueError("Href matches no storage that is configured") + + def is_download_required(self, href: str): + return self.storage.type == "https" \ + and urlparse(href).scheme == "https" \ + and urlparse(href).netloc == self.storage.domain \ + and self.storage.force_download diff --git a/extensions/aproc/proc/access/storages/file.py b/extensions/aproc/proc/access/storages/file.py new file mode 100644 index 0000000..b67f324 --- /dev/null +++ b/extensions/aproc/proc/access/storages/file.py @@ -0,0 +1,7 @@ +from typing import Literal + +from pydantic import BaseModel + + +class FileStorage(BaseModel): + type: Literal[None] = None diff --git a/extensions/aproc/proc/access/storages/gs.py b/extensions/aproc/proc/access/storages/gs.py new file mode 100644 index 0000000..061702b --- /dev/null +++ b/extensions/aproc/proc/access/storages/gs.py @@ -0,0 +1,40 @@ + + +import enum +from typing import Literal + +from pydantic import BaseModel, Field, computed_field + + +class GoogleStorageConstants(str, enum.Enum): + AUTH_URI = "https://accounts.google.com/o/oauth2/auth" + TOKEN_URI = "https://oauth2.googleapis.com/token" + AUTH_PROVIDER_CERT_URL = "https://www.googleapis.com/oauth2/v1/certs" + + +class GoogleStorageApiKey(BaseModel): + type: Literal["service_account"] = "service_account" + project_id: str + private_key_id: str + private_key: str + client_id: str | None = Field(None) + auth_uri: Literal[GoogleStorageConstants.AUTH_URI] = GoogleStorageConstants.AUTH_URI.value + token_uri: Literal[GoogleStorageConstants.TOKEN_URI] = GoogleStorageConstants.TOKEN_URI.value + auth_provider_x509_cert_url: Literal[GoogleStorageConstants.AUTH_PROVIDER_CERT_URL] = GoogleStorageConstants.AUTH_PROVIDER_CERT_URL.value + universe_domain: Literal["googleapis.com"] = "googleapis.com" + + @computed_field + @property + def client_x509_cert_url(self) -> str: + return f"https://www.googleapis.com/robot/v1/metadata/x509/{self.project_id}%40appspot.gserviceaccount.com" + + @computed_field + @property + def client_email(self) -> str: + return f"{self.project_id}@appspot.gserviceaccount.com" + + +class GoogleStorage(BaseModel): + type: Literal["gs"] = "gs" + bucket: str + api_key: GoogleStorageApiKey | None diff --git a/extensions/aproc/proc/access/storages/http.py b/extensions/aproc/proc/access/storages/http.py new file mode 100644 index 0000000..5130224 --- /dev/null +++ b/extensions/aproc/proc/access/storages/http.py @@ -0,0 +1,10 @@ +from typing import Literal + +from pydantic import BaseModel, Field + + +class HttpStorage(BaseModel): + type: Literal["http"] = "http" + headers: dict[str, str] + domain: str + force_download: bool = Field(default=False) diff --git a/extensions/aproc/proc/access/storages/https.py b/extensions/aproc/proc/access/storages/https.py new file mode 100644 index 0000000..d377540 --- /dev/null +++ b/extensions/aproc/proc/access/storages/https.py @@ -0,0 +1,10 @@ +from typing import Literal + +from pydantic import BaseModel, Field + + +class HttpsStorage(BaseModel): + type: Literal["https"] = "https" + headers: dict[str, str] + domain: str + force_download: bool = Field(default=False) diff --git a/extensions/aproc/proc/download/drivers/download_driver.py b/extensions/aproc/proc/download/drivers/download_driver.py index 948c42b..69f8214 100644 --- a/extensions/aproc/proc/download/drivers/download_driver.py +++ b/extensions/aproc/proc/download/drivers/download_driver.py @@ -7,11 +7,12 @@ class DownloadDriver(AbstractDriver): """ Driver for exporting files for download """ - alternative_asset_href_field: str = None + alternative_asset_href_field: str | None = None def __init__(self): super().__init__() + @staticmethod def init(configuration: dict) -> None: if configuration: DownloadDriver.alternative_asset_href_field = configuration.get("alternative_asset_href_field") diff --git a/extensions/aproc/proc/download/drivers/impl/image_file.py b/extensions/aproc/proc/download/drivers/impl/image_file.py index dd3b443..ca76935 100644 --- a/extensions/aproc/proc/download/drivers/impl/image_file.py +++ b/extensions/aproc/proc/download/drivers/impl/image_file.py @@ -12,6 +12,7 @@ def __init__(self): super().__init__() # Implements drivers method + @staticmethod def init(configuration: dict): DownloadDriver.init(configuration) @@ -33,8 +34,8 @@ def fetch_and_transform(self, item: Item, target_directory: str, crop_wkt: str, target_format: str, raw_archive: bool): href = self.get_asset_href(item) if raw_archive: - if item.properties.item_format and ( - item.properties.item_format == ItemFormat.geotiff.value or item.properties.item_format == ItemFormat.jpeg2000.value): + if item.properties.item_format and \ + item.properties.item_format in [ItemFormat.geotiff.value, ItemFormat.jpeg2000.value]: self.LOGGER.debug("Copy {} in {}".format(href, target_directory)) shutil.copy(href, target_directory) if item.assets and item.assets.get(Role.extent.value) and Path( @@ -50,16 +51,13 @@ def fetch_and_transform(self, item: Item, target_directory: str, crop_wkt: str, return # Default driver is GTiff driver_target = "GTiff" + extension = '.tif' if (not target_format) or (target_format == 'native'): if item.properties.main_asset_format == AssetFormat.jpg2000.value: driver_target = "JP2OpenJPEG" - else: - driver_target = "GTiff" + extension = '.JP2' elif target_format == "Jpeg2000" or target_format == AssetFormat.jpg2000.value: driver_target = "JP2OpenJPEG" - - extension = '.tif' - if driver_target == "JP2OpenJPEG": extension = '.JP2' if ((not target_projection or target_projection == 'native') and ( @@ -74,6 +72,7 @@ def fetch_and_transform(self, item: Item, target_directory: str, crop_wkt: str, return if target_projection == 'native': target_projection = item.properties.proj__epsg + # Some transformation to be done ... from extensions.aproc.proc.download.drivers.impl.utils import extract target_file_name = Path(Path(href).stem).with_suffix(extension) diff --git a/extensions/aproc/proc/download/drivers/impl/met_file.py b/extensions/aproc/proc/download/drivers/impl/met_file.py index 3ab9e85..bddcc18 100644 --- a/extensions/aproc/proc/download/drivers/impl/met_file.py +++ b/extensions/aproc/proc/download/drivers/impl/met_file.py @@ -62,7 +62,7 @@ def fetch_and_transform(self, item: Item, target_directory: str, crop_wkt: str, images = list(map(lambda f: [f[0], os.path.splitext(f[1])[0] + extension], self.get_terrasarx_images(met_file, extension))) extract(images, crop_wkt, met_file, driver_target, target_projection, target_directory, target_file_name) - def get_dimap_images(self, href: str, extension: str): + def get_dimap_images(self, href: str, extension: str) -> list[tuple[str, str, str, str]]: dir_name = os.path.dirname(href) tree = ET.parse(href) root = tree.getroot() @@ -76,7 +76,7 @@ def get_dimap_images(self, href: str, extension: str): os.path.splitext(f.attrib["href"])[0] + georef_file_extension], files_elements)) return files - def get_terrasarx_images(self, href: str, extension: str): + def get_terrasarx_images(self, href: str, extension: str) -> list[tuple[str, str, str, str]]: dir_name = os.path.dirname(href) tree = ET.parse(href) root = tree.getroot() @@ -100,7 +100,7 @@ def copy_from_terrasarx(self, href: str, target_directory: str, extension: str): files = self.get_terrasarx_images(href, extension) self.copy_from_met(files, target_directory) - def copy_from_met(self, files, target_directory): + def copy_from_met(self, files: list[tuple[str, str, str, str]], target_directory: str): for f in files: valid_and_exist = os.path.isfile(f[0]) and os.path.exists(f[0]) if valid_and_exist: diff --git a/extensions/aproc/proc/download/drivers/impl/utils.py b/extensions/aproc/proc/download/drivers/impl/utils.py index d456c79..b912c6c 100644 --- a/extensions/aproc/proc/download/drivers/impl/utils.py +++ b/extensions/aproc/proc/download/drivers/impl/utils.py @@ -113,7 +113,6 @@ def make_raw_archive_zip(href: str, target_directory: str): dir_name = os.path.dirname(href) target_file_name = os.path.splitext(file_name)[0] + datetime.now().strftime("%d-%m-%Y-%H-%M-%S") shutil.make_archive(target_directory + "/" + target_file_name, 'zip', dir_name) - return def writeWorldWidefrom_transform(affine, input): diff --git a/extensions/aproc/proc/enrich/enrich_process.py b/extensions/aproc/proc/enrich/enrich_process.py index e0ed86e..986d131 100644 --- a/extensions/aproc/proc/enrich/enrich_process.py +++ b/extensions/aproc/proc/enrich/enrich_process.py @@ -80,8 +80,9 @@ def get_process_summary() -> ProcessSummary: def before_execute(headers: dict[str, str], requests: list[dict[str, str]], asset_type: str, include_drivers: list[str] = [], exclude_drivers: list[str] = []) -> dict[str, str]: return {} + @staticmethod def get_resource_id(inputs: BaseModel): - inputs: InputEnrichProcess = InputEnrichProcess(**inputs.model_dump()) + inputs: InputEnrichProcess = InputEnrichProcess(**inputs.model_dump()) hash_object = hashlib.sha1("/".join(list(map(lambda r: r["collection"] + r["item_id"], inputs.requests))).encode()) return hash_object.hexdigest() @@ -141,6 +142,7 @@ def execute(self, headers: dict[str, str], requests: list[dict[str, str]], asset raise DriverException(error_msg) return OutputEnrichProcess(item_locations=item_locations).model_dump() + @staticmethod def __get_item_from_airs__(collection: str, item_id: str): try: r = requests.get(url=os.path.join(AprocConfiguration.settings.airs_endpoint, "collections", collection, "items", item_id)) diff --git a/extensions/aproc/proc/s3_configuration.py b/extensions/aproc/proc/s3_configuration.py index bfb3aa4..eae879b 100644 --- a/extensions/aproc/proc/s3_configuration.py +++ b/extensions/aproc/proc/s3_configuration.py @@ -1,64 +1,22 @@ -import enum import json import logging import os import tempfile -from typing import Annotated, Literal, Union +from typing import Annotated, Union from urllib.parse import urlparse -from pydantic import BaseModel, Field, computed_field +from pydantic import BaseModel, Field +from extensions.aproc.proc.access.storages.file import FileStorage +from extensions.aproc.proc.access.storages.gs import GoogleStorage +from extensions.aproc.proc.access.storages.https import HttpsStorage -class GoogleStorageConstants(str, enum.Enum): - AUTH_URI = "https://accounts.google.com/o/oauth2/auth" - TOKEN_URI = "https://oauth2.googleapis.com/token" - AUTH_PROVIDER_CERT_URL = "https://www.googleapis.com/oauth2/v1/certs" - -class GoogleStorageApiKey(BaseModel): - type: Literal["service_account"] = "service_account" - project_id: str - private_key_id: str - private_key: str - client_id: str | None = Field(None) - auth_uri: Literal[GoogleStorageConstants.AUTH_URI] = GoogleStorageConstants.AUTH_URI.value - token_uri: Literal[GoogleStorageConstants.TOKEN_URI] = GoogleStorageConstants.TOKEN_URI.value - auth_provider_x509_cert_url: Literal[GoogleStorageConstants.AUTH_PROVIDER_CERT_URL] = GoogleStorageConstants.AUTH_PROVIDER_CERT_URL.value - universe_domain: Literal["googleapis.com"] = "googleapis.com" - - @computed_field - @property - def client_x509_cert_url(self) -> str: - return f"https://www.googleapis.com/robot/v1/metadata/x509/{self.project_id}%40appspot.gserviceaccount.com" - - @computed_field - @property - def client_email(self) -> str: - return f"{self.project_id}@appspot.gserviceaccount.com" - - -class GoogleStorage(BaseModel): - type: Literal["gs"] = "gs" - bucket: str - api_key: GoogleStorageApiKey - - -class HttpsStorage(BaseModel): - type: Literal["https"] = "https" - headers: dict[str, str] - domain: str - force_download: bool = Field(False) - - -class NoStorage(BaseModel): - type: Literal[None] = None - - -Storage = Annotated[Union[GoogleStorage, HttpsStorage, NoStorage], Field(discriminator="type")] +AnyStorage = Annotated[Union[FileStorage, HttpsStorage, GoogleStorage], Field(discriminator="type")] class S3Configuration(BaseModel): - input: Storage | None = Field(None) + input: AnyStorage | None = Field(None) def get_storage_parameters(self, href: str, LOGGER: logging.Logger) -> dict: storage_type = urlparse(href).scheme