Skip to content

Commit

Permalink
(wip)feat: unify data access
Browse files Browse the repository at this point in the history
  • Loading branch information
QuCMGisaia committed Jan 7, 2025
1 parent 006c815 commit 67ba3ab
Show file tree
Hide file tree
Showing 11 changed files with 200 additions and 62 deletions.
112 changes: 112 additions & 0 deletions extensions/aproc/proc/access/manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
from pathlib import Path
from typing import Annotated, Union
from urllib.parse import urlparse

from pydantic import BaseModel, Field

from aproc.core.logger import Logger
from extensions.aproc.proc.access.storages.file import FileStorage
from extensions.aproc.proc.access.storages.gs import GoogleStorage
from extensions.aproc.proc.access.storages.http import HttpStorage
from extensions.aproc.proc.access.storages.https import HttpsStorage

AnyStorage = Annotated[Union[FileStorage, GoogleStorage, HttpStorage, HttpsStorage], Field(discriminator="type")]


class AccessManager(BaseModel):
    """Resolves which storage an href belongs to and performs access operations on it.

    A single storage may be configured (``storage``); hrefs matching its scheme and
    location reuse it (and its credentials/headers), other hrefs get a fresh,
    unauthenticated storage built from the href itself.
    """

    # Optionally configured storage, reused when an href matches it.
    storage: AnyStorage | None = Field(None)
    # NOTE(review): non-annotated class attribute; pydantic v2 requires a ClassVar
    # annotation for this to be accepted — confirm the pydantic version in use.
    logger = Logger.logger

    def __resolve_storage(self, href: str) -> AnyStorage:
        """
        Based on the defined storages (TODO), returns the one matching the input href.

        Returns the configured ``self.storage`` when its type and location match the
        href; otherwise builds a default storage from the href.

        Raises:
            NotImplementedError: if the href scheme matches no supported storage.
        """
        parsed = urlparse(href)
        storage_type = parsed.scheme
        netloc = parsed.netloc

        if not storage_type or storage_type == "file":
            # FileStorage uses type=None as its discriminator value.
            if self.storage is not None and self.storage.type is None:
                return self.storage
            return FileStorage()

        if storage_type == "gs":
            if self.storage is not None and self.storage.type == "gs" and netloc == self.storage.bucket:
                return self.storage
            # api_key must be given explicitly: anonymous access to a foreign bucket.
            return GoogleStorage(bucket=netloc, api_key=None)

        if storage_type == "http":
            if self.storage is not None and self.storage.type == "http" and netloc == self.storage.domain:
                return self.storage
            # headers is a required field of HttpStorage: default to none.
            return HttpStorage(domain=netloc, headers={})

        if storage_type == "https":
            if self.storage is not None and self.storage.type == "https" and netloc == self.storage.domain:
                return self.storage
            return HttpsStorage(domain=netloc, headers={})

        raise NotImplementedError(f"Storage '{storage_type}' not compatible")

    def __get_storage_parameters(self, href: str) -> dict:
        """Return the access parameters (client or headers) for the storage matching href."""
        storage = self.__resolve_storage(href)

        if storage.type == "gs":
            from google.cloud.storage import Client
            from google.oauth2 import service_account

            if storage.api_key is None:
                self.logger.warning("No api_key is configured for this Google Storage. Using anonymous credentials")
                client = Client.create_anonymous_client()
            else:
                # from_service_account_info expects a mapping, not a pydantic model;
                # model_dump also materializes the computed client_email/cert_url fields.
                credentials = service_account.Credentials.from_service_account_info(storage.api_key.model_dump())
                client = Client("APROC", credentials=credentials)

            return {"client": client}

        if storage.type == "http" or storage.type == "https":
            return {"headers": storage.headers}

        return {}

    def pull(self):
        """
        Pulls a file from a storage to write it in the local storage.
        If the input storage is local, then it is a copy. Otherwise it is a download.
        """
        ...

    # Will return a yield
    def stream(self):
        """
        Reads the content of a file in a storage without downloading it.
        """
        ...

    def exists(self, href: str) -> bool:
        """
        Whether the file exists.

        Raises:
            ValueError: if the resolved storage type is not handled here.
        """
        storage = self.__resolve_storage(href)

        if storage.type is None:
            return Path(href).exists()

        if storage.type == "gs":
            from google.cloud.storage import Client

            client: Client = self.__get_storage_parameters(href)["client"]
            bucket = client.get_bucket(storage.bucket)
            # Blob names are object paths within the bucket, not full gs:// URLs.
            blob_name = urlparse(href).path.lstrip("/")
            return bucket.blob(blob_name).exists()

        if storage.type == "https" or storage.type == "http":
            import requests

            # Pass only the headers value, not the whole parameter dict.
            # NOTE(review): verify=False disables TLS certificate validation — confirm intended.
            r = requests.head(href, headers=self.__get_storage_parameters(href)["headers"], verify=False)
            return r.status_code < 400

        raise ValueError("Href matches no storage that is configured")

    def is_download_required(self, href: str) -> bool:
        """Whether an https href matching the configured storage must be downloaded rather than streamed."""
        if self.storage is None or self.storage.type != "https":
            return False
        parsed = urlparse(href)
        return parsed.scheme == "https" \
            and parsed.netloc == self.storage.domain \
            and self.storage.force_download
7 changes: 7 additions & 0 deletions extensions/aproc/proc/access/storages/file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from typing import Literal

from pydantic import BaseModel


class FileStorage(BaseModel):
    """Local filesystem storage.

    Discriminator value is ``None``: hrefs with no scheme (or the ``file`` scheme)
    resolve to this storage.
    """
    # None is the discriminator value for "local file" in the AnyStorage union.
    type: Literal[None] = None
40 changes: 40 additions & 0 deletions extensions/aproc/proc/access/storages/gs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@


import enum
from typing import Literal

from pydantic import BaseModel, Field, computed_field


class GoogleStorageConstants(str, enum.Enum):
    """Fixed Google OAuth2 endpoints used in service-account key files."""
    AUTH_URI = "https://accounts.google.com/o/oauth2/auth"
    TOKEN_URI = "https://oauth2.googleapis.com/token"
    AUTH_PROVIDER_CERT_URL = "https://www.googleapis.com/oauth2/v1/certs"


class GoogleStorageApiKey(BaseModel):
    """Service-account key for Google Cloud Storage.

    Mirrors the JSON layout of a Google service-account key file; only the
    project/key fields vary, the OAuth endpoints are fixed constants. The
    ``client_email`` and ``client_x509_cert_url`` fields are derived from
    ``project_id`` (App Engine default service-account naming) as computed fields.
    """
    type: Literal["service_account"] = "service_account"
    project_id: str
    private_key_id: str
    private_key: str
    client_id: str | None = Field(None)
    # Literal-over-enum pins each field to its single allowed endpoint value.
    auth_uri: Literal[GoogleStorageConstants.AUTH_URI] = GoogleStorageConstants.AUTH_URI.value
    token_uri: Literal[GoogleStorageConstants.TOKEN_URI] = GoogleStorageConstants.TOKEN_URI.value
    auth_provider_x509_cert_url: Literal[GoogleStorageConstants.AUTH_PROVIDER_CERT_URL] = GoogleStorageConstants.AUTH_PROVIDER_CERT_URL.value
    universe_domain: Literal["googleapis.com"] = "googleapis.com"

    @computed_field
    @property
    def client_x509_cert_url(self) -> str:
        """Certificate URL of the project's default service account (derived, not stored)."""
        return f"https://www.googleapis.com/robot/v1/metadata/x509/{self.project_id}%40appspot.gserviceaccount.com"

    @computed_field
    @property
    def client_email(self) -> str:
        """E-mail of the project's default service account (derived, not stored)."""
        return f"{self.project_id}@appspot.gserviceaccount.com"


class GoogleStorage(BaseModel):
    """Google Cloud Storage bucket, addressed by ``gs://<bucket>/...`` hrefs."""
    type: Literal["gs"] = "gs"
    # Bucket name (the netloc of a gs:// href).
    bucket: str
    # Default to None (anonymous access): AccessManager.__resolve_storage constructs
    # this model without an api_key when the href does not match the configured storage.
    api_key: GoogleStorageApiKey | None = Field(None)
10 changes: 10 additions & 0 deletions extensions/aproc/proc/access/storages/http.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from typing import Literal

from pydantic import BaseModel, Field


class HttpStorage(BaseModel):
    """Plain-HTTP storage, addressed by ``http://<domain>/...`` hrefs."""
    type: Literal["http"] = "http"
    # Default to no extra headers: AccessManager.__resolve_storage constructs this
    # model from an href alone, without any configured headers.
    headers: dict[str, str] = Field(default_factory=dict)
    # Domain (netloc) the storage serves.
    domain: str
    # Whether assets on this storage must be downloaded rather than streamed.
    force_download: bool = Field(default=False)
10 changes: 10 additions & 0 deletions extensions/aproc/proc/access/storages/https.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from typing import Literal

from pydantic import BaseModel, Field


