diff --git a/extensions/aproc/proc/download/download_process.py b/extensions/aproc/proc/download/download_process.py index 3af925f3..387f6e97 100644 --- a/extensions/aproc/proc/download/download_process.py +++ b/extensions/aproc/proc/download/download_process.py @@ -1,19 +1,15 @@ import hashlib -import json import os -import time import requests from celery import Task, shared_task from pydantic import BaseModel, Field -import logging -import http.client +from datetime import datetime from airs.core.models import mapper from airs.core.models.model import Item from aproc.core.logger import Logger from aproc.core.models.ogc import ProcessDescription, ProcessSummary from aproc.core.models.ogc.enums import JobControlOptions, TransmissionMode -from aproc.core.models.ogc.job import StatusInfo from aproc.core.processes.process import Process as Process from aproc.core.settings import Configuration as AprocConfiguration from aproc.core.utils import base_model2description @@ -39,6 +35,7 @@ class InputDownloadProcess(BaseModel): crop_wkt: str = Field(default=None, title="WKT geometry for cropping the data") target_projection: str = Field(default=None, title="epsg target projection") target_format: str = Field(default=None, title="target format") + raw_archive: bool = Field(default=True, title="raw archive") class OutputDownloadProcess(BaseModel): @@ -84,12 +81,13 @@ def get_process_summary() -> ProcessSummary: return summary @staticmethod - def before_execute(headers: dict[str, str], requests: list[dict[str, str]], crop_wkt: str, target_projection: str, target_format: str = "Geotiff") -> dict[str, str]: + def before_execute(headers: dict[str, str], requests: list[dict[str, str]], crop_wkt: str, target_projection: str = "native", target_format: str = "native", raw_archive: bool = True) -> dict[str, str]: (send_to, user_id) = AprocProcess.__get_user_email__(headers.get("authorization")) for request in requests: collection: str = request.get("collection") item_id: str = request.get("item_id") mail_context = { + "raw_archive": raw_archive, "target_projection": target_projection, "target_format": target_format, "item_id": item_id, @@ -137,16 +135,13 @@ def __get_user_email__(authorization: str): LOGGER.exception(e) return (send_to, user_id) - def __get_download_location__(item: Item, send_to: str, format: str) -> (str, str): + def __get_download_location__(item: Item, send_to: str) -> str: if send_to is None: send_to = "anonymous" - target_directory = os.path.join(Configuration.settings.outbox_directory, send_to.split("@")[0].replace(".","_").replace("-","_"), item.id) + target_directory = os.path.join(Configuration.settings.outbox_directory, send_to.split("@")[0].replace(".","_").replace("-","_"), item.id + "_" + datetime.now().strftime("%Y_%m_%d_%H_%M_%S")) if not os.path.exists(target_directory): LOGGER.info("create {}".format(target_directory)) os.makedirs(target_directory) - file_name = os.path.basename(item.id.replace("-", "_").replace(" ", "_").replace("/", "_").replace("\\", "_").replace("@", "_"))+"."+format - if os.path.exists(file_name): - file_name = hashlib.md5(str(time.time_ns()).encode("utf-8")).hexdigest()+file_name - return (target_directory, file_name) + return target_directory def get_resource_id(inputs: BaseModel): inputs: InputDownloadProcess = InputDownloadProcess(**inputs.model_dump()) @@ -154,7 +149,7 @@ def get_resource_id(inputs: BaseModel): return hash_object.hexdigest() @shared_task(bind=True, track_started=True) - def execute(self, headers: dict[str, str], requests: list[dict[str, str]], crop_wkt: str, target_projection: str, target_format: str = "Geotiff", raw_archive:bool = True) -> dict: + def execute(self, headers: dict[str, str], requests: list[dict[str, str]], crop_wkt: str, target_projection: str = "native", target_format: str = "native", raw_archive:bool = True) -> dict: (send_to, user_id) = AprocProcess.__get_user_email__(headers.get("authorization")) LOGGER.debug("processing download requests from {}".format(send_to)) download_locations = [] @@ -184,15 +179,13 @@ def execute(self, headers: dict[str, str], requests: list[dict[str, str]], crop_ if driver is not None: try: __update_status__(self, state='PROGRESS', meta={"ACTION": "DOWNLOAD", "TARGET": item_id}) - (target_directory, file_name) = AprocProcess.__get_download_location__(item, send_to, target_format) - LOGGER.info("Download will be placed in {}/{}".format(target_directory, file_name)) + target_directory = AprocProcess.__get_download_location__(item, send_to) + LOGGER.info("Download will be placed in {}".format(target_directory)) mail_context["target_directory"] = target_directory - mail_context["file_name"] = file_name mail_context = AprocProcess.__update_paths__(mail_context) driver.fetch_and_transform( item=item, target_directory=target_directory, - file_name=file_name, crop_wkt=crop_wkt, target_projection=target_projection, target_format=target_format, @@ -200,7 +193,7 @@ def execute(self, headers: dict[str, str], requests: list[dict[str, str]], crop_ Notifications.report(item, Configuration.settings.email_subject_user, Configuration.settings.email_content_user, to=[send_to], context=mail_context, outcome="success") Notifications.report(item, Configuration.settings.email_subject_admin, Configuration.settings.email_content_admin, Configuration.settings.notification_admin_emails.split(","), context=mail_context) LOGGER.info("Download success", extra={"event.kind": "event", "event.category": "file", "event.type": "user-action", "event.action": "download", "event.outcome": "success", "user.id": user_id, "user.email": send_to, "event.module": "aproc-download", "arlas.collection": collection, "arlas.item.id": item_id}) - download_locations.append(os.path.join(target_directory, file_name)) + download_locations.append(target_directory) except Exception as e: error_msg = "Failed to download the item {}/{} ({})".format(collection, item_id, e.__cause__) LOGGER.info("Download failed", extra={"event.kind": "event", "event.category": "file", "event.type": "user-action", "event.action": "download", "event.outcome": "failure", "event.reason": error_msg, "user.id": user_id, "user.email": send_to, "event.module": "aproc-download", "arlas.collection": collection, "arlas.item.id": item_id}) diff --git a/extensions/aproc/proc/download/drivers/driver.py b/extensions/aproc/proc/download/drivers/driver.py index c55eb3ac..cb62f1e7 100644 --- a/extensions/aproc/proc/download/drivers/driver.py +++ b/extensions/aproc/proc/download/drivers/driver.py @@ -35,13 +35,12 @@ def supports(item: Item) -> bool: ... @abstractmethod - def fetch_and_transform(self, item: Item, target_directory: str, file_name: str, crop_wkt: str, target_projection: str, target_format: str, raw_archive: bool): + def fetch_and_transform(self, item: Item, target_directory: str, crop_wkt: str, target_projection: str, target_format: str, raw_archive: bool): """ Fetch and transform the item, given the wanted projection, format and crop. The file must be placed in the provided target directory. Args: item (Item): item containing the asset to export target_directory (str): where the file must be placed - file_name (str): name of the file to use crop_wkt (str): geometry to crop the raster with target_projection (str): target projection target_format (str): target format diff --git a/extensions/aproc/proc/download/drivers/impl/met_file.py b/extensions/aproc/proc/download/drivers/impl/met_file.py index dc209ed9..2e6e9210 100644 --- a/extensions/aproc/proc/download/drivers/impl/met_file.py +++ b/extensions/aproc/proc/download/drivers/impl/met_file.py @@ -1,13 +1,12 @@ import os -from airs.core.models.model import Item, Role, ItemFormat +from airs.core.models.model import Item, Role, ItemFormat, AssetFormat from aproc.core.settings import Configuration from extensions.aproc.proc.download.drivers.driver import Driver as DownloadDriver -from datetime import datetime from extensions.aproc.proc.download.drivers.impl.utils import make_raw_archive_zip import shutil import xml.etree.ElementTree as ET -from zipfile import ZipFile + class Driver(DownloadDriver): @@ -27,71 +26,85 @@ def supports(item: Item) -> bool: return False # Implements drivers method - def fetch_and_transform(self, item: Item, target_directory: str, file_name: str, crop_wkt: str, target_projection: str, target_format: str, raw_archive: bool): + def fetch_and_transform(self, item: Item, target_directory: str, crop_wkt: str, target_projection: str, target_format: str, raw_archive: bool): asset = item.assets.get(Role.metadata.value) met_file = asset.href if raw_archive: make_raw_archive_zip(met_file, target_directory) return # If the projetion and the format are natives, just copy the file - if target_projection == target_format == 'native': + if (target_projection == target_format == 'native') and (not crop_wkt): if item.properties.item_format == ItemFormat.dimap.value: self.copy_from_dimap(met_file,target_directory) elif item.properties.item_format == ItemFormat.terrasar.value: self.copy_from_terrasarx(met_file,target_directory) return from extensions.aproc.proc.download.drivers.impl.utils import extract - import pyproj met_file_name = os.path.basename(met_file) - epsg_target = pyproj.Proj(target_projection) # Default driver is GTiff driver_target = "GTiff" - if not target_format: - raise Exception("target_format must be either Geotiff or Jpeg2000") - if target_format == "Geotiff": - driver_target = "GTiff" - target_file_name = os.path.splitext(met_file_name)[0] + datetime.now().strftime("%d-%m-%Y-%H-%M-%S")+'.tif' + extension='.tif' + if (not target_format) or (target_format == 'native'): + if item.properties.main_asset_format == AssetFormat.jpg2000.value: + driver_target = "JP2OpenJPEG" + else: + driver_target = "GTiff" elif target_format == "Jpeg2000": driver_target = "JP2OpenJPEG" - target_file_name = os.path.splitext(met_file_name)[0] + datetime.now().strftime("%d-%m-%Y-%H-%M-%S")+'.JP2' - extract(crop_wkt, met_file, driver_target, epsg_target, target_directory, target_file_name, - target_projection) + if driver_target == "JP2OpenJPEG": + extension='.JP2' + target_file_name = os.path.splitext(met_file_name)[0] + extension + images = [] + if item.properties.item_format == ItemFormat.dimap.value: + images =list(map(lambda f: [f[0], os.path.splitext(f[1])[0]+extension],self.get_dimap_images(met_file, extension))) + elif item.properties.item_format == ItemFormat.terrasar.value: + images =list(map(lambda f: [f[0], os.path.splitext(f[1])[0]+extension],self.get_terrasarx_images(met_file, extension))) + extract(images,crop_wkt, met_file, driver_target, target_projection, target_directory ,target_file_name) - def copy_from_dimap(self,href: str, target_directory: str): + def get_dimap_images(self,href,extension): dir_name = os.path.dirname(href) - file_name = os.path.basename(href) tree = ET.parse(href) root = tree.getroot() + georef_file_extension = '.TFW' + if extension == '.JP2': + georef_file_extension = '.J2W' files_elements = root.findall('./Raster_Data/Data_Access/Data_Files/Data_File/DATA_FILE_PATH') - files = list(map(lambda f: [os.path.join(dir_name, f.attrib["href"]),f.attrib["href"]], files_elements)) - self.copy_from_met(files,target_directory,file_name) + files = list(map(lambda f: [os.path.join(dir_name, f.attrib["href"]), + f.attrib["href"], + os.path.join(dir_name, os.path.splitext(f.attrib["href"])[0]+georef_file_extension), + os.path.splitext(f.attrib["href"])[0]+georef_file_extension], files_elements)) + return files; - def copy_from_terrasarx(self,href: str, target_directory: str): + def get_terrasarx_images(self,href, extension): dir_name = os.path.dirname(href) - file_name = os.path.basename(href) tree = ET.parse(href) root = tree.getroot() + georef_file_extension = '.TFW' + if extension == '.JP2': + georef_file_extension = '.J2W' files_elements = root.findall('.productComponents/imageData/file/location') - print(files_elements) files = [] for file in files_elements: f = [str(file.find('path').text), str(file.find('filename').text)] - files.append([os.path.join(dir_name,f[0],f[1]),f[1]]) - self.copy_from_met(files,target_directory,file_name) + files.append([os.path.join(dir_name,f[0],f[1]),f[1], + os.path.join(dir_name,f[0], os.path.splitext(f[1])[0]+georef_file_extension), + os.path.splitext(f[1])[0]+georef_file_extension]) + return files + def copy_from_dimap(self,href: str, target_directory: str): + files = self.get_dimap_images(href) + self.copy_from_met(files,target_directory) - def copy_from_met(self,files,target_directory,file_name): - # If the met_file reference only one file we copy it - if len(files) == 1: - shutil.copyfile(files[0][0], os.path.join(target_directory,files[0][1])) - return - # If the met_file reference several files we zip it in one zip file - elif len(files) > 1: - tif_zip_file = ZipFile(os.path.join(target_directory,file_name+".zip"), mode='a') - for f in files: - valid_and_exist = os.path.isfile(f[0]) and os.path.exists(f[0]) - if valid_and_exist: - tif_zip_file.write(f[0],f[1]) - tif_zip_file.close() - return + def copy_from_terrasarx(self,href: str, target_directory: str): + files = self.get_terrasarx_images(href) + self.copy_from_met(files,target_directory) + + def copy_from_met(self,files,target_directory): + for f in files: + valid_and_exist = os.path.isfile(f[0]) and os.path.exists(f[0]) + if valid_and_exist: + shutil.copyfile(f[0],target_directory + "/" + f[1]) + valid_and_exist = os.path.isfile(f[2]) and os.path.exists(f[2]) + if valid_and_exist: + shutil.copyfile(f[2],target_directory + "/" + f[3]) \ No newline at end of file diff --git a/extensions/aproc/proc/download/drivers/impl/simple_copy.py b/extensions/aproc/proc/download/drivers/impl/simple_copy.py index b411efea..058482d5 100644 --- a/extensions/aproc/proc/download/drivers/impl/simple_copy.py +++ b/extensions/aproc/proc/download/drivers/impl/simple_copy.py @@ -1,4 +1,6 @@ +import hashlib import os +import time import shutil import requests from airs.core.models.model import Item, Role @@ -19,8 +21,11 @@ def supports(item: Item) -> bool: return data is not None and data.href is not None and (data.href.startswith("file://") or data.href.startswith("http://") or data.href.startswith("https://")) # Implements drivers method - def fetch_and_transform(self, item: Item, target_directory: str, file_name: str, crop_wkt: str, target_projection: str, target_format: str, raw_archive: bool): + def fetch_and_transform(self, item: Item, target_directory: str, crop_wkt: str, target_projection: str, target_format: str, raw_archive: bool): data = item.assets.get(Role.data.value) + file_name = os.path.basename(item.id.replace("-", "_").replace(" ", "_").replace("/", "_").replace("\\", "_").replace("@", "_"))+"."+format + if os.path.exists(file_name): + file_name = hashlib.md5(str(time.time_ns()).encode("utf-8")).hexdigest()+file_name if data is not None: if data.href.startswith("file://"): shutil.copy(data.href, os.path.join(target_directory, file_name)) diff --git a/extensions/aproc/proc/download/drivers/impl/tif_file.py b/extensions/aproc/proc/download/drivers/impl/tif_file.py index 4817bceb..a8fb5c9e 100644 --- a/extensions/aproc/proc/download/drivers/impl/tif_file.py +++ b/extensions/aproc/proc/download/drivers/impl/tif_file.py @@ -1,10 +1,9 @@ import os import shutil -from airs.core.models.model import Item, Role +from airs.core.models.model import Item, Role, AssetFormat from aproc.core.settings import Configuration from extensions.aproc.proc.download.drivers.driver import Driver as DownloadDriver -from datetime import datetime from extensions.aproc.proc.download.drivers.impl.utils import make_raw_archive_zip @@ -27,30 +26,42 @@ def supports(item: Item) -> bool: return False # Implements drivers method - def fetch_and_transform(self, item: Item, target_directory: str, file_name: str, crop_wkt: str, target_projection: str, target_format: str, raw_archive: bool): + def fetch_and_transform(self, item: Item, target_directory: str, crop_wkt: str, target_projection: str, target_format: str, raw_archive: bool): asset = item.assets.get(Role.data.value) tif_file = asset.href tif_file_name = os.path.basename(tif_file) if raw_archive: make_raw_archive_zip(tif_file, target_directory) return - if target_projection == target_format == 'native': - # If the projetion and the format are natives, just copy the file + if (target_projection == target_format == 'native') and (not crop_wkt): + # If the projetion and the format are natives, just copy the file and the georef file + georef_file_extension = '.TFW' + if item.properties.main_asset_format == AssetFormat.jpg2000.value: + georef_file_extension = '.J2W' shutil.copyfile(tif_file, os.path.join(target_directory, tif_file_name)) + georef_file_name = os.path.splitext(tif_file_name)[0]+georef_file_extension + dir_name = os.path.dirname(tif_file) + georef_file = os.path.join(dir_name,georef_file_name) + valid_and_exist = os.path.isfile(georef_file) and os.path.exists(georef_file) + if valid_and_exist: + shutil.copyfile(georef_file, os.path.join(target_directory, georef_file_name)) return from extensions.aproc.proc.download.drivers.impl.utils import extract - import pyproj - epsg_target = pyproj.Proj(target_projection) # Default driver is GTiff driver_target = "GTiff" - if target_format == "Geotiff": - driver_target = "GTiff" - target_file_name = os.path.splitext(tif_file_name)[0] + datetime.now().strftime("%d-%m-%Y-%H-%M-%S")+'.tif' + extension='.tif' + if (not target_format) or (target_format == 'native'): + if item.properties.main_asset_format == AssetFormat.jpg2000.value: + driver_target = "JP2OpenJPEG" + else: + driver_target = "GTiff" elif target_format == "Jpeg2000": driver_target = "JP2OpenJPEG" - target_file_name = os.path.splitext(tif_file_name)[0] + datetime.now().strftime("%d-%m-%Y-%H-%M-%S")+'.JP2' - extract(crop_wkt, tif_file, driver_target, epsg_target, target_directory, target_file_name, - target_projection) + if driver_target == "JP2OpenJPEG": + extension='.JP2' + target_file_name = os.path.splitext(tif_file_name)[0] + extension + extract([],crop_wkt, tif_file, driver_target, target_projection, target_directory, target_file_name) + diff --git a/extensions/aproc/proc/download/drivers/impl/utils.py b/extensions/aproc/proc/download/drivers/impl/utils.py index c0116e11..a37bf096 100644 --- a/extensions/aproc/proc/download/drivers/impl/utils.py +++ b/extensions/aproc/proc/download/drivers/impl/utils.py @@ -1,48 +1,95 @@ +from contextlib import contextmanager from datetime import datetime import shutil import os -import xml.etree.ElementTree as ET -from zipfile import ZipFile + +from aproc.core.logger import Logger + +LOGGER = Logger.logger + def setup_gdal(): from osgeo import gdal gdal.SetConfigOption('GDAL_DISABLE_READDIR_ON_OPEN', 'YES') + gdal.SetConfigOption('OSR_USE_ETMERC', 'YES') gdal.UseExceptions() gdal.PushErrorHandler('CPLQuietErrorHandler') gdal.VSICurlClearCache() -def extract(crop_wkt, file, driver_target, epsg_target, target_directory, target_file_name, - target_projection): +def extract(images,crop_wkt, file, driver_target, target_projection, target_directory, target_file_name): import pyproj import rasterio.mask - import shapely.wkt - from osgeo import osr - from rasterio.warp import calculate_default_transform - from shapely.ops import transform - with rasterio.open(file) as src: - epsg_4326 = pyproj.Proj('EPSG:4326') - epsg_src = pyproj.Proj(src.crs) - if not not crop_wkt: + if not not crop_wkt: + with rasterio.open(file) as src: + epsg_4326 = pyproj.Proj('EPSG:4326') + epsg_src = pyproj.Proj(src.crs) + from shapely.ops import transform + import shapely.wkt raw = shapely.wkt.loads(crop_wkt) project = pyproj.Transformer.from_proj(epsg_4326, epsg_src, always_xy=True) geom = transform(project.transform, raw) + out_image, out_transform = rasterio.mask.mask(src, [geom], crop=True) + out_meta = src.meta.copy() + update_params = {"height": out_image.shape[1], + "width": out_image.shape[2], + "transform": out_transform, + "driver":driver_target + } + out_meta.update(update_params) + writeWorldWide(out_image,target_directory + "/" + target_file_name) + with rasterio.open(target_directory + "/" + target_file_name, "w", **out_meta) as dest: + dest.write(out_image) + else: + epsg_target = pyproj.Proj(target_projection) + if images and len(images) > 0 : + for image in images: + with reproject_raster(image[0], epsg_target.crs, driver_target) as in_mem_ds: + kwargs = in_mem_ds.meta.copy() + writeWorldWide(in_mem_ds,target_directory + "/" + image[1]) + with rasterio.open(target_directory + "/" + image[1], "w", **kwargs) as dest: + dest.write(in_mem_ds.read()) else: - from shapely.geometry import box - raw = shapely.wkt.loads(box(*src.bounds).wkt) - project = pyproj.Transformer.from_proj(epsg_src, epsg_src, always_xy=True) - geom = transform(project.transform, raw) - srs = osr.SpatialReference() - srs.ImportFromEPSG(int(str(src.crs).split(":")[1])) - out_image, out_transform = rasterio.mask.mask(src, [geom], crop=crop_wkt is not None) - out_meta = src.meta.copy() - default_transform, width, height = calculate_default_transform(epsg_src.crs, epsg_target.crs, - out_image.shape[2], out_image.shape[1], - *geom.bounds) - out_meta.update( - {"driver": driver_target, "nodata": 0, "height": height, "width": width, "transform": default_transform, - "crs": {'init': target_projection}}) - with rasterio.open(target_directory + "/" + target_file_name, "w", **out_meta) as dest: - dest.write(out_image) + with reproject_raster(file, epsg_target.crs, driver_target) as in_mem_ds: + kwargs = in_mem_ds.meta.copy() + writeWorldWide(in_mem_ds,target_directory + "/" + target_file_name) + with rasterio.open(target_directory + "/" + target_file_name, "w", **kwargs) as dest: + dest.write(in_mem_ds.read()) + + +@contextmanager +def reproject_raster(in_path, crs, driver_target): + import rasterio.mask + from rasterio.io import MemoryFile + from rasterio.warp import calculate_default_transform, reproject, Resampling + # reproject raster to project crs + with rasterio.open(in_path) as src: + src_crs = src.crs + transform, width, height = calculate_default_transform(src_crs, crs, src.width, src.height, *src.bounds) + kwargs = src.meta.copy() + + kwargs.update({ + "driver": driver_target, + 'crs': crs, + 'transform': transform, + 'width': width, + 'height': height}) + + with MemoryFile() as memfile: + with memfile.open(**kwargs) as dst: + for i in range(1, src.count + 1): + reproject( + source=rasterio.band(src, i), + destination=rasterio.band(dst, i), + src_transform=src.transform, + src_crs=src.crs, + dst_transform=transform, + dst_crs=crs, + resampling=Resampling.nearest) + with memfile.open() as dataset: # Reopen as DatasetReader + yield dataset # Note yield not return as we're a contextmanager + + + def make_raw_archive_zip(href: str, target_directory: str): file_name = os.path.basename(href) @@ -51,3 +98,27 @@ def make_raw_archive_zip(href: str, target_directory: str): target_file_name = os.path.splitext(file_name)[0] + datetime.now().strftime("%d-%m-%Y-%H-%M-%S") shutil.make_archive(target_directory + "/" + target_file_name, 'zip', dir_name) return + +def writeWorldWide(dataset,input): + geotransform = dataset.transform + (fpath, fname) = os.path.split(input) + (shortname, ext) = os.path.splitext(fname) + wext = '.' + ext[1] + ext[-1] + 'w' + output = os.path.join(fpath, shortname) + wext + if geotransform is not None: + world_file = open (output, 'w') + x = geotransform[2] + x_size = geotransform[0] + x_rot = geotransform[1] + y = geotransform[5] + y_rot = geotransform[3] + y_size = geotransform[4] + x = x_size/2+x + y = y_size/2+y + world_file.write('%s\n' % x_size) + world_file.write('%s\n' % x_rot) + world_file.write('%s\n' % y_rot) + world_file.write('%s\n' % y_size) + world_file.write('%s\n' % x) + world_file.write('%s\n' % y) + world_file.close() \ No newline at end of file