Skip to content

Commit

Permalink
Merge pull request #17 from mraspaud/add-unpacking
Browse files Browse the repository at this point in the history
Add unpacking possibilities to the watcher
  • Loading branch information
mraspaud authored Jun 5, 2024
2 parents bcbf996 + 263cf85 commit 519c8d4
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 6 deletions.
13 changes: 13 additions & 0 deletions docs/source/cli.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,16 @@ The command-line tool can be used by invoking `pytroll-watcher <config-file>`. A
aliases:
platform_name:
npp: Suomi-NPP

Unpacking of the file into it's component and subsequent publishing is achieved by passing the archive format
in the message config part, for example::

message_config:
subject: /segment/viirs/l1b/
atype: dataset
archive_format: zip
data:
sensor: viirs
aliases:
platform_name:
npp: Suomi-NPP
38 changes: 32 additions & 6 deletions src/pytroll_watchers/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from posttroll.message import Message
from posttroll.publisher import create_publisher_from_dict_config
from trollsift import parse
from upath import UPath

logger = logging.getLogger(__name__)

Expand All @@ -25,7 +26,9 @@ def file_publisher_from_generator(generator, publisher_config, message_config):
(filename, file_metadata).
publisher_config: The configuration dictionary to pass to the posttroll publishing functions.
message_config: The information needed to complete the posttroll message generation. Will be amended
with the file metadata, and passed directly to posttroll's Message constructor.
with the file metadata, and passed directly to posttroll's Message constructor.
If it contains "archive_format", it is expected to have the archive type, and the contents of the archive
will be published as a "dataset".
Side effect:
Publishes posttroll messages containing the location of the file with the following fields:
Expand All @@ -43,15 +46,19 @@ def file_publisher_from_generator(generator, publisher_config, message_config):
""" # noqa
publisher = create_publisher_from_dict_config(publisher_config)
publisher.start()
archive_format = message_config.pop("archive_format", None)
with closing(publisher):
for file_item, file_metadata in generator:
amended_message_config = deepcopy(message_config)
amended_message_config.setdefault("data", {})
amended_message_config["data"]["uri"] = file_item.as_uri()
amended_message_config["data"]["uid"] = file_item.name
with suppress(AttributeError):
amended_message_config["data"]["filesystem"] = json.loads(file_item.fs.to_json())
amended_message_config["data"]["path"] = file_item.path

if archive_format:
dataset = [_build_file_location(unpacked_file) for unpacked_file in unpack(file_item, archive_format)]
amended_message_config["data"]["dataset"] = dataset
else:
file_location = _build_file_location(file_item)
amended_message_config["data"].update(file_location)

aliases = amended_message_config.pop("aliases", {})
apply_aliases(aliases, file_metadata)
amended_message_config["data"].update(file_metadata)
Expand All @@ -60,6 +67,25 @@ def file_publisher_from_generator(generator, publisher_config, message_config):
publisher.send(str(msg))


def unpack(path, archive_format):
"""Unpack the path and yield the extracted filenames."""
import fsspec
fs = fsspec.get_filesystem_class(archive_format)(fsspec.open(path))
files = fs.find("/")
for fi in files:
yield UPath(fi, protocol=archive_format, target_protocol=path.protocol, fo=path.as_uri())


def _build_file_location(file_item):
file_location = dict()
file_location["uri"] = file_item.as_uri()
file_location["uid"] = file_item.name
with suppress(AttributeError):
file_location["filesystem"] = json.loads(file_item.fs.to_json())
file_location["path"] = file_item.path
return file_location


def apply_aliases(aliases, metadata):
"""Apply aliases to the metadata.
Expand Down
36 changes: 36 additions & 0 deletions tests/test_publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""Tests for the publisher functionality."""

import os
from shutil import make_archive

from posttroll.testing import patched_publisher
from pytroll_watchers.publisher import file_publisher_from_generator
from upath import UPath


def test_unpacking(tmp_path):
"""Test unpacking the watched file's components to a dataset message."""
file1 = tmp_path / "to_zip" / "file1"
file2 = tmp_path / "to_zip" / "file2"
zip_file = tmp_path / "archived.zip"

file1.parent.mkdir()

open(file1, "a").close()
open(file2, "a").close()

make_archive(os.path.splitext(zip_file)[0], "zip", tmp_path / "to_zip")

path = UPath("file://" + str(zip_file))

publisher_settings = dict(nameservers=False, port=1979)
message_settings = dict(subject="/segment/olci/l2/", atype="dataset", data=dict(sensor="olci"),
archive_format="zip")

with patched_publisher() as messages:
file_publisher_from_generator([[path, dict()]],
publisher_config=publisher_settings,
message_config=message_settings)

assert "zip://file1" in messages[0]
assert "zip://file2" in messages[0]

0 comments on commit 519c8d4

Please sign in to comment.