Skip to content

Commit

Permalink
Feat: optimize dimap download
Browse files Browse the repository at this point in the history
  • Loading branch information
mbarbet committed Jul 11, 2024
1 parent e5bacb8 commit 038f705
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 100 deletions.
29 changes: 11 additions & 18 deletions extensions/aproc/proc/download/download_process.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
import hashlib
import json
import os
import time
import requests
from celery import Task, shared_task
from pydantic import BaseModel, Field
import logging
import http.client
from datetime import datetime

from airs.core.models import mapper
from airs.core.models.model import Item
from aproc.core.logger import Logger
from aproc.core.models.ogc import ProcessDescription, ProcessSummary
from aproc.core.models.ogc.enums import JobControlOptions, TransmissionMode
from aproc.core.models.ogc.job import StatusInfo
from aproc.core.processes.process import Process as Process
from aproc.core.settings import Configuration as AprocConfiguration
from aproc.core.utils import base_model2description
Expand All @@ -39,6 +35,7 @@ class InputDownloadProcess(BaseModel):
crop_wkt: str = Field(default=None, title="WKT geometry for cropping the data")
target_projection: str = Field(default=None, title="epsg target projection")
target_format: str = Field(default=None, title="target format")
raw_archive: bool = Field(default=True, title="raw archive")


class OutputDownloadProcess(BaseModel):
Expand Down Expand Up @@ -84,12 +81,13 @@ def get_process_summary() -> ProcessSummary:
return summary

