Skip to content

Commit

Permalink
feat: add is_file and is_dir methods + test for download from cloud
Browse files Browse the repository at this point in the history
  • Loading branch information
QuCMGisaia committed Jan 10, 2025
1 parent 0a16654 commit b4ea875
Show file tree
Hide file tree
Showing 15 changed files with 254 additions and 83 deletions.
24 changes: 19 additions & 5 deletions extensions/aproc/proc/access/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,11 @@ def resolve_storage(href: str) -> AnyStorage:
"""

for s in AccessManager.storages:
if s.supports(href):
return s

try:
if s.supports(href):
return s
except Exception:
...
raise NotImplementedError(f"Storage for {href} is not configured")

@staticmethod
Expand All @@ -60,13 +62,13 @@ def get_storage_parameters(href: str):
return storage.get_storage_parameters()

@staticmethod
def pull(href: str, dst: str):
def pull(href: str, dst: str, is_dst_dir: bool):
"""
Pulls a file from a storage to write it in the local storage.
If the input storage is local, then it is a copy. Otherwise it is a download.
"""
storage = AccessManager.resolve_storage(href)
storage.pull(href, dst)
storage.pull(href, dst, is_dst_dir)

# Will return a yield
@staticmethod
Expand Down Expand Up @@ -126,3 +128,15 @@ def zip(href: str, target_directory: str):
dir_name = os.path.dirname(href)
target_file_name = os.path.splitext(file_name)[0] + datetime.now().strftime("%d-%m-%Y-%H-%M-%S")
shutil.make_archive(target_directory + "/" + target_file_name, 'zip', dir_name)

@staticmethod
def is_file(href: str):
storage = AccessManager.resolve_storage(href)

return storage.is_file(href)

@staticmethod
def is_dir(href: str):
storage = AccessManager.resolve_storage(href)

return storage.is_dir(href)
27 changes: 26 additions & 1 deletion extensions/aproc/proc/access/storages/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,13 @@ def get_rasterio_session(self):
...

@abstractmethod
def pull(self, href: str, dst: str):
def pull(self, href: str, dst: str, is_dst_dir: bool):
"""Copy/Download the desired file from the file system to write it locally
Args:
href (str): File to fetch
dst (str): Destination of the file
is_dst_dir (bool): Whether the destination is a directory
"""
# Check that dst is local
scheme = urlparse(dst).scheme
Expand All @@ -81,3 +82,27 @@ def prepare_for_local_process(self, href: str) -> str:

self.pull(href, dst)
return dst

@abstractmethod
def is_file(self, href: str) -> bool:
"""Returns whether the specified href is a file
Args:
href(str): The href to test
Returns:
bool: Whether the input is a file
"""
...

@abstractmethod
def is_dir(self, href: str) -> bool:
"""Returns whether the specified href is a directory
Args:
href(str): The href to test
Returns:
bool: Whether the input is a directory
"""
...
11 changes: 9 additions & 2 deletions extensions/aproc/proc/access/storages/file.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
import shutil
from pathlib import Path
from typing import Literal
Expand All @@ -23,11 +24,17 @@ def exists(self, href: str):
def get_rasterio_session(self):
return None

def pull(self, href: str, dst: str):
super().pull(href, dst)
def pull(self, href: str, dst: str, is_dst_dir: bool):
super().pull(href, dst, is_dst_dir)
shutil.copy(href, dst)

# @override method of AbstractStorage
def prepare_for_local_process(self, href: str):
# Skip pull as file is already present locally
return href

def is_file(self, href: str):
return os.path.isfile(href)

def is_dir(self, href: str):
return os.path.isdir(href)
16 changes: 12 additions & 4 deletions extensions/aproc/proc/access/storages/gs.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,21 @@ def get_rasterio_session(self):

return rasterio.session.GSSession(credentials)

def pull(self, href: str, dst: str):
super().pull(href, dst)
def pull(self, href: str, dst: str, is_dst_dir: bool):
super().pull(href, dst, is_dst_dir)

bucket = self.__get_bucket()
blob = bucket.blob(urlparse(href).path[1:])

if os.path.isdir(dst):
if is_dst_dir:
# If it is a directory, then add filename at the end of the path to match shutil.copy behaviour
dst = os.path.join(dst, os.path.basename(dst))
dst = os.path.join(dst, os.path.basename(href))
blob.download_to_filename(dst)

def is_file(self, href: str):
return self.exists(href)

def is_dir(self, href: str):
# Does not handle empty folders
blobs = list(self.__get_bucket().list_blobs(prefix=href.removesuffix("/") + "/"))
return len(blobs) > 1
12 changes: 9 additions & 3 deletions extensions/aproc/proc/access/storages/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ def get_rasterio_session(self):
# Might not work
return None

def pull(self, href: str, dst: str):
super().pull(href, dst)
requests_get(href, dst, self.headers)
def pull(self, href: str, dst: str, is_dst_dir: bool):
super().pull(href, dst, is_dst_dir)
requests_get(href, dst, is_dst_dir, self.headers)

def is_file(self, href: str):
return self.exists(href)

def is_dir(self, href: str):
return False
12 changes: 9 additions & 3 deletions extensions/aproc/proc/access/storages/https.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ def get_rasterio_session(self):
# Might not work
return None

def pull(self, href: str, dst: str):
super().pull(href, dst)
requests_get(href, dst, self.headers)
def pull(self, href: str, dst: str, is_dst_dir: bool):
super().pull(href, dst, is_dst_dir)
requests_get(href, dst, is_dst_dir, self.headers)

def is_file(self, href: str):
return self.exists(href)

def is_dir(self, href: str):
return False
6 changes: 3 additions & 3 deletions extensions/aproc/proc/access/storages/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
import requests


def requests_get(href: str, dst: str, headers: dict):
def requests_get(href: str, dst: str, is_dst_dir: bool, headers: dict):
requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning)
r = requests.get(href, headers=headers, stream=True, verify=False)

if os.path.isdir(dst):
if is_dst_dir:
# If it is a directory, then add filename at the end of the path to match shutil.copy behaviour
dst = os.path.join(dst, os.path.basename(dst))
dst = os.path.join(dst, os.path.basename(href))

with open(dst, "wb") as out_file:
shutil.copyfileobj(r.raw, out_file)
Expand Down
10 changes: 5 additions & 5 deletions extensions/aproc/proc/download/drivers/impl/image_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ def fetch_and_transform(self, item: Item, target_directory: str, crop_wkt: str,
if item.properties.item_format and \
item.properties.item_format in [ItemFormat.geotiff.value, ItemFormat.jpeg2000.value]:
self.LOGGER.debug("Copy {} in {}".format(href, target_directory))
AccessManager.pull(href, target_directory)
AccessManager.pull(href, target_directory, True)
if item.assets and item.assets.get(Role.extent.value) and AccessManager.exists(
item.assets.get(Role.extent.value).href):
self.LOGGER.debug("Geo file {} detected and copied".format(item.assets.get(Role.extent.value).href))
AccessManager.pull(item.assets.get(Role.extent.value).href, target_directory)
AccessManager.pull(item.assets.get(Role.extent.value).href, target_directory, True)
if item.assets and item.assets.get(Role.metadata.value) and AccessManager.exists(
item.assets.get(Role.metadata.value).href):
self.LOGGER.debug("Metadata {} detected and copied".format(item.assets.get(Role.metadata.value).href))
AccessManager.pull(item.assets.get(Role.metadata.value).href, target_directory)
AccessManager.pull(item.assets.get(Role.metadata.value).href, target_directory, True)
else:
AccessManager.zip(href, target_directory)
return
Expand All @@ -65,9 +65,9 @@ def fetch_and_transform(self, item: Item, target_directory: str, crop_wkt: str,
if item.assets and item.assets.get(Role.extent.value) is not None and AccessManager.exists(item.assets.get(Role.extent.value).href):
geo_ext_file = item.assets.get(Role.extent.value).href
self.LOGGER.info("Copy {} to {}".format(geo_ext_file, target_directory))
AccessManager.pull(geo_ext_file, target_directory)
AccessManager.pull(geo_ext_file, target_directory, True)
self.LOGGER.debug("Copy {} in {}".format(href, target_directory))
AccessManager.pull(href, target_directory)
AccessManager.pull(href, target_directory, True)
return
if target_projection == 'native':
target_projection = item.properties.proj__epsg
Expand Down
9 changes: 4 additions & 5 deletions extensions/aproc/proc/download/drivers/impl/met_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,7 @@ def copy_from_terrasarx(self, href: str, target_directory: str, extension: str):

def copy_from_met(self, files: list[tuple[str, str, str, str]], target_directory: str):
for f in files:
storage = AccessManager.resolve_storage(f[0])
if AccessManager.exists(f[0]) and (os.path.isfile(f[0]) if storage.type == "file" else True):
AccessManager.pull(f[0], target_directory + "/" + f[1])
if AccessManager.exists(f[2]) and (os.path.isfile(f[2]) if storage.type == "file" else True):
AccessManager.pull(f[2], target_directory + "/" + f[3])
if AccessManager.exists(f[0]) and AccessManager.is_file(f[0]):
AccessManager.pull(f[0], target_directory + "/" + f[1], False)
if AccessManager.exists(f[2]) and AccessManager.is_file(f[2]):
AccessManager.pull(f[2], target_directory + "/" + f[3], False)
2 changes: 1 addition & 1 deletion extensions/aproc/proc/download/drivers/impl/simple_copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@ def fetch_and_transform(self, item: Item, target_directory: str, crop_wkt: str,
href = self.get_asset_href(item)
file_name = get_file_name(item.id)

AccessManager.pull(href, os.path.join(target_directory, file_name))
AccessManager.pull(href, os.path.join(target_directory, file_name), False)
3 changes: 1 addition & 2 deletions extensions/aproc/proc/download/drivers/impl/zarr_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,14 @@ def fetch_and_transform(self, item: Item, target_directory: str, crop_wkt: str,

if AccessManager.is_download_required(asset_href):
self.LOGGER.info("Downloading archive for Zarr creation.")
storage = AccessManager.resolve_storage(asset_href)

# Create tmp file where data will be downloaded
tmp_asset = os.path.join(tempfile.gettempdir(), os.path.basename(asset_href))
if (os.path.splitext(tmp_asset)[1] != ".zip"):
tmp_asset = os.path.splitext(tmp_asset)[0] + ".zip"

# Download archive then extract it
storage.pull(asset_href, tmp_asset)
AccessManager.pull(asset_href, tmp_asset, False)
raster_files = self.__find_raster_files(tmp_asset)

asset_href = f"file://{tmp_asset}"
Expand Down
2 changes: 1 addition & 1 deletion extensions/aproc/proc/enrich/drivers/impl/safe.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def __download_TCI(self, href: str):
tmp_file = tempfile.NamedTemporaryFile("w+", suffix=".zip", delete=False).name

# Download archive then extract it
storage.pull(href, tmp_file)
storage.pull(href, tmp_file, False)
tci_file_path = self.__extract(tmp_file)

# Clean-up
Expand Down
Loading

0 comments on commit b4ea875

Please sign in to comment.