Allow including directory name in file names #31

Merged
merged 2 commits on Sep 13, 2024
33 changes: 32 additions & 1 deletion src/pytroll_watchers/dataspace_watcher.py
@@ -6,6 +6,37 @@
Note:
    The OData and S3 services require two different sets of credentials.


Example of configuration file to retrieve SAR data from dataspace:

.. code-block:: yaml

    backend: dataspace
    fs_config:
      filter_string: "contains(Name,'IW_GRDH')"
      dataspace_auth:
        netrc_host: catalogue.dataspace.copernicus.eu
      storage_options:
        profile: copernicus
      polling_interval:
        minutes: 10
      start_from:
        hours: 1
    publisher_config:
      name: s1_watcher
      nameservers: false
      port: 3000
    message_config:
      unpack: directory
      include_dir_in_uid: true
      subject: /segment/s1/l1b/
      atype: file
      aliases:
        sensor:
          SAR: SAR-C

"""

import datetime
@@ -135,7 +166,7 @@ def generate_download_links(filter_string, dataspace_auth, storage_options):
mda["orbit_number"] = int(attributes["orbitNumber"])

for checksum in metadata["Checksum"]:
if checksum["Algorithm"] == "MD5":
if checksum.get("Algorithm") == "MD5":
mda["checksum"] = dict(algorithm=checksum["Algorithm"], hash=checksum["Value"])
break
mda["size"] = int(metadata["ContentLength"])
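The switch to checksum.get("Algorithm") makes the loop tolerate checksum entries that lack an "Algorithm" key. A minimal sketch of the difference, using hypothetical metadata rather than a real Dataspace response:

    # Hypothetical OData metadata: the first checksum entry is empty, so
    # checksum["Algorithm"] would raise KeyError; .get() just skips it.
    metadata = {"Checksum": [{}, {"Algorithm": "MD5", "Value": "d41d8cd98f00b204e9800998ecf8427e"}]}

    mda = {}
    for checksum in metadata["Checksum"]:
        if checksum.get("Algorithm") == "MD5":
            mda["checksum"] = dict(algorithm=checksum["Algorithm"], hash=checksum["Value"])
            break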
6 changes: 3 additions & 3 deletions src/pytroll_watchers/dhus_watcher.py
@@ -21,12 +21,12 @@
      name: s1_watcher
    message_config:
      subject: /segment/s1/l1b/
-     atype: file
+     atype: dataset
+     unpack: zip
      aliases:
        sensor:
          SAR: SAR-C

"""

import datetime as dt
@@ -129,7 +129,7 @@ def generate_download_links(server, filter_params):

    for entry in entries["d"]["results"]:
        mda = dict()
-       path = UPath(entry["__metadata"]["media_src"])
+       path = UPath(entry["__metadata"]["media_src"], client_kwargs=dict(trust_env=True))
        mda["boundary"] = _extract_boundary_as_geojson(entry)
        attributes = _construct_attributes_dict(entry)
        mda["platform_name"] = attributes["Satellite name"].capitalize() + attributes["Satellite number"]
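For context, UPath forwards extra keyword arguments as fsspec storage options, and for http(s) paths fsspec hands client_kwargs to the underlying aiohttp.ClientSession; trust_env=True makes that session honour HTTP_PROXY/HTTPS_PROXY environment variables and ~/.netrc credentials. A minimal sketch with a hypothetical product URL:

    from upath import UPath

    # trust_env=True reaches aiohttp.ClientSession, so requests respect
    # proxy environment variables and netrc credentials.
    path = UPath("https://dhus.example.com/odata/v1/Products('uuid')/$value",
                 client_kwargs=dict(trust_env=True))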
17 changes: 13 additions & 4 deletions src/pytroll_watchers/publisher.py
@@ -28,7 +28,9 @@ def file_publisher_from_generator(generator, publisher_config, message_config):
        message_config: The information needed to complete the posttroll message generation. Will be amended
            with the file metadata, and passed directly to posttroll's Message constructor.
            If it contains "unpack", it is expected to have the archive type (eg "zip"), or "directory", and the
-           contents of the archive or directory will be published as a "dataset".
+           contents of the archive or directory will be published as a "dataset". For the case where "directory" is
+           used, it is also possible to set the boolean "include_dir_in_uid" to true so that the full relative path
+           of the file is provided.

    Side effect:
        Publishes posttroll messages containing the location of the file with the following fields:
@@ -47,12 +49,15 @@ def file_publisher_from_generator(generator, publisher_config, message_config):
    publisher = create_publisher_from_dict_config(publisher_config)
    publisher.start()
    unpack = message_config.pop("unpack", None)
+   include_dir = message_config.pop("include_dir_in_uid", None)
    with closing(publisher):
        for file_item, file_metadata in generator:
            amended_message_config = deepcopy(message_config)
            amended_message_config.setdefault("data", {})
            if unpack == "directory":
-               dataset = [_build_file_location(unpacked_file) for unpacked_file in unpack_dir(file_item)]
+               dir_to_include = file_item.name if include_dir else None
+               dataset = [_build_file_location(unpacked_file, dir_to_include)
+                          for unpacked_file in unpack_dir(file_item)]
                amended_message_config["data"]["dataset"] = dataset
            elif unpack:
                dataset = [_build_file_location(unpacked_file) for unpacked_file in unpack_archive(file_item, unpack)]
@@ -91,10 +96,14 @@ def unpack_dir(path):
                 **path.storage_options)


-def _build_file_location(file_item):
+def _build_file_location(file_item, include_dir=None):
    file_location = dict()
    file_location["uri"] = file_item.as_uri()
-   file_location["uid"] = file_item.name
+   if include_dir:
+       uid = include_dir + file_item.path.rsplit(include_dir, 1)[-1]
+   else:
+       uid = file_item.name
+   file_location["uid"] = uid
    with suppress(AttributeError):
        file_location["filesystem"] = json.loads(file_item.fs.to_json())
        file_location["path"] = file_item.path
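The rsplit-based reconstruction keeps everything from the last occurrence of the directory name onwards, so the uid becomes the path relative to (and including) the unpacked directory. A small illustration with a hypothetical path:

    include_dir = "my_dir"
    path = "/tmp/downloads/my_dir/measurement/file1"

    # rsplit cuts at the last occurrence of "my_dir"; re-prepending the
    # directory name yields the directory-relative uid.
    uid = include_dir + path.rsplit(include_dir, 1)[-1]
    assert uid == "my_dir/measurement/file1"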
5 changes: 4 additions & 1 deletion tests/test_publisher.py
@@ -3,6 +3,7 @@
import os
from shutil import make_archive

+from posttroll.message import Message
from posttroll.testing import patched_publisher
from upath import UPath

@@ -52,7 +53,7 @@ def test_unpacking_directory(tmp_path):

    publisher_settings = dict(nameservers=False, port=1979)
    message_settings = dict(subject="/segment/olci/l2/", atype="dataset", data=dict(sensor="olci"),
-                           unpack="directory")
+                           unpack="directory", include_dir_in_uid=True)

    with patched_publisher() as messages:
        file_publisher_from_generator([[path, dict()]],
@@ -61,3 +62,5 @@

assert "my_dir/file1" in messages[0]
assert "my_dir/file2" in messages[0]
msg = Message.decode(messages[0])
assert msg.data["dataset"][0]["uid"].startswith("my_dir")
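Decoded, each dataset entry built by _build_file_location then carries the directory-prefixed uid next to the uri. For a local file an entry could look roughly like this (values hypothetical; filesystem and path fields may also appear for fsspec-backed paths):

    entry = {"uri": "file:///tmp/pytest/my_dir/file1", "uid": "my_dir/file1"}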