@staticmethod
def before_execute(headers: dict[str, str], requests: list[dict[str, str]], crop_wkt: str, target_projection: str, target_format: str = "Geotiff") -> dict[str, str]:
def before_execute(headers: dict[str, str], requests: list[dict[str, str]], crop_wkt: str, target_projection: str = "native", target_format: str = "native", raw_archive: bool = True) -> dict[str, str]:
(send_to, user_id) = AprocProcess.__get_user_email__(headers.get("authorization"))
for request in requests:
collection: str = request.get("collection")
item_id: str = request.get("item_id")
mail_context = {
"raw_archive": raw_archive,
"target_projection": target_projection,
"target_format": target_format,
"item_id": item_id,
Expand Down Expand Up @@ -137,24 +135,21 @@ def __get_user_email__(authorization: str):
LOGGER.exception(e)
return (send_to, user_id)

def __get_download_location__(item: Item, send_to: str, format: str) -> (str, str):
def __get_download_location__(item: Item, send_to: str) -> str:
if send_to is None: send_to = "anonymous"
target_directory = os.path.join(Configuration.settings.outbox_directory, send_to.split("@")[0].replace(".","_").replace("-","_"), item.id)
target_directory = os.path.join(Configuration.settings.outbox_directory, send_to.split("@")[0].replace(".","_").replace("-","_"), item.id + "_" + datetime.now().strftime("%Y_%m_%d_%H_%M_%S"))
if not os.path.exists(target_directory):
LOGGER.info("create {}".format(target_directory))
os.makedirs(target_directory)
file_name = os.path.basename(item.id.replace("-", "_").replace(" ", "_").replace("/", "_").replace("\\", "_").replace("@", "_"))+"."+format
if os.path.exists(file_name):
file_name = hashlib.md5(str(time.time_ns()).encode("utf-8")).hexdigest()+file_name
return (target_directory, file_name)
return target_directory

def get_resource_id(inputs: BaseModel):
inputs: InputDownloadProcess = InputDownloadProcess(**inputs.model_dump())
hash_object = hashlib.sha1("/".join(list(map(lambda r: r["collection"]+r["item_id"], inputs.requests))).encode())
return hash_object.hexdigest()

@shared_task(bind=True, track_started=True)
def execute(self, headers: dict[str, str], requests: list[dict[str, str]], crop_wkt: str, target_projection: str, target_format: str = "Geotiff", raw_archive:bool = True) -> dict:
def execute(self, headers: dict[str, str], requests: list[dict[str, str]], crop_wkt: str, target_projection: str = "native", target_format: str = "native", raw_archive:bool = True) -> dict:
(send_to, user_id) = AprocProcess.__get_user_email__(headers.get("authorization"))
LOGGER.debug("processing download requests from {}".format(send_to))
download_locations = []
Expand Down Expand Up @@ -184,23 +179,21 @@ def execute(self, headers: dict[str, str], requests: list[dict[str, str]], crop_
if driver is not None:
try:
__update_status__(self, state='PROGRESS', meta={"ACTION": "DOWNLOAD", "TARGET": item_id})
(target_directory, file_name) = AprocProcess.__get_download_location__(item, send_to, target_format)
LOGGER.info("Download will be placed in {}/{}".format(target_directory, file_name))
target_directory = AprocProcess.__get_download_location__(item, send_to)
LOGGER.info("Download will be placed in {}".format(target_directory))
mail_context["target_directory"] = target_directory
mail_context["file_name"] = file_name
mail_context = AprocProcess.__update_paths__(mail_context)
driver.fetch_and_transform(
item=item,
target_directory=target_directory,
file_name=file_name,
crop_wkt=crop_wkt,
target_projection=target_projection,
target_format=target_format,
raw_archive=raw_archive)
Notifications.report(item, Configuration.settings.email_subject_user, Configuration.settings.email_content_user, to=[send_to], context=mail_context, outcome="success")
Notifications.report(item, Configuration.settings.email_subject_admin, Configuration.settings.email_content_admin, Configuration.settings.notification_admin_emails.split(","), context=mail_context)
LOGGER.info("Download success", extra={"event.kind": "event", "event.category": "file", "event.type": "user-action", "event.action": "download", "event.outcome": "success", "user.id": user_id, "user.email": send_to, "event.module": "aproc-download", "arlas.collection": collection, "arlas.item.id": item_id})
download_locations.append(os.path.join(target_directory, file_name))
download_locations.append(target_directory)
except Exception as e:
error_msg = "Failed to download the item {}/{} ({})".format(collection, item_id, e.__cause__)
LOGGER.info("Download failed", extra={"event.kind": "event", "event.category": "file", "event.type": "user-action", "event.action": "download", "event.outcome": "failure", "event.reason": error_msg, "user.id": user_id, "user.email": send_to, "event.module": "aproc-download", "arlas.collection": collection, "arlas.item.id": item_id})
Expand Down
3 changes: 1 addition & 2 deletions extensions/aproc/proc/download/drivers/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,12 @@ def supports(item: Item) -> bool:
...

@abstractmethod
def fetch_and_transform(self, item: Item, target_directory: str, file_name: str, crop_wkt: str, target_projection: str, target_format: str, raw_archive: bool):
def fetch_and_transform(self, item: Item, target_directory: str, crop_wkt: str, target_projection: str, target_format: str, raw_archive: bool):
""" Fetch and transform the item, given the wanted projection, format and crop. The file must be placed in the provided target directory.
Args:
item (Item): item containing the asset to export
target_directory (str): where the file must be placed
file_name (str): name of the file to use
crop_wkt (str): geometry to crop the raster with
target_projection (str): target projection
target_format (str): target format
Expand Down
89 changes: 51 additions & 38 deletions extensions/aproc/proc/download/drivers/impl/met_file.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import os
from airs.core.models.model import Item, Role, ItemFormat
from airs.core.models.model import Item, Role, ItemFormat, AssetFormat
from aproc.core.settings import Configuration
from extensions.aproc.proc.download.drivers.driver import Driver as DownloadDriver
from datetime import datetime

from extensions.aproc.proc.download.drivers.impl.utils import make_raw_archive_zip
import shutil
import xml.etree.ElementTree as ET
from zipfile import ZipFile


class Driver(DownloadDriver):

Expand All @@ -27,71 +26,85 @@ def supports(item: Item) -> bool:
return False

# Implements drivers method
def fetch_and_transform(self, item: Item, target_directory: str, file_name: str, crop_wkt: str, target_projection: str, target_format: str, raw_archive: bool):
def fetch_and_transform(self, item: Item, target_directory: str, crop_wkt: str, target_projection: str, target_format: str, raw_archive: bool):
asset = item.assets.get(Role.metadata.value)
met_file = asset.href
if raw_archive:
make_raw_archive_zip(met_file, target_directory)
return
# If the projetion and the format are natives, just copy the file
if target_projection == target_format == 'native':
if (target_projection == target_format == 'native') and (not crop_wkt):
if item.properties.item_format == ItemFormat.dimap.value:
self.copy_from_dimap(met_file,target_directory)
elif item.properties.item_format == ItemFormat.terrasar.value:
self.copy_from_terrasarx(met_file,target_directory)
return
from extensions.aproc.proc.download.drivers.impl.utils import extract
import pyproj
met_file_name = os.path.basename(met_file)
epsg_target = pyproj.Proj(target_projection)
# Default driver is GTiff
driver_target = "GTiff"
if not target_format:
raise Exception("target_format must be either Geotiff or Jpeg2000")
if target_format == "Geotiff":
driver_target = "GTiff"
target_file_name = os.path.splitext(met_file_name)[0] + datetime.now().strftime("%d-%m-%Y-%H-%M-%S")+'.tif'
extension='.tif'
if (not target_format) or (target_format == 'native'):
if item.properties.main_asset_format == AssetFormat.jpg2000.value:
driver_target = "JP2OpenJPEG"
else:
driver_target = "GTiff"
elif target_format == "Jpeg2000":
driver_target = "JP2OpenJPEG"
target_file_name = os.path.splitext(met_file_name)[0] + datetime.now().strftime("%d-%m-%Y-%H-%M-%S")+'.JP2'
extract(crop_wkt, met_file, driver_target, epsg_target, target_directory, target_file_name,
target_projection)
if driver_target == "JP2OpenJPEG":
extension='.JP2'
target_file_name = os.path.splitext(met_file_name)[0] + extension
images = []
if item.properties.item_format == ItemFormat.dimap.value:
images =list(map(lambda f: [f[0], os.path.splitext(f[1])[0]+extension],self.get_dimap_images(met_file, extension)))
elif item.properties.item_format == ItemFormat.terrasar.value:
images =list(map(lambda f: [f[0], os.path.splitext(f[1])[0]+extension],self.get_terrasarx_images(met_file, extension)))
extract(images,crop_wkt, met_file, driver_target, target_projection, target_directory ,target_file_name)


def copy_from_dimap(self,href: str, target_directory: str):
def get_dimap_images(self,href,extension):
dir_name = os.path.dirname(href)
file_name = os.path.basename(href)
tree = ET.parse(href)
root = tree.getroot()
georef_file_extension = '.TFW'
if extension == '.JP2':
georef_file_extension = '.J2W'
files_elements = root.findall('./Raster_Data/Data_Access/Data_Files/Data_File/DATA_FILE_PATH')
files = list(map(lambda f: [os.path.join(dir_name, f.attrib["href"]),f.attrib["href"]], files_elements))
self.copy_from_met(files,target_directory,file_name)
files = list(map(lambda f: [os.path.join(dir_name, f.attrib["href"]),
f.attrib["href"],
os.path.join(dir_name, os.path.splitext(f.attrib["href"])[0]+georef_file_extension),
os.path.splitext(f.attrib["href"])[0]+georef_file_extension], files_elements))
return files;

def copy_from_terrasarx(self,href: str, target_directory: str):
def get_terrasarx_images(self,href, extension):
dir_name = os.path.dirname(href)
file_name = os.path.basename(href)
tree = ET.parse(href)
root = tree.getroot()
georef_file_extension = '.TFW'
if extension == '.JP2':
georef_file_extension = '.J2W'
files_elements = root.findall('.productComponents/imageData/file/location')
print(files_elements)
files = []
for file in files_elements:
f = [str(file.find('path').text), str(file.find('filename').text)]
files.append([os.path.join(dir_name,f[0],f[1]),f[1]])
self.copy_from_met(files,target_directory,file_name)
files.append([os.path.join(dir_name,f[0],f[1]),f[1],
os.path.join(dir_name,f[0], os.path.splitext(f[1])[0]+georef_file_extension),
os.path.splitext(f[1])[0]+georef_file_extension])
return files

def copy_from_dimap(self,href: str, target_directory: str):
files = self.get_dimap_images(href)
self.copy_from_met(files,target_directory)

def copy_from_met(self,files,target_directory,file_name):
# If the met_file reference only one file we copy it
if len(files) == 1:
shutil.copyfile(files[0][0], os.path.join(target_directory,files[0][1]))
return
# If the met_file reference several files we zip it in one zip file
elif len(files) > 1:
tif_zip_file = ZipFile(os.path.join(target_directory,file_name+".zip"), mode='a')
for f in files:
valid_and_exist = os.path.isfile(f[0]) and os.path.exists(f[0])
if valid_and_exist:
tif_zip_file.write(f[0],f[1])
tif_zip_file.close()
return
def copy_from_terrasarx(self,href: str, target_directory: str):
files = self.get_terrasarx_images(href)
self.copy_from_met(files,target_directory)

def copy_from_met(self,files,target_directory):
for f in files:
valid_and_exist = os.path.isfile(f[0]) and os.path.exists(f[0])
if valid_and_exist:
shutil.copyfile(f[0],target_directory + "/" + f[1])
valid_and_exist = os.path.isfile(f[2]) and os.path.exists(f[2])
if valid_and_exist:
shutil.copyfile(f[2],target_directory + "/" + f[3])
7 changes: 6 additions & 1 deletion extensions/aproc/proc/download/drivers/impl/simple_copy.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import hashlib
import os
import time
import shutil
import requests
from airs.core.models.model import Item, Role
Expand All @@ -19,8 +21,11 @@ def supports(item: Item) -> bool:
return data is not None and data.href is not None and (data.href.startswith("file://") or data.href.startswith("http://") or data.href.startswith("https://"))

# Implements drivers method
def fetch_and_transform(self, item: Item, target_directory: str, file_name: str, crop_wkt: str, target_projection: str, target_format: str, raw_archive: bool):
def fetch_and_transform(self, item: Item, target_directory: str, crop_wkt: str, target_projection: str, target_format: str, raw_archive: bool):
data = item.assets.get(Role.data.value)
file_name = os.path.basename(item.id.replace("-", "_").replace(" ", "_").replace("/", "_").replace("\\", "_").replace("@", "_"))+"."+format
if os.path.exists(file_name):
file_name = hashlib.md5(str(time.time_ns()).encode("utf-8")).hexdigest()+file_name
if data is not None:
if data.href.startswith("file://"):
shutil.copy(data.href, os.path.join(target_directory, file_name))
Expand Down
37 changes: 24 additions & 13 deletions extensions/aproc/proc/download/drivers/impl/tif_file.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import os
import shutil

from airs.core.models.model import Item, Role
from airs.core.models.model import Item, Role, AssetFormat
from aproc.core.settings import Configuration
from extensions.aproc.proc.download.drivers.driver import Driver as DownloadDriver
from datetime import datetime

from extensions.aproc.proc.download.drivers.impl.utils import make_raw_archive_zip

Expand All @@ -27,30 +26,42 @@ def supports(item: Item) -> bool:
return False

# Implements drivers method
def fetch_and_transform(self, item: Item, target_directory: str, file_name: str, crop_wkt: str, target_projection: str, target_format: str, raw_archive: bool):
def fetch_and_transform(self, item: Item, target_directory: str, crop_wkt: str, target_projection: str, target_format: str, raw_archive: bool):
asset = item.assets.get(Role.data.value)
tif_file = asset.href
tif_file_name = os.path.basename(tif_file)
if raw_archive:
make_raw_archive_zip(tif_file, target_directory)
return
if target_projection == target_format == 'native':
# If the projetion and the format are natives, just copy the file
if (target_projection == target_format == 'native') and (not crop_wkt):
# If the projetion and the format are natives, just copy the file and the georef file
georef_file_extension = '.TFW'
if item.properties.main_asset_format == AssetFormat.jpg2000.value:
georef_file_extension = '.J2W'
shutil.copyfile(tif_file, os.path.join(target_directory, tif_file_name))
georef_file_name = os.path.splitext(tif_file_name)[0]+georef_file_extension
dir_name = os.path.dirname(tif_file)
georef_file = os.path.join(dir_name,georef_file_name)
valid_and_exist = os.path.isfile(georef_file) and os.path.exists(georef_file)
if valid_and_exist:
shutil.copyfile(georef_file, os.path.join(target_directory, georef_file_name))
return
from extensions.aproc.proc.download.drivers.impl.utils import extract
import pyproj
epsg_target = pyproj.Proj(target_projection)
# Default driver is GTiff
driver_target = "GTiff"
if target_format == "Geotiff":
driver_target = "GTiff"
target_file_name = os.path.splitext(tif_file_name)[0] + datetime.now().strftime("%d-%m-%Y-%H-%M-%S")+'.tif'
extension='.tif'
if (not target_format) or (target_format == 'native'):
if item.properties.main_asset_format == AssetFormat.jpg2000.value:
driver_target = "JP2OpenJPEG"
else:
driver_target = "GTiff"
elif target_format == "Jpeg2000":
driver_target = "JP2OpenJPEG"
target_file_name = os.path.splitext(tif_file_name)[0] + datetime.now().strftime("%d-%m-%Y-%H-%M-%S")+'.JP2'
extract(crop_wkt, tif_file, driver_target, epsg_target, target_directory, target_file_name,
target_projection)
if driver_target == "JP2OpenJPEG":
extension='.JP2'
target_file_name = os.path.splitext(tif_file_name)[0] + extension
extract([],crop_wkt, tif_file, driver_target, target_projection, target_directory, target_file_name)




Loading

0 comments on commit 038f705

Please sign in to comment.