Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add unpacking possibilities to the watcher #17

Merged
merged 1 commit into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docs/source/cli.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,16 @@ The command-line tool can be used by invoking `pytroll-watcher <config-file>`. A
aliases:
platform_name:
npp: Suomi-NPP

Unpacking of the file into its components and subsequent publishing is achieved by passing the archive format
in the message config part, for example::

message_config:
subject: /segment/viirs/l1b/
atype: dataset
archive_format: zip
data:
sensor: viirs
aliases:
platform_name:
npp: Suomi-NPP
38 changes: 32 additions & 6 deletions src/pytroll_watchers/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from posttroll.message import Message
from posttroll.publisher import create_publisher_from_dict_config
from trollsift import parse
from upath import UPath

logger = logging.getLogger(__name__)

Expand All @@ -25,7 +26,9 @@ def file_publisher_from_generator(generator, publisher_config, message_config):
(filename, file_metadata).
publisher_config: The configuration dictionary to pass to the posttroll publishing functions.
message_config: The information needed to complete the posttroll message generation. Will be amended
with the file metadata, and passed directly to posttroll's Message constructor.
with the file metadata, and passed directly to posttroll's Message constructor.
If it contains "archive_format", it is expected to have the archive type, and the contents of the archive
will be published as a "dataset".

Side effect:
Publishes posttroll messages containing the location of the file with the following fields:
Expand All @@ -43,15 +46,19 @@ def file_publisher_from_generator(generator, publisher_config, message_config):
""" # noqa
publisher = create_publisher_from_dict_config(publisher_config)
publisher.start()
archive_format = message_config.pop("archive_format", None)
with closing(publisher):
for file_item, file_metadata in generator:
amended_message_config = deepcopy(message_config)
amended_message_config.setdefault("data", {})
amended_message_config["data"]["uri"] = file_item.as_uri()
amended_message_config["data"]["uid"] = file_item.name
with suppress(AttributeError):
amended_message_config["data"]["filesystem"] = json.loads(file_item.fs.to_json())
amended_message_config["data"]["path"] = file_item.path

if archive_format:
dataset = [_build_file_location(unpacked_file) for unpacked_file in unpack(file_item, archive_format)]
amended_message_config["data"]["dataset"] = dataset
else:
file_location = _build_file_location(file_item)
amended_message_config["data"].update(file_location)

aliases = amended_message_config.pop("aliases", {})
apply_aliases(aliases, file_metadata)
amended_message_config["data"].update(file_metadata)
Expand All @@ -60,6 +67,25 @@ def file_publisher_from_generator(generator, publisher_config, message_config):
publisher.send(str(msg))


def unpack(path, archive_format):
    """Unpack the path and yield the extracted filenames."""
    import fsspec
    # Mount the archive through fsspec's protocol-specific filesystem
    # (e.g. "zip") and enumerate every member it contains.
    archive_fs = fsspec.get_filesystem_class(archive_format)(fsspec.open(path))
    for member in archive_fs.find("/"):
        # Chain the protocols so each member is addressable inside the
        # original archive, e.g. zip:// inside file://.
        yield UPath(member,
                    protocol=archive_format,
                    target_protocol=path.protocol,
                    fo=path.as_uri())


def _build_file_location(file_item):
    """Build the message metadata (uri/uid, plus filesystem info when available) for a file item."""
    location = {
        "uri": file_item.as_uri(),
        "uid": file_item.name,
    }
    # Plain local paths have no ``fs`` attribute; for those, skip the
    # filesystem-specific fields instead of failing.
    with suppress(AttributeError):
        location["filesystem"] = json.loads(file_item.fs.to_json())
        location["path"] = file_item.path
    return location


def apply_aliases(aliases, metadata):
"""Apply aliases to the metadata.

Expand Down
36 changes: 36 additions & 0 deletions tests/test_publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""Tests for the publisher functionality."""

import os
from shutil import make_archive

from posttroll.testing import patched_publisher
from pytroll_watchers.publisher import file_publisher_from_generator
from upath import UPath


def test_unpacking(tmp_path):
    """Test unpacking the watched file's components to a dataset message."""
    # Build a zip archive containing two (empty) files for the watcher to unpack.
    file1 = tmp_path / "to_zip" / "file1"
    file2 = tmp_path / "to_zip" / "file2"
    zip_file = tmp_path / "archived.zip"

    file1.parent.mkdir()

    # Path.touch() creates the empty files without leaving file handles
    # dangling (the previous open(...).close() idiom).
    file1.touch()
    file2.touch()

    # make_archive appends ".zip" itself, so strip the suffix from the target.
    make_archive(os.fspath(zip_file.with_suffix("")), "zip", tmp_path / "to_zip")

    # as_uri() percent-encodes the path properly, unlike "file://" + str(...).
    path = UPath(zip_file.as_uri())

    publisher_settings = dict(nameservers=False, port=1979)
    # "archive_format" is what triggers publishing the archive contents
    # as a dataset instead of a single file.
    message_settings = dict(subject="/segment/olci/l2/", atype="dataset", data=dict(sensor="olci"),
                            archive_format="zip")

    with patched_publisher() as messages:
        file_publisher_from_generator([(path, {})],
                                      publisher_config=publisher_settings,
                                      message_config=message_settings)

    # Every archive member should appear in the dataset, addressed with
    # the zip protocol.
    assert "zip://file1" in messages[0]
    assert "zip://file2" in messages[0]
Loading