class HttpsStorage(BaseModel):
    """HTTPS storage, addressed by ``https://<domain>/...`` hrefs."""
    type: Literal["https"] = "https"
    # Default to no extra headers: AccessManager.__resolve_storage constructs this
    # model from an href alone, without any configured headers.
    headers: dict[str, str] = Field(default_factory=dict)
    # Domain (netloc) the storage serves.
    domain: str
    # Whether assets on this storage must be downloaded rather than streamed.
    force_download: bool = Field(default=False)
3 changes: 2 additions & 1 deletion extensions/aproc/proc/download/drivers/download_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
class DownloadDriver(AbstractDriver):
""" Driver for exporting files for download
"""
alternative_asset_href_field: str = None
alternative_asset_href_field: str | None = None

def __init__(self):
super().__init__()

@staticmethod
def init(configuration: dict) -> None:
if configuration:
DownloadDriver.alternative_asset_href_field = configuration.get("alternative_asset_href_field")
Expand Down
13 changes: 6 additions & 7 deletions extensions/aproc/proc/download/drivers/impl/image_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ def __init__(self):
super().__init__()

# Implements drivers method
@staticmethod
def init(configuration: dict):
DownloadDriver.init(configuration)

Expand All @@ -33,8 +34,8 @@ def fetch_and_transform(self, item: Item, target_directory: str, crop_wkt: str,
target_format: str, raw_archive: bool):
href = self.get_asset_href(item)
if raw_archive:
if item.properties.item_format and (
item.properties.item_format == ItemFormat.geotiff.value or item.properties.item_format == ItemFormat.jpeg2000.value):
if item.properties.item_format and \
item.properties.item_format in [ItemFormat.geotiff.value, ItemFormat.jpeg2000.value]:
self.LOGGER.debug("Copy {} in {}".format(href, target_directory))
shutil.copy(href, target_directory)
if item.assets and item.assets.get(Role.extent.value) and Path(
Expand All @@ -50,16 +51,13 @@ def fetch_and_transform(self, item: Item, target_directory: str, crop_wkt: str,
return
# Default driver is GTiff
driver_target = "GTiff"
extension = '.tif'
if (not target_format) or (target_format == 'native'):
if item.properties.main_asset_format == AssetFormat.jpg2000.value:
driver_target = "JP2OpenJPEG"
else:
driver_target = "GTiff"
extension = '.JP2'
elif target_format == "Jpeg2000" or target_format == AssetFormat.jpg2000.value:
driver_target = "JP2OpenJPEG"

extension = '.tif'
if driver_target == "JP2OpenJPEG":
extension = '.JP2'

if ((not target_projection or target_projection == 'native') and (
Expand All @@ -74,6 +72,7 @@ def fetch_and_transform(self, item: Item, target_directory: str, crop_wkt: str,
return
if target_projection == 'native':
target_projection = item.properties.proj__epsg

# Some transformation to be done ...
from extensions.aproc.proc.download.drivers.impl.utils import extract
target_file_name = Path(Path(href).stem).with_suffix(extension)
Expand Down
6 changes: 3 additions & 3 deletions extensions/aproc/proc/download/drivers/impl/met_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def fetch_and_transform(self, item: Item, target_directory: str, crop_wkt: str,
images = list(map(lambda f: [f[0], os.path.splitext(f[1])[0] + extension], self.get_terrasarx_images(met_file, extension)))
extract(images, crop_wkt, met_file, driver_target, target_projection, target_directory, target_file_name)

def get_dimap_images(self, href: str, extension: str):
def get_dimap_images(self, href: str, extension: str) -> list[tuple[str, str, str, str]]:
dir_name = os.path.dirname(href)
tree = ET.parse(href)
root = tree.getroot()
Expand All @@ -76,7 +76,7 @@ def get_dimap_images(self, href: str, extension: str):
os.path.splitext(f.attrib["href"])[0] + georef_file_extension], files_elements))
return files

def get_terrasarx_images(self, href: str, extension: str):
def get_terrasarx_images(self, href: str, extension: str) -> list[tuple[str, str, str, str]]:
dir_name = os.path.dirname(href)
tree = ET.parse(href)
root = tree.getroot()
Expand All @@ -100,7 +100,7 @@ def copy_from_terrasarx(self, href: str, target_directory: str, extension: str):
files = self.get_terrasarx_images(href, extension)
self.copy_from_met(files, target_directory)

def copy_from_met(self, files, target_directory):
def copy_from_met(self, files: list[tuple[str, str, str, str]], target_directory: str):
for f in files:
valid_and_exist = os.path.isfile(f[0]) and os.path.exists(f[0])
if valid_and_exist:
Expand Down
1 change: 0 additions & 1 deletion extensions/aproc/proc/download/drivers/impl/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ def make_raw_archive_zip(href: str, target_directory: str):
dir_name = os.path.dirname(href)
target_file_name = os.path.splitext(file_name)[0] + datetime.now().strftime("%d-%m-%Y-%H-%M-%S")
shutil.make_archive(target_directory + "/" + target_file_name, 'zip', dir_name)
return


def writeWorldWidefrom_transform(affine, input):
Expand Down
4 changes: 3 additions & 1 deletion extensions/aproc/proc/enrich/enrich_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ def get_process_summary() -> ProcessSummary:
def before_execute(headers: dict[str, str], requests: list[dict[str, str]], asset_type: str, include_drivers: list[str] = [], exclude_drivers: list[str] = []) -> dict[str, str]:
return {}

@staticmethod
def get_resource_id(inputs: BaseModel):
inputs: InputEnrichProcess = InputEnrichProcess(**inputs.model_dump())
inputs: InputEnrichProcess = InputEnrichProcess(**inputs.model_dump())
hash_object = hashlib.sha1("/".join(list(map(lambda r: r["collection"] + r["item_id"], inputs.requests))).encode())
return hash_object.hexdigest()

Expand Down Expand Up @@ -141,6 +142,7 @@ def execute(self, headers: dict[str, str], requests: list[dict[str, str]], asset
raise DriverException(error_msg)
return OutputEnrichProcess(item_locations=item_locations).model_dump()

@staticmethod
def __get_item_from_airs__(collection: str, item_id: str):
try:
r = requests.get(url=os.path.join(AprocConfiguration.settings.airs_endpoint, "collections", collection, "items", item_id))
Expand Down
56 changes: 7 additions & 49 deletions extensions/aproc/proc/s3_configuration.py
Original file line number Diff line number Diff line change
@@ -1,64 +1,22 @@
import enum
import json
import logging
import os
import tempfile
from typing import Annotated, Literal, Union
from typing import Annotated, Union
from urllib.parse import urlparse

from pydantic import BaseModel, Field, computed_field
from pydantic import BaseModel, Field

from extensions.aproc.proc.access.storages.file import FileStorage
from extensions.aproc.proc.access.storages.gs import GoogleStorage
from extensions.aproc.proc.access.storages.https import HttpsStorage

class GoogleStorageConstants(str, enum.Enum):
AUTH_URI = "https://accounts.google.com/o/oauth2/auth"
TOKEN_URI = "https://oauth2.googleapis.com/token"
AUTH_PROVIDER_CERT_URL = "https://www.googleapis.com/oauth2/v1/certs"


class GoogleStorageApiKey(BaseModel):
type: Literal["service_account"] = "service_account"
project_id: str
private_key_id: str
private_key: str
client_id: str | None = Field(None)
auth_uri: Literal[GoogleStorageConstants.AUTH_URI] = GoogleStorageConstants.AUTH_URI.value
token_uri: Literal[GoogleStorageConstants.TOKEN_URI] = GoogleStorageConstants.TOKEN_URI.value
auth_provider_x509_cert_url: Literal[GoogleStorageConstants.AUTH_PROVIDER_CERT_URL] = GoogleStorageConstants.AUTH_PROVIDER_CERT_URL.value
universe_domain: Literal["googleapis.com"] = "googleapis.com"

@computed_field
@property
def client_x509_cert_url(self) -> str:
return f"https://www.googleapis.com/robot/v1/metadata/x509/{self.project_id}%40appspot.gserviceaccount.com"

@computed_field
@property
def client_email(self) -> str:
return f"{self.project_id}@appspot.gserviceaccount.com"


class GoogleStorage(BaseModel):
type: Literal["gs"] = "gs"
bucket: str
api_key: GoogleStorageApiKey


class HttpsStorage(BaseModel):
type: Literal["https"] = "https"
headers: dict[str, str]
domain: str
force_download: bool = Field(False)


class NoStorage(BaseModel):
type: Literal[None] = None


Storage = Annotated[Union[GoogleStorage, HttpsStorage, NoStorage], Field(discriminator="type")]
AnyStorage = Annotated[Union[FileStorage, HttpsStorage, GoogleStorage], Field(discriminator="type")]


class S3Configuration(BaseModel):
input: Storage | None = Field(None)
input: AnyStorage | None = Field(None)

def get_storage_parameters(self, href: str, LOGGER: logging.Logger) -> dict:
storage_type = urlparse(href).scheme
Expand Down

0 comments on commit 67ba3ab

Please sign in to comment.