Feat/disabledrivers #223

Merged
merged 3 commits on Jan 6, 2025
8 changes: 8 additions & 0 deletions aproc/core/processes/process.py
@@ -1,6 +1,7 @@
import logging
from abc import ABC, abstractmethod

from celery import Task
from pydantic import BaseModel

from aproc.core.models.ogc import ProcessDescription, ProcessSummary
@@ -21,6 +22,13 @@ def set_logger(cls, logger) -> None:
"""
cls.LOGGER = logger

@staticmethod
def update_task_status(LOGGER: logging.Logger, task: Task, state: str, meta: dict = {}):
if task.request.id is not None:
task.update_state(state=state, meta=meta)
else:
LOGGER.debug(task.name + " " + state + " " + str(meta))

@staticmethod
@abstractmethod
def init(configuration: dict):
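The new shared helper above replaces the module-level `__update_status__` functions removed from the download and enrich processes further down. A minimal usage sketch, assuming a bound Celery task; the task body, the `item_id` parameter and the logger name are illustrative, not part of this PR:

```python
import logging

from celery import shared_task

from aproc.core.processes.process import Process

LOGGER = logging.getLogger("aproc")


@shared_task(bind=True, track_started=True)
def execute(self, item_id: str) -> dict:
    # With a real Celery request id the state update is pushed to the result
    # backend; when called synchronously the helper only logs at debug level.
    Process.update_task_status(LOGGER, self, state="PROGRESS",
                               meta={"ACTION": "DOWNLOAD", "TARGET": item_id})
    return {"item_id": item_id}
```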
1 change: 1 addition & 0 deletions aproc/service/ogc_processes_api.py
@@ -213,6 +213,7 @@ def get_process_description(process_id: str):
"model": RESTException
}
})

def post_process_execute(process_id: str, execute: Execute, request: Request):
process = __get_process(process_id)
try:
19 changes: 8 additions & 11 deletions extensions/aproc/proc/download/download_process.py
@@ -26,6 +26,7 @@
from extensions.aproc.proc.drivers.driver_manager import DriverManager
from extensions.aproc.proc.drivers.exceptions import (DriverException,
RegisterException)
from extensions.aproc.proc.processes.process_model import InputProcess
from extensions.aproc.proc.variables import (ARLAS_COLLECTION_KEY,
ARLAS_ITEM_ID_KEY,
DOWNLOAD_FAILED_MSG, EVENT_ACTION,
@@ -39,13 +40,7 @@
LOGGER = Logger.logger


def __update_status__(task: Task, state: str, meta: dict = None):
LOGGER.info(task.name + " " + state + " " + str(meta))
if task.request.id is not None:
task.update_state(state=state, meta=meta)


class InputDownloadProcess(BaseModel):
class InputDownloadProcess(InputProcess):
requests: list[dict[str, str]] = Field(default=[], title="The list of item (collection, item_id) to download")
crop_wkt: str = Field(default=None, title="WKT geometry for cropping the data")
target_projection: str = Field(default=None, title="epsg target projection")
@@ -88,6 +83,8 @@ def init(configuration: dict):
raise DriverException("Invalid configuration for download drivers ({})".format(configuration))
AprocProcess.input_model = InputDownloadProcess
Notifications.init()
description.inputs.get("include_drivers").schema_.items.enum = DriverManager.driver_names(summary.id)
Contributor (review comment on the line above):

I don't understand why you have to access the properties this way, and not just do description.inputs.include_drivers.
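A plausible explanation, offered as an assumption rather than the author's confirmed reasoning: in the OGC API Processes model, `ProcessDescription.inputs` is a mapping from input name to input description, not a model with one field per input, so dictionary-style access is needed. A minimal sketch of the pattern used in the diff:

```python
# Sketch assuming ProcessDescription.inputs is a dict keyed by input name
# (as in the OGC API - Processes schema); names mirror the diff above.
def set_driver_enums(description, driver_names: list[str]) -> None:
    for key in ("include_drivers", "exclude_drivers"):
        # Attribute access (description.inputs.include_drivers) would only work
        # if `inputs` were itself a model exposing each input as a field.
        description.inputs.get(key).schema_.items.enum = driver_names
```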

description.inputs.get("exclude_drivers").schema_.items.enum = DriverManager.driver_names(summary.id)

@staticmethod
def get_process_description() -> ProcessDescription:
@@ -98,7 +95,7 @@ def get_process_summary() -> ProcessSummary:
return summary

@staticmethod
def before_execute(headers: dict[str, str], requests: list[dict[str, str]], crop_wkt: str, target_projection: str = "native", target_format: str = "native", raw_archive: bool = True) -> dict[str, str]:
def before_execute(headers: dict[str, str], requests: list[dict[str, str]], crop_wkt: str, target_projection: str = "native", target_format: str = "native", raw_archive: bool = True, include_drivers: list[str] = [], exclude_drivers: list[str] = []) -> dict[str, str]:
(send_to, user_id) = AprocProcess.__get_user_email__(headers.get("authorization"))
for request in requests:
collection: str = request.get("collection")
@@ -170,7 +167,7 @@ def get_resource_id(inputs: BaseModel):
return hash_object.hexdigest()

@shared_task(bind=True, track_started=True)
def execute(self, headers: dict[str, str], requests: list[dict[str, str]], crop_wkt: str, target_projection: str = "native", target_format: str = "native", raw_archive: bool = True) -> dict:
def execute(self, headers: dict[str, str], requests: list[dict[str, str]], crop_wkt: str, target_projection: str = "native", target_format: str = "native", raw_archive: bool = True, include_drivers: list[str] = [], exclude_drivers: list[str] = []) -> dict:
(send_to, user_id) = AprocProcess.__get_user_email__(headers.get("authorization"))
LOGGER.debug("processing download requests from {}".format(send_to))
download_locations = []
@@ -196,11 +193,11 @@ def execute(self, headers: dict[str, str], requests: list[dict[str, str]], crop_
Notifications.report(None, DownloadConfiguration.settings.email_subject_error_download, DownloadConfiguration.settings.email_content_error_download, DownloadConfiguration.settings.notification_admin_emails.split(","), context=mail_context, outcome="failure")
raise RegisterException(error_msg)

driver: DownloadDriver = DriverManager.solve(summary.id, item)
driver: DownloadDriver = DriverManager.solve(summary.id, item, include_drivers=include_drivers, exclude_drivers=exclude_drivers)
if driver is not None:
try:
LOGGER.info("Download will be done by {}".format(driver.name))
__update_status__(self, state='PROGRESS', meta={"ACTION": "DOWNLOAD", "TARGET": item_id})
Process.update_task_status(LOGGER, self, state='PROGRESS', meta={"ACTION": "DOWNLOAD", "TARGET": item_id})
target_directory, relative_target_directory = AprocProcess.__get_download_location__(item, send_to)
LOGGER.info("Download will be placed in {}".format(target_directory))
mail_context["target_directory"] = target_directory
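The `InputProcess` base model imported above lives in `extensions/aproc/proc/processes/process_model.py`, which is not shown in this diff. Judging from the new `before_execute()`/`execute()` signatures, it presumably contributes the two driver filter fields, roughly as sketched here (assumed shape, not the file's actual contents):

```python
from pydantic import BaseModel, Field


class InputProcess(BaseModel):
    # Field names inferred from the new keyword arguments; an empty include
    # list means "consider every registered driver".
    include_drivers: list[str] = Field(default=[], title="Names of the only drivers allowed to handle the request")
    exclude_drivers: list[str] = Field(default=[], title="Names of the drivers that must not handle the request")
```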
17 changes: 14 additions & 3 deletions extensions/aproc/proc/drivers/driver_manager.py
@@ -29,13 +29,24 @@ def init(process: str, drivers: list[DriverConfiguration]):
LOGGER.info("{}: {}".format(driver.priority, driver.name))

@staticmethod
def solve(process: str, ressource) -> AbstractDriver:
def driver_names(process: str) -> list[str]:
return list(map(lambda p: p.name, DriverManager.drivers[process]))

@staticmethod
def solve(process: str, resource, include_drivers: list[str] = [], exclude_drivers: list[str] = []) -> AbstractDriver:
DriverManager.__check_drivers(process)
for driver_class in DriverManager.drivers.get(process, []):
drivers = DriverManager.drivers.get(process, [])
if include_drivers and len(include_drivers) > 0:
LOGGER.debug("keep only {}".format(include_drivers))
drivers = list(filter(lambda driver_class: driver_class.name in include_drivers, drivers))
if exclude_drivers and len(exclude_drivers) > 0:
LOGGER.debug("exclude {}".format(exclude_drivers))
drivers = list(filter(lambda driver_class: driver_class.name not in exclude_drivers, drivers))
for driver_class in drivers:
try:
LOGGER.debug("Test driver {}".format(driver_class.name))
driver: AbstractDriver = driver_class()
if driver.supports(ressource) is True:
if driver.supports(resource) is True:
return driver
except Exception as e:
LOGGER.exception(e)
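`solve` now narrows the registered drivers with the optional include/exclude lists before testing each remaining driver's `supports()` in priority order; the callers in this PR check for a `None` result when no driver matches. An illustrative call, in which the process id, the driver names and the `item` resource are placeholders:

```python
from extensions.aproc.proc.drivers.driver_manager import DriverManager


def pick_download_driver(item):
    # Only let the hypothetical "driver_a" claim the item; if it does not
    # support the resource, fall back to every driver except "driver_b".
    driver = DriverManager.solve("download", item, include_drivers=["driver_a"])
    if driver is None:
        driver = DriverManager.solve("download", item, exclude_drivers=["driver_b"])
    return driver
```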
23 changes: 10 additions & 13 deletions extensions/aproc/proc/enrich/enrich_process.py
@@ -15,6 +15,7 @@
from aproc.core.settings import Configuration as AprocConfiguration
from aproc.core.utils import base_model2description
from extensions.aproc.proc.drivers.driver_manager import DriverManager
from extensions.aproc.proc.processes.process_model import InputProcess
from extensions.aproc.proc.variables import EVENT_KIND_KEY, EVENT_CATEGORY_KEY, EVENT_REASON, EVENT_TYPE_KEY, USER_ACTION_KEY, EVENT_ACTION, EVENT_OUTCOME_KEY, EVENT_MODULE_KEY, ARLAS_COLLECTION_KEY, ARLAS_ITEM_ID_KEY, ENRICHMENT_FAILED_MSG
from extensions.aproc.proc.drivers.exceptions import DriverException
from extensions.aproc.proc.enrich.drivers.enrich_driver import EnrichDriver
@@ -25,13 +26,7 @@
LOGGER = Logger.logger


def __update_status__(task: Task, state: str, meta: dict = None):
LOGGER.info(task.name + " " + state + " " + str(meta))
if task.request.id is not None:
task.update_state(state=state, meta=meta)


class InputEnrichProcess(BaseModel):
class InputEnrichProcess(InputProcess):
requests: list[dict[str, str]] = Field(default=[], title="The list of items (collection, item_id) to enrich")
asset_type: str = Field(default=None, title="Name of the asset type to add (e.g. cog)")

@@ -70,6 +65,8 @@ def init(configuration: dict):
else:
raise DriverException("Invalid configuration for enrich drivers ({})".format(configuration))
AprocProcess.input_model = InputEnrichProcess
description.inputs.get("include_drivers").schema_.items.enum = DriverManager.driver_names(summary.id)
description.inputs.get("exclude_drivers").schema_.items.enum = DriverManager.driver_names(summary.id)

@staticmethod
def get_process_description() -> ProcessDescription:
@@ -80,7 +77,7 @@ def get_process_summary() -> ProcessSummary:
return summary

@staticmethod
def before_execute(headers: dict[str, str], requests: list[dict[str, str]], asset_type: str) -> dict[str, str]:
def before_execute(headers: dict[str, str], requests: list[dict[str, str]], asset_type: str, include_drivers: list[str] = [], exclude_drivers: list[str] = []) -> dict[str, str]:
return {}

def get_resource_id(inputs: BaseModel):
@@ -89,7 +86,7 @@ def get_resource_id(inputs: BaseModel):
return hash_object.hexdigest()

@shared_task(bind=True, track_started=True)
def execute(self, headers: dict[str, str], requests: list[dict[str, str]], asset_type: str) -> dict:
def execute(self, headers: dict[str, str], requests: list[dict[str, str]], asset_type: str, include_drivers: list[str] = [], exclude_drivers: list[str] = []) -> dict:
item_locations = []
for request in requests:
collection: str = request.get("collection")
@@ -100,11 +97,11 @@ def execute(self, headers: dict[str, str], requests: list[dict[str, str]], asset
LOGGER.error(error_msg)
LOGGER.info(ENRICHMENT_FAILED_MSG, extra={EVENT_KIND_KEY: "event", EVENT_CATEGORY_KEY: "file", EVENT_TYPE_KEY: USER_ACTION_KEY, EVENT_ACTION: "enrich", EVENT_OUTCOME_KEY: "failure", EVENT_REASON: error_msg, EVENT_MODULE_KEY: "aproc-enrich", ARLAS_COLLECTION_KEY: collection, ARLAS_ITEM_ID_KEY: item_id})
raise DriverException(error_msg)
driver: EnrichDriver = DriverManager.solve(summary.id, item)
driver: EnrichDriver = DriverManager.solve(summary.id, item, include_drivers=include_drivers, exclude_drivers=exclude_drivers)
if driver is not None:
try:
LOGGER.info("ingestion: 1 - enrichment will be done by {}".format(driver.name))
__update_status__(self, state='PROGRESS', meta={"ACTION": "ENRICH", "TARGET": item_id})
Process.update_task_status(LOGGER, self, state='PROGRESS', meta={"ACTION": "ENRICH", "TARGET": item_id})
LOGGER.info("Build asset {}".format(asset_type))
start = time()
asset, asset_location = driver.create_asset(
@@ -121,14 +118,14 @@ def execute(self, headers: dict[str, str], requests: list[dict[str, str]], asset
LOGGER.info("Enrichment success", extra={EVENT_KIND_KEY: "event", EVENT_CATEGORY_KEY: "file", EVENT_TYPE_KEY: USER_ACTION_KEY, EVENT_ACTION: "enrich", EVENT_OUTCOME_KEY: "success", EVENT_MODULE_KEY: "aproc-enrich", ARLAS_COLLECTION_KEY: collection, ARLAS_ITEM_ID_KEY: item_id})

LOGGER.debug("ingestion: 2 - upload asset if needed")
__update_status__(self, state='PROGRESS', meta={'step': 'upload', 'current': 1, 'asset': asset.name, 'total': len(item.assets), "ACTION": "ENRICH", "TARGET": item_id})
Process.update_task_status(LOGGER, self, state='PROGRESS', meta={'step': 'upload', 'current': 1, 'asset': asset.name, 'total': len(item.assets), "ACTION": "ENRICH", "TARGET": item_id})
start = time()
IngestAprocProcess.upload_asset_if_managed(item, asset, AprocConfiguration.settings.airs_endpoint)
end = time()
LOGGER.info("took {} ms".format(end - start))

LOGGER.debug("ingestion: 3 - update")
__update_status__(self, state='PROGRESS', meta={'step': 'update_item', "ACTION": "ENRICH", "TARGET": item_id})
Process.update_task_status(LOGGER, self, state='PROGRESS', meta={'step': 'update_item', "ACTION": "ENRICH", "TARGET": item_id})
item: Item = IngestAprocProcess.insert_or_update_item(item, AprocConfiguration.settings.airs_endpoint)
item_locations.append(os.path.join(AprocConfiguration.settings.airs_endpoint, "collections", item.collection, "items", item.id))
except Exception as e:
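With the driver names now advertised as enum values in the process description, a client can restrict which driver handles an enrichment. A hypothetical execution request, mirroring the pattern used by the directory ingest process below; the endpoint URL, collection, item id and driver name are placeholders:

```python
import json

import requests

body = {
    "inputs": {
        "requests": [{"collection": "my_collection", "item_id": "my_item"}],
        "asset_type": "cog",
        "include_drivers": ["my_enrich_driver"],  # must match one of the advertised enum values
        "exclude_drivers": [],
    }
}
# POST the OGC API Processes execution request for the enrich process.
r = requests.post("http://localhost:8001/processes/enrich/execution",
                  data=json.dumps(body),
                  headers={"Content-Type": "application/json"})
r.raise_for_status()
```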
4 changes: 2 additions & 2 deletions extensions/aproc/proc/ingest/directory_ingest_process.py
@@ -76,7 +76,7 @@ def get_resource_id(inputs: BaseModel):
return InputDirectoryIngestProcess(**inputs.model_dump()).directory

@shared_task(bind=True, track_started=True)
def execute(self, headers: dict[str, str], directory: str, collection: str, catalog: str, annotations: str) -> dict:
def execute(self, headers: dict[str, str], directory: str, collection: str, catalog: str, annotations: str, include_drivers: list[str] = [], exclude_drivers: list[str] = []) -> dict:
# self is a celery task because bind=True
""" ingest the archives contained in the directory url. Every archive ingestion becomes a new process

@@ -93,7 +93,7 @@ def execute(self, headers: dict[str, str], directory: str, collection: str, cata
for archive in archives:
LOGGER.info(archive.model_dump_json())
try:
inputs = InputIngestProcess(url=os.path.join(Configuration.settings.inputs_directory, archive.path), collection=collection, catalog=catalog, annotations=annotations)
inputs = InputIngestProcess(url=os.path.join(Configuration.settings.inputs_directory, archive.path), collection=collection, catalog=catalog, annotations=annotations, include_drivers=include_drivers, exclude_drivers=exclude_drivers)
execute = Execute(inputs=inputs.model_dump())
r = requests.post("/".join([Configuration.settings.aproc_endpoint, "processes", "ingest", "execution"]), data=json.dumps(execute.model_dump()), headers=headers)
if not r.ok: