diff --git a/aproc/core/processes/process.py b/aproc/core/processes/process.py index 16d3a41..48ee600 100644 --- a/aproc/core/processes/process.py +++ b/aproc/core/processes/process.py @@ -1,6 +1,7 @@ import logging from abc import ABC, abstractmethod +from celery import Task from pydantic import BaseModel from aproc.core.models.ogc import ProcessDescription, ProcessSummary @@ -21,6 +22,13 @@ def set_logger(cls, logger) -> None: """ cls.LOGGER = logger + @staticmethod + def update_task_status(LOGGER: logging.Logger, task: Task, state: str, meta: dict = {}): + if task.request.id is not None: + task.update_state(state=state, meta=meta) + else: + LOGGER.debug(task.name + " " + state + " " + str(meta)) + @staticmethod @abstractmethod def init(configuration: dict): diff --git a/aproc/service/ogc_processes_api.py b/aproc/service/ogc_processes_api.py index 895952d..b387613 100644 --- a/aproc/service/ogc_processes_api.py +++ b/aproc/service/ogc_processes_api.py @@ -213,6 +213,7 @@ def get_process_description(process_id: str): "model": RESTException } }) + def post_process_execute(process_id: str, execute: Execute, request: Request): process = __get_process(process_id) try: diff --git a/extensions/aproc/proc/download/download_process.py b/extensions/aproc/proc/download/download_process.py index ef13827..1ad35cb 100644 --- a/extensions/aproc/proc/download/download_process.py +++ b/extensions/aproc/proc/download/download_process.py @@ -26,6 +26,7 @@ from extensions.aproc.proc.drivers.driver_manager import DriverManager from extensions.aproc.proc.drivers.exceptions import (DriverException, RegisterException) +from extensions.aproc.proc.processes.process_model import InputProcess from extensions.aproc.proc.variables import (ARLAS_COLLECTION_KEY, ARLAS_ITEM_ID_KEY, DOWNLOAD_FAILED_MSG, EVENT_ACTION, @@ -39,13 +40,7 @@ LOGGER = Logger.logger -def __update_status__(task: Task, state: str, meta: dict = None): - LOGGER.info(task.name + " " + state + " " + str(meta)) - if task.request.id is not None: - task.update_state(state=state, meta=meta) - - -class InputDownloadProcess(BaseModel): +class InputDownloadProcess(InputProcess): requests: list[dict[str, str]] = Field(default=[], title="The list of item (collection, item_id) to download") crop_wkt: str = Field(default=None, title="WKT geometry for cropping the data") target_projection: str = Field(default=None, title="epsg target projection") @@ -88,6 +83,8 @@ def init(configuration: dict): raise DriverException("Invalid configuration for download drivers ({})".format(configuration)) AprocProcess.input_model = InputDownloadProcess Notifications.init() + description.inputs.get("include_drivers").schema_.items.enum = DriverManager.driver_names(summary.id) + description.inputs.get("exclude_drivers").schema_.items.enum = DriverManager.driver_names(summary.id) @staticmethod def get_process_description() -> ProcessDescription: @@ -98,7 +95,7 @@ def get_process_summary() -> ProcessSummary: return summary @staticmethod - def before_execute(headers: dict[str, str], requests: list[dict[str, str]], crop_wkt: str, target_projection: str = "native", target_format: str = "native", raw_archive: bool = True) -> dict[str, str]: + def before_execute(headers: dict[str, str], requests: list[dict[str, str]], crop_wkt: str, target_projection: str = "native", target_format: str = "native", raw_archive: bool = True, include_drivers: list[str] = [], exclude_drivers: list[str] = []) -> dict[str, str]: (send_to, user_id) = AprocProcess.__get_user_email__(headers.get("authorization")) for request in requests: collection: str = request.get("collection") @@ -170,7 +167,7 @@ def get_resource_id(inputs: BaseModel): return hash_object.hexdigest() @shared_task(bind=True, track_started=True) - def execute(self, headers: dict[str, str], requests: list[dict[str, str]], crop_wkt: str, target_projection: str = "native", target_format: str = "native", raw_archive: bool = True) -> dict: + def execute(self, headers: dict[str, str], requests: list[dict[str, str]], crop_wkt: str, target_projection: str = "native", target_format: str = "native", raw_archive: bool = True, include_drivers: list[str] = [], exclude_drivers: list[str] = []) -> dict: (send_to, user_id) = AprocProcess.__get_user_email__(headers.get("authorization")) LOGGER.debug("processing download requests from {}".format(send_to)) download_locations = [] @@ -196,11 +193,11 @@ def execute(self, headers: dict[str, str], requests: list[dict[str, str]], crop_ Notifications.report(None, DownloadConfiguration.settings.email_subject_error_download, DownloadConfiguration.settings.email_content_error_download, DownloadConfiguration.settings.notification_admin_emails.split(","), context=mail_context, outcome="failure") raise RegisterException(error_msg) - driver: DownloadDriver = DriverManager.solve(summary.id, item) + driver: DownloadDriver = DriverManager.solve(summary.id, item, include_drivers=include_drivers, exclude_drivers=exclude_drivers) if driver is not None: try: LOGGER.info("Download will be done by {}".format(driver.name)) - __update_status__(self, state='PROGRESS', meta={"ACTION": "DOWNLOAD", "TARGET": item_id}) + Process.update_task_status(LOGGER, self, state='PROGRESS', meta={"ACTION": "DOWNLOAD", "TARGET": item_id}) target_directory, relative_target_directory = AprocProcess.__get_download_location__(item, send_to) LOGGER.info("Download will be placed in {}".format(target_directory)) mail_context["target_directory"] = target_directory diff --git a/extensions/aproc/proc/drivers/driver_manager.py b/extensions/aproc/proc/drivers/driver_manager.py index 5d5aa92..749e661 100644 --- a/extensions/aproc/proc/drivers/driver_manager.py +++ b/extensions/aproc/proc/drivers/driver_manager.py @@ -29,13 +29,24 @@ def init(process: str, drivers: list[DriverConfiguration]): LOGGER.info("{}: {}".format(driver.priority, driver.name)) @staticmethod - def solve(process: str, ressource) -> AbstractDriver: + def driver_names(process: str) -> list[str]: + return list(map(lambda p: p.name, DriverManager.drivers[process])) + + @staticmethod + def solve(process: str, resource, include_drivers: list[str] = [], exclude_drivers: list[str] = []) -> AbstractDriver: DriverManager.__check_drivers(process) - for driver_class in DriverManager.drivers.get(process, []): + drivers = DriverManager.drivers.get(process, []) + if include_drivers and len(include_drivers) > 0: + LOGGER.debug("keep only {}".format(include_drivers)) + drivers = list(filter(lambda driver_class: driver_class.name in include_drivers, drivers)) + if exclude_drivers and len(exclude_drivers) > 0: + LOGGER.debug("exclude {}".format(exclude_drivers)) + drivers = list(filter(lambda driver_class: driver_class.name not in exclude_drivers, drivers)) + for driver_class in drivers: try: LOGGER.debug("Test driver {}".format(driver_class.name)) driver: AbstractDriver = driver_class() - if driver.supports(ressource) is True: + if driver.supports(resource) is True: return driver except Exception as e: LOGGER.exception(e) diff --git a/extensions/aproc/proc/enrich/enrich_process.py b/extensions/aproc/proc/enrich/enrich_process.py index 0587a8f..e0ed86e 100644 --- a/extensions/aproc/proc/enrich/enrich_process.py +++ b/extensions/aproc/proc/enrich/enrich_process.py @@ -15,6 +15,7 @@ from aproc.core.settings import Configuration as AprocConfiguration from aproc.core.utils import base_model2description from extensions.aproc.proc.drivers.driver_manager import DriverManager +from extensions.aproc.proc.processes.process_model import InputProcess from extensions.aproc.proc.variables import EVENT_KIND_KEY, EVENT_CATEGORY_KEY, EVENT_REASON, EVENT_TYPE_KEY, USER_ACTION_KEY, EVENT_ACTION, EVENT_OUTCOME_KEY, EVENT_MODULE_KEY, ARLAS_COLLECTION_KEY, ARLAS_ITEM_ID_KEY, ENRICHMENT_FAILED_MSG from extensions.aproc.proc.drivers.exceptions import DriverException from extensions.aproc.proc.enrich.drivers.enrich_driver import EnrichDriver @@ -25,13 +26,7 @@ LOGGER = Logger.logger -def __update_status__(task: Task, state: str, meta: dict = None): - LOGGER.info(task.name + " " + state + " " + str(meta)) - if task.request.id is not None: - task.update_state(state=state, meta=meta) - - -class InputEnrichProcess(BaseModel): +class InputEnrichProcess(InputProcess): requests: list[dict[str, str]] = Field(default=[], title="The list of items (collection, item_id) to enrich") asset_type: str = Field(default=None, title="Name of the asset type to add (e.g. cog)") @@ -70,6 +65,8 @@ def init(configuration: dict): else: raise DriverException("Invalid configuration for enrich drivers ({})".format(configuration)) AprocProcess.input_model = InputEnrichProcess + description.inputs.get("include_drivers").schema_.items.enum = DriverManager.driver_names(summary.id) + description.inputs.get("exclude_drivers").schema_.items.enum = DriverManager.driver_names(summary.id) @staticmethod def get_process_description() -> ProcessDescription: @@ -80,7 +77,7 @@ def get_process_summary() -> ProcessSummary: return summary @staticmethod - def before_execute(headers: dict[str, str], requests: list[dict[str, str]], asset_type: str) -> dict[str, str]: + def before_execute(headers: dict[str, str], requests: list[dict[str, str]], asset_type: str, include_drivers: list[str] = [], exclude_drivers: list[str] = []) -> dict[str, str]: return {} def get_resource_id(inputs: BaseModel): @@ -89,7 +86,7 @@ def get_resource_id(inputs: BaseModel): return hash_object.hexdigest() @shared_task(bind=True, track_started=True) - def execute(self, headers: dict[str, str], requests: list[dict[str, str]], asset_type: str) -> dict: + def execute(self, headers: dict[str, str], requests: list[dict[str, str]], asset_type: str, include_drivers: list[str] = [], exclude_drivers: list[str] = []) -> dict: item_locations = [] for request in requests: collection: str = request.get("collection") @@ -100,11 +97,11 @@ def execute(self, headers: dict[str, str], requests: list[dict[str, str]], asset LOGGER.error(error_msg) LOGGER.info(ENRICHMENT_FAILED_MSG, extra={EVENT_KIND_KEY: "event", EVENT_CATEGORY_KEY: "file", EVENT_TYPE_KEY: USER_ACTION_KEY, EVENT_ACTION: "enrich", EVENT_OUTCOME_KEY: "failure", EVENT_REASON: error_msg, EVENT_MODULE_KEY: "aproc-enrich", ARLAS_COLLECTION_KEY: collection, ARLAS_ITEM_ID_KEY: item_id}) raise DriverException(error_msg) - driver: EnrichDriver = DriverManager.solve(summary.id, item) + driver: EnrichDriver = DriverManager.solve(summary.id, item, include_drivers=include_drivers, exclude_drivers=exclude_drivers) if driver is not None: try: LOGGER.info("ingestion: 1 - enrichment will be done by {}".format(driver.name)) - __update_status__(self, state='PROGRESS', meta={"ACTION": "ENRICH", "TARGET": item_id}) + Process.update_task_status(LOGGER, self, state='PROGRESS', meta={"ACTION": "ENRICH", "TARGET": item_id}) LOGGER.info("Build asset {}".format(asset_type)) start = time() asset, asset_location = driver.create_asset( @@ -121,14 +118,14 @@ def execute(self, headers: dict[str, str], requests: list[dict[str, str]], asset LOGGER.info("Enrichment success", extra={EVENT_KIND_KEY: "event", EVENT_CATEGORY_KEY: "file", EVENT_TYPE_KEY: USER_ACTION_KEY, EVENT_ACTION: "enrich", EVENT_OUTCOME_KEY: "success", EVENT_MODULE_KEY: "aproc-enrich", ARLAS_COLLECTION_KEY: collection, ARLAS_ITEM_ID_KEY: item_id}) LOGGER.debug("ingestion: 2 - upload asset if needed") - __update_status__(self, state='PROGRESS', meta={'step': 'upload', 'current': 1, 'asset': asset.name, 'total': len(item.assets), "ACTION": "ENRICH", "TARGET": item_id}) + Process.update_task_status(LOGGER, self, state='PROGRESS', meta={'step': 'upload', 'current': 1, 'asset': asset.name, 'total': len(item.assets), "ACTION": "ENRICH", "TARGET": item_id}) start = time() IngestAprocProcess.upload_asset_if_managed(item, asset, AprocConfiguration.settings.airs_endpoint) end = time() LOGGER.info("took {} ms".format(end - start)) LOGGER.debug("ingestion: 3 - update") - __update_status__(self, state='PROGRESS', meta={'step': 'update_item', "ACTION": "ENRICH", "TARGET": item_id}) + Process.update_task_status(LOGGER, self, state='PROGRESS', meta={'step': 'update_item', "ACTION": "ENRICH", "TARGET": item_id}) item: Item = IngestAprocProcess.insert_or_update_item(item, AprocConfiguration.settings.airs_endpoint) item_locations.append(os.path.join(AprocConfiguration.settings.airs_endpoint, "collections", item.collection, "items", item.id)) except Exception as e: diff --git a/extensions/aproc/proc/ingest/directory_ingest_process.py b/extensions/aproc/proc/ingest/directory_ingest_process.py index 23eee2b..833157e 100644 --- a/extensions/aproc/proc/ingest/directory_ingest_process.py +++ b/extensions/aproc/proc/ingest/directory_ingest_process.py @@ -76,7 +76,7 @@ def get_resource_id(inputs: BaseModel): return InputDirectoryIngestProcess(**inputs.model_dump()).directory @shared_task(bind=True, track_started=True) - def execute(self, headers: dict[str, str], directory: str, collection: str, catalog: str, annotations: str) -> dict: + def execute(self, headers: dict[str, str], directory: str, collection: str, catalog: str, annotations: str, include_drivers: list[str] = [], exclude_drivers: list[str] = []) -> dict: # self is a celery task because bind=True """ ingest the archives contained in the directory url. Every archive ingestion becomes a new process @@ -93,7 +93,7 @@ def execute(self, headers: dict[str, str], directory: str, collection: str, cata for archive in archives: LOGGER.info(archive.model_dump_json()) try: - inputs = InputIngestProcess(url=os.path.join(Configuration.settings.inputs_directory, archive.path), collection=collection, catalog=catalog, annotations=annotations) + inputs = InputIngestProcess(url=os.path.join(Configuration.settings.inputs_directory, archive.path), collection=collection, catalog=catalog, annotations=annotations, include_drivers=include_drivers, exclude_drivers=exclude_drivers) execute = Execute(inputs=inputs.model_dump()) r = requests.post("/".join([Configuration.settings.aproc_endpoint, "processes", "ingest", "execution"]), data=json.dumps(execute.model_dump()), headers=headers) if not r.ok: diff --git a/extensions/aproc/proc/ingest/ingest_process.py b/extensions/aproc/proc/ingest/ingest_process.py index bff3b68..c157a76 100644 --- a/extensions/aproc/proc/ingest/ingest_process.py +++ b/extensions/aproc/proc/ingest/ingest_process.py @@ -17,18 +17,12 @@ from extensions.aproc.proc.drivers.exceptions import ( ConnectionException, DriverException, RegisterException) from extensions.aproc.proc.ingest.settings import Configuration as IngestConfiguration +from extensions.aproc.proc.processes.process_model import InputProcess DRIVERS_CONFIGURATION_FILE_PARAM_NAME = "drivers" LOGGER = Logger.logger -def __update_status__(task: Task, state: str, meta: dict = None): - if task.request.id is not None: - task.update_state(state=state, meta=meta) - else: - LOGGER.debug(task.name + " " + state + " " + str(meta)) - - -class InputIngestProcess(BaseModel): +class InputIngestProcess(InputProcess): collection: str = Field(title="Collection name", description="Name of the collection where the item will be registered", minOccurs=1, maxOccurs=1) catalog: str = Field(title="Catalog name", description="Name of the catalog, within the collection, where the item will be registered", minOccurs=1, maxOccurs=1) url: str = Field(title="Archive URL", description="URL pointing at the archive", minOccurs=1, maxOccurs=1) @@ -72,6 +66,8 @@ def init(configuration: dict): else: raise DriverException("Invalid configuration for ingest drivers ({})".format(configuration)) AprocProcess.input_model = InputIngestProcess + description.inputs.get("include_drivers").schema_.items.enum = DriverManager.driver_names(summary.id) + description.inputs.get("exclude_drivers").schema_.items.enum = DriverManager.driver_names(summary.id) @staticmethod def get_process_description() -> ProcessDescription: @@ -89,7 +85,7 @@ def get_resource_id(inputs: BaseModel): raise DriverException("No driver found for {}".format(url)) @shared_task(bind=True, track_started=True) - def execute(self, headers: dict[str, str], url: str, collection: str, catalog: str, annotations: str) -> dict: + def execute(self, headers: dict[str, str], url: str, collection: str, catalog: str, annotations: str, include_drivers: list[str] = [], exclude_drivers: list[str] = []) -> dict: # self is a celery task because bind=True """ ingest the archive url in 6 step: - identify the driver for ingestion @@ -110,17 +106,17 @@ def execute(self, headers: dict[str, str], url: str, collection: str, catalog: s if not os.path.exists(url): msg = "File or directory {} not found".format(url) LOGGER.warning(msg) - driver: IngestDriver = DriverManager.solve(summary.id, url) + driver: IngestDriver = DriverManager.solve(summary.id, url, include_drivers=include_drivers, exclude_drivers=exclude_drivers) if driver is not None: try: LOGGER.info("Driver {} will be used".format(driver.name)) LOGGER.debug("ingestion: 1 - identify_assets") - __update_status__(self, state='PROGRESS', meta={'step': 'identify_assets', "ACTION": "INGEST", "TARGET": url}) + Process.update_task_status(LOGGER, self, state='PROGRESS', meta={'step': 'identify_assets', "ACTION": "INGEST", "TARGET": url}) assets: list[Asset] = driver.identify_assets(url) AprocProcess.__check_assets__(url, assets) LOGGER.debug("ingestion: 2 - fetch_assets") - __update_status__(self, state='PROGRESS', meta={'step': 'fetch_assets', "ACTION": "INGEST", "TARGET": url}) + Process.update_task_status(LOGGER, self, state='PROGRESS', meta={'step': 'fetch_assets', "ACTION": "INGEST", "TARGET": url}) try: assets = driver.fetch_assets(url, assets) except Exception as e: @@ -130,12 +126,12 @@ def execute(self, headers: dict[str, str], url: str, collection: str, catalog: s AprocProcess.__check_assets__(url, assets, file_exists=True) LOGGER.debug("ingestion: 3 - transform_assets") - __update_status__(self, state='PROGRESS', meta={'step': 'transform_assets', "ACTION": "INGEST", "TARGET": url}) + Process.update_task_status(LOGGER, self, state='PROGRESS', meta={'step': 'transform_assets', "ACTION": "INGEST", "TARGET": url}) assets = driver.transform_assets(url, assets) AprocProcess.__check_assets__(url, assets, file_exists=True) LOGGER.debug("ingestion: 4 - create_item") - __update_status__(self, state='PROGRESS', meta={'step': 'create_item', "ACTION": "INGEST", "TARGET": url}) + Process.update_task_status(LOGGER, self, state='PROGRESS', meta={'step': 'create_item', "ACTION": "INGEST", "TARGET": url}) item = driver.to_item(url, assets) item.collection = collection item.catalog = catalog @@ -145,12 +141,12 @@ def execute(self, headers: dict[str, str], url: str, collection: str, catalog: s LOGGER.debug("ingestion: 5 - upload") i: int = 0 for asset_name, asset in item.assets.items(): - __update_status__(self, state='PROGRESS', meta={'step': 'upload', 'current': i, 'asset': asset_name, 'total': len(item.assets), "ACTION": "INGEST", "TARGET": url}) + Process.update_task_status(LOGGER, self, state='PROGRESS', meta={'step': 'upload', 'current': i, 'asset': asset_name, 'total': len(item.assets), "ACTION": "INGEST", "TARGET": url}) i += 1 asset: Asset = asset AprocProcess.upload_asset_if_managed(item, asset, Configuration.settings.airs_endpoint) LOGGER.debug("ingestion: 6 - register") - __update_status__(self, state='PROGRESS', meta={'step': 'register_item', "ACTION": "INGEST", "TARGET": url}) + Process.update_task_status(LOGGER, self, state='PROGRESS', meta={'step': 'register_item', "ACTION": "INGEST", "TARGET": url}) item: Item = AprocProcess.insert_or_update_item(item, Configuration.settings.airs_endpoint) return OutputIngestProcess(collection=collection, catalog=catalog, archive_url=url, item_location=os.path.join(Configuration.settings.airs_endpoint, "collections", item.collection, "items", item.id)).model_dump() except Exception as err: diff --git a/extensions/aproc/proc/processes/process_model.py b/extensions/aproc/proc/processes/process_model.py new file mode 100644 index 0000000..cab6c67 --- /dev/null +++ b/extensions/aproc/proc/processes/process_model.py @@ -0,0 +1,6 @@ +from pydantic import BaseModel, Field + + +class InputProcess(BaseModel): + include_drivers: list[str] = Field(default=[], title="List of drivers to include. If none, all are included") + exclude_drivers: list[str] = Field(default=[], title="List of drivers to exclude. If none, none are excluded") diff --git a/test/aproc_ingest_tests.py b/test/aproc_ingest_tests.py index c65fd96..ecd54d6 100644 --- a/test/aproc_ingest_tests.py +++ b/test/aproc_ingest_tests.py @@ -19,8 +19,8 @@ class Tests(unittest.TestCase): def setUp(self): setUpTest() - def ingest(self, url, collection, catalog, expected=StatusCode.successful): - inputs = InputIngestProcess(url=url, collection=collection, catalog=catalog, annotations="") + def ingest(self, url, collection, catalog, expected=StatusCode.successful, include_drivers: list[str] = [], exclude_drivers: list[str] = []): + inputs = InputIngestProcess(url=url, collection=collection, catalog=catalog, annotations="", include_drivers=include_drivers, exclude_drivers=exclude_drivers) execute = Execute(inputs=inputs.model_dump()) r = requests.post("/".join([APROC_ENDPOINT, "processes/ingest/execution"]), data=json.dumps(execute.model_dump()), headers={"Content-Type": "application/json"}) self.assertTrue(r.ok, str(r.status_code) + ": " + str(r.content)) @@ -42,8 +42,8 @@ def ingest_directory(self, url, collection, catalog): status: StatusInfo = StatusInfo(**json.loads(requests.get("/".join([APROC_ENDPOINT, "jobs", status.jobID])).content)) self.assertEqual(status.status, StatusCode.successful) - def async_ingest(self, url, id, assets: list[str], archive=True): - status = self.ingest(url, COLLECTION, CATALOG) + def async_ingest(self, url, id, assets: list[str], archive=True, include_drivers: list[str] = [], exclude_drivers: list[str] = []): + status = self.ingest(url, COLLECTION, CATALOG, include_drivers=include_drivers, exclude_drivers=exclude_drivers) result = json.loads(requests.get("/".join([APROC_ENDPOINT, "jobs", status.jobID, "results"])).content) self.assertEqual(result["item_location"], "http://airs-server:8000/arlas/airs/collections/" + COLLECTION + "/items/" + id, result["item_location"]) item = mapper.item_from_json(requests.get(result["item_location"]).content) @@ -55,6 +55,22 @@ def test_async_ingest_dimap(self): # Driver DIMAP id = "148ddaaa431bdd2ff06b823df1e3725d462f668bd95188603bfff443ff055c71" self.async_ingest(url, id, ["thumbnail", "overview", "data", "metadata", "extent", "airs_item"]) + def test_async_ingest_dimap_driver_include(self): # Driver DIMAP + url = "/inputs/DIMAP/PROD_SPOT6_001/VOL_SPOT6_001_A/IMG_SPOT6_MS_001_A/" + self.ingest(url, COLLECTION, CATALOG, include_drivers=["dimap"]) + + def test_async_ingest_dimap_driver_include_fail(self): # Driver DIMAP + url = "/inputs/DIMAP/PROD_SPOT6_001/VOL_SPOT6_001_A/IMG_SPOT6_MS_001_A/" + self.ingest(url, COLLECTION, CATALOG, include_drivers=["spot5"], expected=StatusCode.failed) + + def test_async_ingest_dimap_driver_exclude(self): # Driver DIMAP + url = "/inputs/DIMAP/PROD_SPOT6_001/VOL_SPOT6_001_A/IMG_SPOT6_MS_001_A/" + self.ingest(url, COLLECTION, CATALOG, exclude_drivers=["spot5"]) + + def test_async_ingest_dimap_driver_exclude_fail(self): # Driver DIMAP + url = "/inputs/DIMAP/PROD_SPOT6_001/VOL_SPOT6_001_A/IMG_SPOT6_MS_001_A/" + self.ingest(url, COLLECTION, CATALOG, exclude_drivers=["dimap"], expected=StatusCode.failed) + def test_async_ingest_ikonos(self): # Driver GEOEYE url = "/inputs/IK2_OPER_OSA_GEO_1P_20080715T105300_N43-318_E003-351_0001.SIP/20081014210521_po_2624415_0000000/po_2624415_blu_0000000.tif" id = "0e73667ac0bd10b5f18bcb5ee40518db973b2946fe8b40d2b4cb988724ac9507"