diff --git a/docs/source/cli.rst b/docs/source/cli.rst index 885c4e4..6b0b3c0 100644 --- a/docs/source/cli.rst +++ b/docs/source/cli.rst @@ -19,3 +19,16 @@ The command-line tool can be used by invoking `pytroll-watcher `. A aliases: platform_name: npp: Suomi-NPP + +Unpacking of the file into its components and subsequent publishing is achieved by passing the archive format +in the message config part, for example:: + + message_config: + subject: /segment/viirs/l1b/ + atype: dataset + archive_format: zip + data: + sensor: viirs + aliases: + platform_name: + npp: Suomi-NPP diff --git a/src/pytroll_watchers/publisher.py b/src/pytroll_watchers/publisher.py index 0d8beba..6d34e7e 100644 --- a/src/pytroll_watchers/publisher.py +++ b/src/pytroll_watchers/publisher.py @@ -9,6 +9,7 @@ from posttroll.message import Message from posttroll.publisher import create_publisher_from_dict_config from trollsift import parse +from upath import UPath logger = logging.getLogger(__name__) @@ -25,7 +26,9 @@ def file_publisher_from_generator(generator, publisher_config, message_config): (filename, file_metadata). publisher_config: The configuration dictionary to pass to the posttroll publishing functions. message_config: The information needed to complete the posttroll message generation. Will be amended - with the file metadata, and passed directly to posttroll's Message constructor. + with the file metadata, and passed directly to posttroll's Message constructor. + If it contains "archive_format", it is expected to have the archive type, and the contents of the archive + will be published as a "dataset". 
Side effect: Publishes posttroll messages containing the location of the file with the following fields: @@ -43,15 +46,19 @@ def file_publisher_from_generator(generator, publisher_config, message_config): """ # noqa publisher = create_publisher_from_dict_config(publisher_config) publisher.start() + archive_format = message_config.pop("archive_format", None) with closing(publisher): for file_item, file_metadata in generator: amended_message_config = deepcopy(message_config) amended_message_config.setdefault("data", {}) - amended_message_config["data"]["uri"] = file_item.as_uri() - amended_message_config["data"]["uid"] = file_item.name - with suppress(AttributeError): - amended_message_config["data"]["filesystem"] = json.loads(file_item.fs.to_json()) - amended_message_config["data"]["path"] = file_item.path + + if archive_format: + dataset = [_build_file_location(unpacked_file) for unpacked_file in unpack(file_item, archive_format)] + amended_message_config["data"]["dataset"] = dataset + else: + file_location = _build_file_location(file_item) + amended_message_config["data"].update(file_location) + aliases = amended_message_config.pop("aliases", {}) apply_aliases(aliases, file_metadata) amended_message_config["data"].update(file_metadata) @@ -60,6 +67,25 @@ def file_publisher_from_generator(generator, publisher_config, message_config): publisher.send(str(msg)) +def unpack(path, archive_format): + """Unpack the path and yield the extracted filenames.""" + import fsspec + fs = fsspec.get_filesystem_class(archive_format)(fsspec.open(path)) + files = fs.find("/") + for fi in files: + yield UPath(fi, protocol=archive_format, target_protocol=path.protocol, fo=path.as_uri()) + + +def _build_file_location(file_item): + file_location = dict() + file_location["uri"] = file_item.as_uri() + file_location["uid"] = file_item.name + with suppress(AttributeError): + file_location["filesystem"] = json.loads(file_item.fs.to_json()) + file_location["path"] = file_item.path + return 
file_location + + def apply_aliases(aliases, metadata): """Apply aliases to the metadata. diff --git a/tests/test_publisher.py b/tests/test_publisher.py new file mode 100644 index 0000000..aeb1a5d --- /dev/null +++ b/tests/test_publisher.py @@ -0,0 +1,36 @@ +"""Tests for the publisher functionality.""" + +import os +from shutil import make_archive + +from posttroll.testing import patched_publisher +from pytroll_watchers.publisher import file_publisher_from_generator +from upath import UPath + + +def test_unpacking(tmp_path): + """Test unpacking the watched file's components to a dataset message.""" + file1 = tmp_path / "to_zip" / "file1" + file2 = tmp_path / "to_zip" / "file2" + zip_file = tmp_path / "archived.zip" + + file1.parent.mkdir() + + open(file1, "a").close() + open(file2, "a").close() + + make_archive(os.path.splitext(zip_file)[0], "zip", tmp_path / "to_zip") + + path = UPath("file://" + str(zip_file)) + + publisher_settings = dict(nameservers=False, port=1979) + message_settings = dict(subject="/segment/olci/l2/", atype="dataset", data=dict(sensor="olci"), + archive_format="zip") + + with patched_publisher() as messages: + file_publisher_from_generator([[path, dict()]], + publisher_config=publisher_settings, + message_config=message_settings) + + assert "zip://file1" in messages[0] + assert "zip://file2" in messages[0]