Commit

Merge pull request #31 from mraspaud/feature-unpack-dirs
Allow including directory name in file names
mraspaud authored Sep 13, 2024
2 parents d9312c4 + d9c937e commit b533065
Showing 4 changed files with 52 additions and 9 deletions.
33 changes: 32 additions & 1 deletion src/pytroll_watchers/dataspace_watcher.py
@@ -6,6 +6,37 @@
 Note:
     The OData and S3 services require two different sets of credentials.

+Example of a configuration file to retrieve SAR data from dataspace:
+
+.. code-block:: yaml
+
+    backend: dataspace
+    fs_config:
+      filter_string: "contains(Name,'IW_GRDH')"
+      dataspace_auth:
+        netrc_host: catalogue.dataspace.copernicus.eu
+      storage_options:
+        profile: copernicus
+      polling_interval:
+        minutes: 10
+      start_from:
+        hours: 1
+    publisher_config:
+      name: s1_watcher
+      nameservers: false
+      port: 3000
+    message_config:
+      unpack: directory
+      include_dir_in_uid: true
+      subject: /segment/s1/l1b/
+      atype: file
+      aliases:
+        sensor:
+          SAR: SAR-C
 """

 import datetime
@@ -135,7 +166,7 @@ def generate_download_links(filter_string, dataspace_auth, storage_options):
         mda["orbit_number"] = int(attributes["orbitNumber"])

         for checksum in metadata["Checksum"]:
-            if checksum["Algorithm"] == "MD5":
+            if checksum.get("Algorithm") == "MD5":
                 mda["checksum"] = dict(algorithm=checksum["Algorithm"], hash=checksum["Value"])
                 break
         mda["size"] = int(metadata["ContentLength"])
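
A note on the `checksum.get("Algorithm")` change: presumably a checksum entry can lack an "Algorithm" key, in which case the subscript form would raise a KeyError. A minimal sketch of the failure mode it avoids, with invented sample data:

    # Invented sample data: the first entry has no "Algorithm" key.
    checksums = [{"Value": "abc123"}, {"Algorithm": "MD5", "Value": "def456"}]

    for checksum in checksums:
        # .get() returns None for the first entry instead of raising KeyError.
        if checksum.get("Algorithm") == "MD5":
            print(dict(algorithm=checksum["Algorithm"], hash=checksum["Value"]))
            break
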
6 changes: 3 additions & 3 deletions src/pytroll_watchers/dhus_watcher.py
@@ -21,12 +21,12 @@
       name: s1_watcher
     message_config:
       subject: /segment/s1/l1b/
-      atype: file
+      atype: dataset
       unpack: zip
       aliases:
         sensor:
           SAR: SAR-C
 """

 import datetime as dt
@@ -129,7 +129,7 @@ def generate_download_links(server, filter_params):

     for entry in entries["d"]["results"]:
         mda = dict()
-        path = UPath(entry["__metadata"]["media_src"])
+        path = UPath(entry["__metadata"]["media_src"], client_kwargs=dict(trust_env=True))
         mda["boundary"] = _extract_boundary_as_geojson(entry)
         attributes = _construct_attributes_dict(entry)
         mda["platform_name"] = attributes["Satellite name"].capitalize() + attributes["Satellite number"]
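
For context on the `client_kwargs=dict(trust_env=True)` addition: assuming the media_src URL yields an http(s)-backed UPath, client_kwargs is forwarded through fsspec's HTTP filesystem to the underlying aiohttp session, where trust_env=True makes the session honor HTTP_PROXY/HTTPS_PROXY/NO_PROXY environment variables and ~/.netrc credentials. A minimal sketch of the session-level effect:

    import asyncio

    import aiohttp


    async def fetch_status(url):
        # trust_env=True lets aiohttp read proxy settings from the environment
        # and credentials from ~/.netrc for this request.
        async with aiohttp.ClientSession(trust_env=True) as session:
            async with session.get(url) as response:
                return response.status


    # Illustrative URL only.
    print(asyncio.run(fetch_status("https://example.org")))
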
17 changes: 13 additions & 4 deletions src/pytroll_watchers/publisher.py
@@ -28,7 +30,9 @@ def file_publisher_from_generator(generator, publisher_config, message_config):
         message_config: The information needed to complete the posttroll message generation. Will be amended
             with the file metadata, and passed directly to posttroll's Message constructor.
             If it contains "unpack", it is expected to have the archive type (e.g. "zip"), or "directory", and the
-            contents of the archive or directory will be published as a "dataset".
+            contents of the archive or directory will be published as a "dataset". For the case where "directory" is
+            used, it is also possible to set the boolean "include_dir_in_uid" to true so that the full relative path
+            of the file is provided.

     Side effect:
         Publishes posttroll messages containing the location of the file with the following fields:
@@ -47,12 +49,15 @@
     publisher = create_publisher_from_dict_config(publisher_config)
     publisher.start()
     unpack = message_config.pop("unpack", None)
+    include_dir = message_config.pop("include_dir_in_uid", None)
     with closing(publisher):
         for file_item, file_metadata in generator:
             amended_message_config = deepcopy(message_config)
             amended_message_config.setdefault("data", {})
             if unpack == "directory":
-                dataset = [_build_file_location(unpacked_file) for unpacked_file in unpack_dir(file_item)]
+                dir_to_include = file_item.name if include_dir else None
+                dataset = [_build_file_location(unpacked_file, dir_to_include)
+                           for unpacked_file in unpack_dir(file_item)]
                 amended_message_config["data"]["dataset"] = dataset
             elif unpack:
                 dataset = [_build_file_location(unpacked_file) for unpacked_file in unpack_archive(file_item, unpack)]
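
For illustration, a message_config that takes the new branch might look like the sketch below (values are invented; "unpack" and "include_dir_in_uid" are popped off here, and the remaining keys are passed on to the posttroll Message):

    # Hypothetical message_config: "unpack" and "include_dir_in_uid" are
    # consumed by the publisher; the rest ends up in the message itself.
    message_config = {
        "subject": "/segment/s1/l1b/",
        "atype": "dataset",
        "unpack": "directory",
        "include_dir_in_uid": True,
        "data": {"sensor": "olci"},
    }
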
@@ -91,10 +96,14 @@ def unpack_dir(path):
                  **path.storage_options)


-def _build_file_location(file_item):
+def _build_file_location(file_item, include_dir=None):
     file_location = dict()
     file_location["uri"] = file_item.as_uri()
-    file_location["uid"] = file_item.name
+    if include_dir:
+        uid = include_dir + file_item.path.rsplit(include_dir, 1)[-1]
+    else:
+        uid = file_item.name
+    file_location["uid"] = uid
     with suppress(AttributeError):
         file_location["filesystem"] = json.loads(file_item.fs.to_json())
         file_location["path"] = file_item.path
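
The uid construction above can be checked in isolation; a small sketch with an invented local path:

    from upath import UPath

    # With include_dir set, the uid keeps everything from that directory
    # onwards; without it, only the basename is used.
    item = UPath("/tmp/my_dir/nested/file1")

    print("my_dir" + item.path.rsplit("my_dir", 1)[-1])  # my_dir/nested/file1
    print(item.name)                                     # file1
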
5 changes: 4 additions & 1 deletion tests/test_publisher.py
@@ -3,6 +3,7 @@
 import os
 from shutil import make_archive

+from posttroll.message import Message
 from posttroll.testing import patched_publisher
 from upath import UPath

@@ -52,7 +53,7 @@ def test_unpacking_directory(tmp_path):

     publisher_settings = dict(nameservers=False, port=1979)
     message_settings = dict(subject="/segment/olci/l2/", atype="dataset", data=dict(sensor="olci"),
-                            unpack="directory")
+                            unpack="directory", include_dir_in_uid=True)

     with patched_publisher() as messages:
         file_publisher_from_generator([[path, dict()]],
@@ -61,3 +62,5 @@

     assert "my_dir/file1" in messages[0]
     assert "my_dir/file2" in messages[0]
+    msg = Message.decode(messages[0])
+    assert msg.data["dataset"][0]["uid"].startswith("my_dir")
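
Decoded, the message data in this test should look roughly like the sketch below (uri values depend on tmp_path and are shown as placeholders; entries may also carry filesystem and path fields on remote filesystems):

    # Approximate shape of msg.data for the test above (illustrative values).
    expected_shape = {
        "sensor": "olci",
        "dataset": [
            {"uid": "my_dir/file1", "uri": "file:///<tmp_path>/my_dir/file1"},
            {"uid": "my_dir/file2", "uri": "file:///<tmp_path>/my_dir/file2"},
        ],
    }
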
