Skip to content

Commit

Permalink
Merge pull request #30 from mraspaud/feature-unpack-dirs
Browse files Browse the repository at this point in the history
Rename archive_format to unpack
  • Loading branch information
mraspaud authored Sep 12, 2024
2 parents 47432c8 + cfa05fa commit d9312c4
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 16 deletions.
2 changes: 1 addition & 1 deletion docs/source/cli.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ in the message config part, for example::
message_config:
subject: /segment/viirs/l1b/
atype: dataset
archive_format: zip
unpack: zip
data:
sensor: viirs
aliases:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ description = "Utility functions and scripts to watch for new files on local or
authors = [
{ name = "Martin Raspaud", email = "[email protected]" }
]
dependencies = ["universal-pathlib", "trollsift", "pyyaml", "geojson"]
dependencies = ["universal-pathlib>=0.2.5", "trollsift", "pyyaml", "geojson"]
readme = "README.md"
requires-python = ">= 3.10"
license = {file = "LICENSE.txt"}
Expand Down
5 changes: 2 additions & 3 deletions src/pytroll_watchers/local_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,10 @@
from urllib.parse import urlunparse

from upath import UPath
from upath._flavour import WrappedFileSystemFlavour

from pytroll_watchers.backends.local import listen_to_local_events
from pytroll_watchers.publisher import SecurityError, file_publisher_from_generator, parse_metadata

# This is a workaround for a bug in universal_pathlib, see
WrappedFileSystemFlavour.protocol_config["netloc_is_anchor"].add("ssh")
logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -76,6 +73,8 @@ def file_generator(directory, observer_type="os", file_pattern=None, protocol=No
continue
if protocol is not None:
uri = urlunparse((protocol, None, str(path), None, None, None))
if storage_options is None:
storage_options = dict()
yield UPath(uri, **storage_options), file_metadata
else:
yield Path(path), file_metadata
29 changes: 20 additions & 9 deletions src/pytroll_watchers/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ def file_publisher_from_generator(generator, publisher_config, message_config):
publisher_config: The configuration dictionary to pass to the posttroll publishing functions.
message_config: The information needed to complete the posttroll message generation. Will be amended
with the file metadata, and passed directly to posttroll's Message constructor.
If it contains "archive_format", it is expected to have the archive type, and the contents of the archive
will be published as a "dataset".
If it contains "unpack", it is expected to have the archive type (eg "zip"), or "directory", and the
contents of the archive or directory will be published as a "dataset".
Side effect:
Publishes posttroll messages containing the location of the file with the following fields:
Expand All @@ -46,14 +46,16 @@ def file_publisher_from_generator(generator, publisher_config, message_config):
""" # noqa
publisher = create_publisher_from_dict_config(publisher_config)
publisher.start()
archive_format = message_config.pop("archive_format", None)
unpack = message_config.pop("unpack", None)
with closing(publisher):
for file_item, file_metadata in generator:
amended_message_config = deepcopy(message_config)
amended_message_config.setdefault("data", {})

if archive_format:
dataset = [_build_file_location(unpacked_file) for unpacked_file in unpack(file_item, archive_format)]
if unpack == "directory":
dataset = [_build_file_location(unpacked_file) for unpacked_file in unpack_dir(file_item)]
amended_message_config["data"]["dataset"] = dataset
elif unpack:
dataset = [_build_file_location(unpacked_file) for unpacked_file in unpack_archive(file_item, unpack)]
amended_message_config["data"]["dataset"] = dataset
else:
file_location = _build_file_location(file_item)
Expand All @@ -67,19 +69,28 @@ def file_publisher_from_generator(generator, publisher_config, message_config):
publisher.send(str(msg))


def unpack_archive(path, unpack):
    """Unpack the archive at *path* and yield the extracted filenames.

    Args:
        path: The UPath of the archive file. Its protocol and storage options
            are forwarded so remote archives can be opened too.
        unpack: The fsspec protocol name of the archive type (eg "zip").

    Yields:
        UPath instances pointing inside the archive, chained onto the
        original path's protocol so the files remain readable.
    """
    import fsspec

    # Wrap the (possibly remote) archive file in the archive-type filesystem
    # so its contents can be listed without downloading/extracting first.
    fs = fsspec.get_filesystem_class(unpack)(fsspec.open(path.path, **path.storage_options))
    for found_file in fs.find("/"):
        yield UPath(found_file,
                    protocol=unpack,
                    target_protocol=path.protocol,
                    target_options=path.storage_options,
                    fo=path.as_uri())


def unpack_dir(path):
    """Unpack the directory and generate the files it contains (recursively)."""
    # `find` walks the tree recursively; each entry keeps the directory's
    # protocol and storage options so it stays accessible the same way.
    for found_file in path.fs.find(path.path):
        yield UPath(found_file, protocol=path.protocol, **path.storage_options)


def _build_file_location(file_item):
file_location = dict()
file_location["uri"] = file_item.as_uri()
Expand Down
2 changes: 1 addition & 1 deletion tests/test_local_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def test_publish_paths(tmp_path, patched_local_events, caplog): # noqa
message = Message(rawstr=messages[0])
assert message.data["uri"] == f"file://{str(tmp_path)}/foo.txt"
assert message.data["sensor"] == "viirs"
assert "fs" not in message.data
assert "filesystem" not in message.data
assert f"Starting watch on '{local_settings['directory']}'" in caplog.text


Expand Down
28 changes: 27 additions & 1 deletion tests/test_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def test_unpacking(tmp_path):

publisher_settings = dict(nameservers=False, port=1979)
message_settings = dict(subject="/segment/olci/l2/", atype="dataset", data=dict(sensor="olci"),
archive_format="zip")
unpack="zip")

with patched_publisher() as messages:
file_publisher_from_generator([[path, dict()]],
Expand All @@ -35,3 +35,29 @@ def test_unpacking(tmp_path):

assert "zip://file1" in messages[0]
assert "zip://file2" in messages[0]


def test_unpacking_directory(tmp_path):
    """Test unpacking the watched directory's components to a dataset message."""
    watched_dir = tmp_path / "my_dir"
    watched_dir.mkdir()
    # Two empty member files the publisher should enumerate into the dataset.
    for member in ("file1", "file2"):
        (watched_dir / member).touch()

    dir_path = UPath("file://" + str(watched_dir))

    publisher_settings = dict(nameservers=False, port=1979)
    message_settings = dict(subject="/segment/olci/l2/", atype="dataset", data=dict(sensor="olci"),
                            unpack="directory")

    with patched_publisher() as published:
        file_publisher_from_generator([[dir_path, dict()]],
                                      publisher_config=publisher_settings,
                                      message_config=message_settings)

    assert "my_dir/file1" in published[0]
    assert "my_dir/file2" in published[0]

0 comments on commit d9312c4

Please sign in to comment.