From 2f2439dfc183a37b4499ebe2d30e8dd5f296dde8 Mon Sep 17 00:00:00 2001 From: Martin Raspaud Date: Wed, 11 Sep 2024 18:46:13 +0200 Subject: [PATCH 1/3] Rename archive_format to unpack --- docs/source/cli.rst | 2 +- src/pytroll_watchers/publisher.py | 17 ++++++++--------- tests/test_publisher.py | 2 +- 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/docs/source/cli.rst b/docs/source/cli.rst index 6b0b3c0..f857eab 100644 --- a/docs/source/cli.rst +++ b/docs/source/cli.rst @@ -26,7 +26,7 @@ in the message config part, for example:: message_config: subject: /segment/viirs/l1b/ atype: dataset - archive_format: zip + unpack: zip data: sensor: viirs aliases: diff --git a/src/pytroll_watchers/publisher.py b/src/pytroll_watchers/publisher.py index 259cd2f..18c0059 100644 --- a/src/pytroll_watchers/publisher.py +++ b/src/pytroll_watchers/publisher.py @@ -27,8 +27,8 @@ def file_publisher_from_generator(generator, publisher_config, message_config): publisher_config: The configuration dictionary to pass to the posttroll publishing functions. message_config: The information needed to complete the posttroll message generation. Will be amended with the file metadata, and passed directly to posttroll's Message constructor. - If it contains "archive_format", it is expected to have the archive type, and the contents of the archive - will be published as a "dataset". + If it contains "unpack", it is expected to have the archive type (eg "zip"), or "directory", and the + contents of the archive or directory will be published as a "dataset". Side effect: Publishes posttroll messages containing the location of the file with the following fields: @@ -46,14 +46,13 @@ def file_publisher_from_generator(generator, publisher_config, message_config): """ # noqa publisher = create_publisher_from_dict_config(publisher_config) publisher.start() - archive_format = message_config.pop("archive_format", None) + unpack = message_config.pop("unpack", None) with closing(publisher): for file_item, file_metadata in generator: amended_message_config = deepcopy(message_config) amended_message_config.setdefault("data", {}) - - if archive_format: - dataset = [_build_file_location(unpacked_file) for unpacked_file in unpack(file_item, archive_format)] + if unpack: + dataset = [_build_file_location(unpacked_file) for unpacked_file in unpack_archive(file_item, unpack)] amended_message_config["data"]["dataset"] = dataset else: file_location = _build_file_location(file_item) @@ -67,14 +66,14 @@ def file_publisher_from_generator(generator, publisher_config, message_config): publisher.send(str(msg)) -def unpack(path, archive_format): +def unpack_archive(path, unpack): """Unpack the path and yield the extracted filenames.""" import fsspec - fs = fsspec.get_filesystem_class(archive_format)(fsspec.open(path.path, **path.storage_options)) + fs = fsspec.get_filesystem_class(unpack)(fsspec.open(path.path, **path.storage_options)) files = fs.find("/") for fi in files: yield UPath(fi, - protocol=archive_format, + protocol=unpack, target_protocol=path.protocol, target_options=path.storage_options, fo=path.as_uri()) diff --git a/tests/test_publisher.py b/tests/test_publisher.py index dd13169..84835ac 100644 --- a/tests/test_publisher.py +++ b/tests/test_publisher.py @@ -26,7 +26,7 @@ def test_unpacking(tmp_path): publisher_settings = dict(nameservers=False, port=1979) message_settings = dict(subject="/segment/olci/l2/", atype="dataset", data=dict(sensor="olci"), - archive_format="zip") + unpack="zip") with patched_publisher() as messages: file_publisher_from_generator([[path, dict()]], From 18f761b443c5ca19f11eaf9071a3977edbcf09cb Mon Sep 17 00:00:00 2001 From: Martin Raspaud Date: Thu, 12 Sep 2024 08:54:28 +0200 Subject: [PATCH 2/3] Alow unpacking directories --- src/pytroll_watchers/local_watcher.py | 4 +++- src/pytroll_watchers/publisher.py | 14 +++++++++++++- tests/test_local_watcher.py | 2 +- tests/test_publisher.py | 26 ++++++++++++++++++++++++++ 4 files changed, 43 insertions(+), 3 deletions(-) diff --git a/src/pytroll_watchers/local_watcher.py b/src/pytroll_watchers/local_watcher.py index 80e0426..b10904d 100644 --- a/src/pytroll_watchers/local_watcher.py +++ b/src/pytroll_watchers/local_watcher.py @@ -13,7 +13,7 @@ from pytroll_watchers.backends.local import listen_to_local_events from pytroll_watchers.publisher import SecurityError, file_publisher_from_generator, parse_metadata -# This is a workaround for a but in universal_pathlib, see +# This is a workaround for a bug in universal_pathlib, see WrappedFileSystemFlavour.protocol_config["netloc_is_anchor"].add("ssh") logger = logging.getLogger(__name__) @@ -76,6 +76,8 @@ def file_generator(directory, observer_type="os", file_pattern=None, protocol=No continue if protocol is not None: uri = urlunparse((protocol, None, str(path), None, None, None)) + if storage_options is None: + storage_options = dict() yield UPath(uri, **storage_options), file_metadata else: yield Path(path), file_metadata diff --git a/src/pytroll_watchers/publisher.py b/src/pytroll_watchers/publisher.py index 18c0059..bf835e3 100644 --- a/src/pytroll_watchers/publisher.py +++ b/src/pytroll_watchers/publisher.py @@ -51,7 +51,10 @@ def file_publisher_from_generator(generator, publisher_config, message_config): for file_item, file_metadata in generator: amended_message_config = deepcopy(message_config) amended_message_config.setdefault("data", {}) - if unpack: + if unpack == "directory": + dataset = [_build_file_location(unpacked_file) for unpacked_file in unpack_dir(file_item)] + amended_message_config["data"]["dataset"] = dataset + elif unpack: dataset = [_build_file_location(unpacked_file) for unpacked_file in unpack_archive(file_item, unpack)] amended_message_config["data"]["dataset"] = dataset else: @@ -79,6 +82,15 @@ def unpack_archive(path, unpack): fo=path.as_uri()) +def unpack_dir(path): + """Unpack the directory and generate the files it contains (recursively).""" + files = path.fs.find(path.path) + for fi in files: + yield UPath(fi, + protocol=path.protocol, + **path.storage_options) + + def _build_file_location(file_item): file_location = dict() file_location["uri"] = file_item.as_uri() diff --git a/tests/test_local_watcher.py b/tests/test_local_watcher.py index 7440771..f82db53 100644 --- a/tests/test_local_watcher.py +++ b/tests/test_local_watcher.py @@ -97,7 +97,7 @@ def test_publish_paths(tmp_path, patched_local_events, caplog): # noqa message = Message(rawstr=messages[0]) assert message.data["uri"] == f"file://{str(tmp_path)}/foo.txt" assert message.data["sensor"] == "viirs" - assert "fs" not in message.data + assert "filesystem" not in message.data assert f"Starting watch on '{local_settings['directory']}'" in caplog.text diff --git a/tests/test_publisher.py b/tests/test_publisher.py index 84835ac..1eb39b7 100644 --- a/tests/test_publisher.py +++ b/tests/test_publisher.py @@ -35,3 +35,29 @@ def test_unpacking(tmp_path): assert "zip://file1" in messages[0] assert "zip://file2" in messages[0] + + +def test_unpacking_directory(tmp_path): + """Test unpacking the watched directory's components to a dataset message.""" + file1 = tmp_path / "my_dir" / "file1" + file2 = tmp_path / "my_dir" / "file2" + + dirpath = file1.parent + dirpath.mkdir() + + open(file1, "a").close() + open(file2, "a").close() + + path = UPath("file://" + str(dirpath)) + + publisher_settings = dict(nameservers=False, port=1979) + message_settings = dict(subject="/segment/olci/l2/", atype="dataset", data=dict(sensor="olci"), + unpack="directory") + + with patched_publisher() as messages: + file_publisher_from_generator([[path, dict()]], + publisher_config=publisher_settings, + message_config=message_settings) + + assert "my_dir/file1" in messages[0] + assert "my_dir/file2" in messages[0] From cfa05fa9e5915c9cc85b14f1cc2a8cb0e7bacb03 Mon Sep 17 00:00:00 2001 From: Martin Raspaud Date: Thu, 12 Sep 2024 10:07:32 +0200 Subject: [PATCH 3/3] Fix minimal upath version --- pyproject.toml | 2 +- src/pytroll_watchers/local_watcher.py | 3 --- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index c936fa7..2295ebc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ description = "Utility functions and scripts to watch for new files on local or authors = [ { name = "Martin Raspaud", email = "martin.raspaud@smhi.se" } ] -dependencies = ["universal-pathlib", "trollsift", "pyyaml", "geojson"] +dependencies = ["universal-pathlib>=0.2.5", "trollsift", "pyyaml", "geojson"] readme = "README.md" requires-python = ">= 3.10" license = {file = "LICENSE.txt"} diff --git a/src/pytroll_watchers/local_watcher.py b/src/pytroll_watchers/local_watcher.py index b10904d..0d2a233 100644 --- a/src/pytroll_watchers/local_watcher.py +++ b/src/pytroll_watchers/local_watcher.py @@ -8,13 +8,10 @@ from urllib.parse import urlunparse from upath import UPath -from upath._flavour import WrappedFileSystemFlavour from pytroll_watchers.backends.local import listen_to_local_events from pytroll_watchers.publisher import SecurityError, file_publisher_from_generator, parse_metadata -# This is a workaround for a bug in universal_pathlib, see -WrappedFileSystemFlavour.protocol_config["netloc_is_anchor"].add("ssh") logger = logging.getLogger(__name__)