From d587bf0df754e69781b6b1ea3c428628690a59c4 Mon Sep 17 00:00:00 2001 From: Martin Raspaud Date: Tue, 14 May 2024 13:24:46 +0200 Subject: [PATCH 01/16] Add selector feature --- docs/source/conf.py | 8 +- docs/source/index.rst | 1 + docs/source/selector.rst | 4 + src/pytroll_watchers/selector.py | 131 +++++++++++++++++++++++++++++++ tests/test_selector.py | 124 +++++++++++++++++++++++++++++ 5 files changed, 267 insertions(+), 1 deletion(-) create mode 100644 docs/source/selector.rst create mode 100644 src/pytroll_watchers/selector.py create mode 100644 tests/test_selector.py diff --git a/docs/source/conf.py b/docs/source/conf.py index bc03029..c146706 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -15,7 +15,7 @@ # -- General configuration --------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration -extensions = ["sphinx.ext.napoleon", "sphinx.ext.autodoc"] +extensions = ["sphinx.ext.napoleon", "sphinx.ext.autodoc", "sphinx.ext.intersphinx"] autodoc_mock_imports = ["watchdog", "minio", "posttroll", "pytest", "trollsift", "universal_path", "freezegun", "responses", "oauthlib", "requests_oauthlib", "defusedxml"] @@ -29,3 +29,9 @@ html_theme = "alabaster" html_static_path = ["_static"] + +# intersphinx +intersphinx_mapping = { + "posttroll": ("https://posttroll.readthedocs.io/en/latest/", None), + "redis": ("https://redis.readthedocs.io/en/latest/", None), +} diff --git a/docs/source/index.rst b/docs/source/index.rst index 0795ad0..545665e 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -14,6 +14,7 @@ Welcome to pytroll-watchers's documentation! published backends other_api + selector Pytroll-watcher is a library and command-line tool to detect changes on a local or remote file system. diff --git a/docs/source/selector.rst b/docs/source/selector.rst new file mode 100644 index 0000000..a51fdd1 --- /dev/null +++ b/docs/source/selector.rst @@ -0,0 +1,4 @@ +Selector +-------- +.. automodule:: pytroll_watchers.selector + :members: diff --git a/src/pytroll_watchers/selector.py b/src/pytroll_watchers/selector.py new file mode 100644 index 0000000..e5cc54e --- /dev/null +++ b/src/pytroll_watchers/selector.py @@ -0,0 +1,131 @@ +"""Functions and classes for performing message selection. + +Selection in this context means making sure only one message refering to some file will be published further. + +This is useful when multiple source for the same data are sending messages (eg two reception servers for eumetcast) but +only one of each file is needed for further processing. + +At the moment, this module makes use of redis as a refined dictionary for keeping track of the received files. + +""" + +import time +from contextlib import closing, contextmanager +from functools import cache +from subprocess import Popen + +import redis +from posttroll.message import Message +from posttroll.publisher import create_publisher_from_dict_config +from posttroll.subscriber import create_subscriber_from_dict_config + + +@cache +def _connect_to_redis(**kwargs): + return redis.Redis(**kwargs) + +class TTLDict: + """A simple dictionary-like object that discards items older than a time-to-live. + + Not thread-safe. + """ + + def __init__(self, ttl=300, **redis_params): + """Set up the instance. + + Args: + ttl: the time to live of the stored items in integer seconds or as a timedelta instance. Cannot be less + than 1 second. + redis_params: the keyword arguments to pass to the underlying :py:class:`~redis.Redis` instance. + """ + self._redis = _connect_to_redis(**redis_params) + self._ttl = ttl + + def __getitem__(self, key): + """Get the value corresponding to *key*.""" + return self._redis[key] + + def __setitem__(self, key, value): + """Set the *value* corresponding to *key*.""" + res = self._redis.get(key) + if not res: + self._redis.set(key, value, ex=self._ttl) + + +def running_selector(selector_config, subscriber_config): + """Generate selected messages. + + The aim of this generator is to skip messages that are duplicates to already processed messages. + Duplicate in this context means messages referring to the same file (even if stored in different locations). + + Args: + selector_config: a dictionary of arguments to pass to the underlying redis instance, see + https://redis.readthedocs.io/en/stable/connections.html#redis.Redis. You can also provide a ttl as an int + (seconds) or timedelta instance. + subscriber_config: a dictionary of arguments to pass to + :py:func:`~posttroll.subscriber.create_subscriber_from_dict_config`. + + Yields: + JSON representations of posttroll messages. + """ + subscriber = create_subscriber_from_dict_config(subscriber_config) + + with closing(subscriber): + sel = TTLDict(**selector_config) + for msg in subscriber.recv(): + key = Message.decode(msg).data["uid"] + try: + _ = sel[key] + except KeyError: + sel[key] = msg + yield msg + + +def _run_selector_with_managed_dict_server(selector_config, subscriber_config, publisher_config): + """Run the selector with a managed ttldict server.""" + publisher = create_publisher_from_dict_config(publisher_config) + publisher.start() + with closing(publisher): + for msg in running_selector(selector_config, subscriber_config): + publisher.send(msg) + + +def run_selector(selector_config, subscriber_config, publisher_config): + """Run the selector. + + The aim of the selector is to skip messages that refer to already processed files. For example + + The aim of the selector is to skip messages that are duplicates to already published messages. + Duplicate in this context means messages referring to the same file (even if stored in different locations). + + Messages that refer to new files will be published. + + Args: + selector_config: a dictionary of arguments to pass to the underlying redis instance, see + https://redis.readthedocs.io/en/stable/connections.html#redis.Redis. You can also provide a ttl as an int + (seconds) or timedelta instance. + subscriber_config: a dictionary of arguments to pass to + :py:func:`~posttroll.subscriber.create_subscriber_from_dict_config`. The subscribtion is used as a source for + messages to process. + publisher_config: a dictionary of arguments to pass to + :py:func:`~posttroll.publisher.create_publisher_from_dict_config`. This publisher will send the selected + messages. + + """ + with _running_redis_server(port=selector_config.get("port")): + _run_selector_with_managed_dict_server(selector_config, subscriber_config, publisher_config) + + +@contextmanager +def _running_redis_server(port=None): + command = ["redis-server"] + if port: + port = str(int(port)) # using int first here prevents arbitrary strings to be passed to Popen + command += ["--port", port] + proc = Popen(command) # noqa:S603 port is validated + time.sleep(.25) + try: + yield + finally: + proc.terminate() + proc.wait(3) diff --git a/tests/test_selector.py b/tests/test_selector.py new file mode 100644 index 0000000..fbaf923 --- /dev/null +++ b/tests/test_selector.py @@ -0,0 +1,124 @@ +"""Tests for the selector.""" +import time + +import pytest +from posttroll.testing import patched_publisher, patched_subscriber_recv +from pytroll_watchers.selector import ( + TTLDict, + _run_selector_with_managed_dict_server, + _running_redis_server, + run_selector, +) + + +def test_run_selector_that_starts_redis_on_given_port(tmp_path): + """Test running a selector that also starts a redis server.""" + uid = "IVCDB_j03_d20240419_t1114110_e1115356_b07465_c20240419113435035578_cspp_dev.h5" + sdr_file = tmp_path / "sdr" / uid + create_data_file(sdr_file) + + msg1 = ('pytroll://segment/viirs/l1b/ info a001673@c22519.ad.smhi.se 2024-04-19T11:35:00.487388 v1.01 ' + 'application/json {"sensor": "viirs", ' + f'"uid": "{uid}", "uri": "file://{str(sdr_file)}", "path": "{str(sdr_file)}", ' + '"filesystem": {"cls": "fsspec.implementations.local.LocalFileSystem", "protocol": "file", "args": []}}') + + messages = [msg1] + + pipe_in_address = "ipc://" + str(tmp_path / "in.ipc") + pipe_out_address = "ipc://" + str(tmp_path / "out.ipc") + subscriber_config = dict(addresses=[pipe_in_address], + nameserver=False, + port=3000) + + publisher_config = dict(address=pipe_out_address, + nameservers=False) + + selector_config = dict(ttl=1, host="localhost", port=6388) + + with patched_subscriber_recv(messages): + with patched_publisher() as published_messages: + run_selector(selector_config, subscriber_config, publisher_config) + assert len(published_messages) == 1 + + + +@pytest.fixture(scope="module") +def _redis_server(): + """Start a redis server.""" + with _running_redis_server(): + yield + + +def create_data_file(path): + """Create a data file.""" + path.parent.mkdir(exist_ok=True) + + with open(path, "w") as fd: + fd.write("data") + + +@pytest.mark.usefixtures("_redis_server") +def test_run_selector_on_single_file_messages(tmp_path): + """Test running the selector on single file messages.""" + uid = "IVCDB_j02_d20240419_t1114110_e1115356_b07465_c20240419113435035578_cspp_dev.h5" + sdr_file = tmp_path / "sdr" / uid + create_data_file(sdr_file) + + uid2 = "IVCDB_j01_d20240419_t1114110_e1115356_b07465_c20240419113435035578_cspp_dev.h5" + sdr_file2 = tmp_path / "sdr" / uid2 + create_data_file(sdr_file2) + + + msg1 = ('pytroll://segment/viirs/l1b/ info a001673@c22519.ad.smhi.se 2024-04-19T11:35:00.487388 v1.01 ' + 'application/json {"sensor": "viirs", ' + f'"uid": "{uid}", "uri": "file://{str(sdr_file)}", "path": "{str(sdr_file)}", ' + '"filesystem": {"cls": "fsspec.implementations.local.LocalFileSystem", "protocol": "file", "args": []}}') + + msg2 = ('pytroll://segment/viirs/l1b/ info a001673@c22519.ad.smhi.se 2024-04-19T11:35:00.487388 v1.01 ' + 'application/json {"sensor": "viirs", ' + f'"uid": "{uid}", "uri": "ssh://someplace.pytroll.org:/{str(sdr_file)}", "path": "{str(sdr_file)}", ' + '"filesystem": {"cls": "fsspec.implementations.ssh.SFTPFileSystem", "protocol": "ssh", "args": []}}') + + msg3 = ('pytroll://segment/viirs/l1b/ info a001673@c22519.ad.smhi.se 2024-04-19T11:35:00.487388 v1.01 ' + 'application/json {"sensor": "viirs", ' + f'"uid": "{uid2}", "uri": "ssh://someplace.pytroll.org:/{str(sdr_file2)}", "path": "{str(sdr_file2)}", ' + '"filesystem": {"cls": "fsspec.implementations.ssh.SFTPFileSystem", "protocol": "ssh", "args": []}}') + + messages = [msg1, msg2, msg3] + + pipe_in_address = "ipc://" + str(tmp_path / "in.ipc") + pipe_out_address = "ipc://" + str(tmp_path / "out.ipc") + subscriber_config = dict(addresses=[pipe_in_address], + nameserver=False, + port=3000) + + publisher_config = dict(address=pipe_out_address, + nameservers=False) + + selector_config = dict(ttl=1, host="localhost", port=6379) + + with patched_subscriber_recv(messages): + with patched_publisher() as published_messages: + _run_selector_with_managed_dict_server(selector_config, subscriber_config, publisher_config) + assert len(published_messages) == 2 + assert published_messages[0] == msg1 + assert published_messages[1] == msg3 + + +@pytest.mark.usefixtures("_redis_server") +def test__dict(): + """Test the TTLDict.""" + ttl = 1 + key = "uid_1" + value = b"some stuff" + other_value = b"some other important stuff" + + sel = TTLDict(ttl) + + sel[key] = value + assert sel[key] == value + sel[key] = other_value + assert sel[key] == value + time.sleep(ttl) + sel[key] = other_value + assert sel[key] == other_value From f4683048f0c506ab0d16eee431301bfe031d73d2 Mon Sep 17 00:00:00 2001 From: Martin Raspaud Date: Tue, 14 May 2024 14:05:22 +0200 Subject: [PATCH 02/16] Enable custom redis directory --- docs/source/conf.py | 2 +- src/pytroll_watchers/selector.py | 10 ++++++++-- tests/test_selector.py | 30 ++++++++++++++++++++++++++++-- 3 files changed, 37 insertions(+), 5 deletions(-) diff --git a/docs/source/conf.py b/docs/source/conf.py index c146706..86c708d 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -17,7 +17,7 @@ extensions = ["sphinx.ext.napoleon", "sphinx.ext.autodoc", "sphinx.ext.intersphinx"] autodoc_mock_imports = ["watchdog", "minio", "posttroll", "pytest", "trollsift", "universal_path", - "freezegun", "responses", "oauthlib", "requests_oauthlib", "defusedxml"] + "freezegun", "responses", "oauthlib", "requests_oauthlib", "defusedxml", "redis"] templates_path = ["_templates"] exclude_patterns = [] diff --git a/src/pytroll_watchers/selector.py b/src/pytroll_watchers/selector.py index e5cc54e..0c6b453 100644 --- a/src/pytroll_watchers/selector.py +++ b/src/pytroll_watchers/selector.py @@ -12,6 +12,7 @@ import time from contextlib import closing, contextmanager from functools import cache +from pathlib import Path from subprocess import Popen import redis @@ -24,6 +25,7 @@ def _connect_to_redis(**kwargs): return redis.Redis(**kwargs) + class TTLDict: """A simple dictionary-like object that discards items older than a time-to-live. @@ -117,12 +119,16 @@ def run_selector(selector_config, subscriber_config, publisher_config): @contextmanager -def _running_redis_server(port=None): +def _running_redis_server(port=None, directory=None): command = ["redis-server"] if port: port = str(int(port)) # using int first here prevents arbitrary strings to be passed to Popen command += ["--port", port] - proc = Popen(command) # noqa:S603 port is validated + if directory: + directory = Path(directory) + directory.mkdir(parents=True, exist_ok=True) + command += ["--dir", directory] + proc = Popen(command) # noqa:S603 port is validated and directory is a Path time.sleep(.25) try: yield diff --git a/tests/test_selector.py b/tests/test_selector.py index fbaf923..d1fb944 100644 --- a/tests/test_selector.py +++ b/tests/test_selector.py @@ -11,6 +11,32 @@ ) +def test_ttldict_multiple_redis_instances(tmp_path): + """Test the TTLDict.""" + ttl = 300 + key = "uid_multiple" + value = b"some stuff" + other_value = b"some other important stuff" + port = 7321 + with _running_redis_server(port=port, directory=tmp_path / "redis_1"): + sel = TTLDict(ttl, port=port) + + sel[key] = value + assert sel[key] == value + sel[key] = other_value + assert sel[key] == value + with _running_redis_server(port=port, directory=tmp_path / "redis_2"): + with pytest.raises(KeyError): + sel[key] + + +def test_redis_server_validates_directory(tmp_path): + """Test the TTLDict.""" + port = 7321 + with _running_redis_server(port=port, directory=str(tmp_path / "redis_1")): + assert True + + def test_run_selector_that_starts_redis_on_given_port(tmp_path): """Test running a selector that also starts a redis server.""" uid = "IVCDB_j03_d20240419_t1114110_e1115356_b07465_c20240419113435035578_cspp_dev.h5" @@ -106,7 +132,7 @@ def test_run_selector_on_single_file_messages(tmp_path): @pytest.mark.usefixtures("_redis_server") -def test__dict(): +def test_ttldict(): """Test the TTLDict.""" ttl = 1 key = "uid_1" @@ -119,6 +145,6 @@ def test__dict(): assert sel[key] == value sel[key] = other_value assert sel[key] == value - time.sleep(ttl) + time.sleep(ttl+1) sel[key] = other_value assert sel[key] == other_value From 286cb9f394695128cf46ef301cabdeb8eefe171a Mon Sep 17 00:00:00 2001 From: Martin Raspaud Date: Tue, 14 May 2024 14:07:41 +0200 Subject: [PATCH 03/16] Fix ci --- .github/workflows/ci.yml | 2 +- pyproject.toml | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 88e4e57..d2b855f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -31,7 +31,7 @@ jobs: python -m pip install --upgrade pip python -m pip install ruff pytest pytest-cov freezegun responses python -m pip install git+https://github.com/gorakhargosh/watchdog - python -m pip install -e .[local,minio,publishing,ssh,dataspace,datastore,dhus] + python -m pip install -e .[local,minio,publishing,ssh,dataspace,datastore,dhus,selector] - name: Lint with ruff run: | ruff check . diff --git a/pyproject.toml b/pyproject.toml index 0326fc2..2920c6b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,6 +39,7 @@ ssh = ["paramiko"] dataspace = ["oauthlib", "requests_oauthlib", "s3fs"] datastore = ["oauthlib", "requests_oauthlib"] dhus = ["defusedxml"] +selector = ["redis", "redis-server"] [build-system] requires = ["hatchling", "hatch-vcs"] From 31bb7d6dc7144d8a6f08f94dacfc207a164484fc Mon Sep 17 00:00:00 2001 From: Martin Raspaud Date: Tue, 14 May 2024 15:35:22 +0200 Subject: [PATCH 04/16] Add a command-line script --- pyproject.toml | 1 + src/pytroll_watchers/selector.py | 98 ++++++++++++++++++++++++++++---- tests/test_local_watcher.py | 8 +-- tests/test_selector.py | 52 +++++++++++++++-- 4 files changed, 140 insertions(+), 19 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 2920c6b..48590b2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,6 +20,7 @@ classifiers = [ [project.scripts] pytroll-watcher = "pytroll_watchers.main_interface:cli" +pytroll-selector = "pytroll_watchers.selector:cli" [project.entry-points."pytroll_watchers.backends"] local = "pytroll_watchers.local_watcher" diff --git a/src/pytroll_watchers/selector.py b/src/pytroll_watchers/selector.py index 0c6b453..5b77e00 100644 --- a/src/pytroll_watchers/selector.py +++ b/src/pytroll_watchers/selector.py @@ -5,10 +5,47 @@ This is useful when multiple source for the same data are sending messages (eg two reception servers for eumetcast) but only one of each file is needed for further processing. -At the moment, this module makes use of redis as a refined dictionary for keeping track of the received files. +At the moment, this module makes use of redis as a refined dictionary for keeping track of the received files. Hence a +redis server instance will be started along with some of the functions here, and with the cli. + +A command-line script is also made available by this module. It is called ``pytroll-selector``:: + + usage: pytroll-selector [-h] [-l LOG_CONFIG] config + + Selects unique messages from multiple sources. + + positional arguments: + config The yaml config file. + + options: + -h, --help show this help message and exit + -l LOG_CONFIG, --log-config LOG_CONFIG + The yaml config file for logging. + + Thanks for using pytroll-selector! + +An example config file to use with this script is the following:: + + selector_config: + ttl: 30 + publisher_config: + name: hrit_selector + subscriber_config: + addresses: + - tcp://eumetcast_reception_1:9999 + - tcp://eumetcast_reception_2:9999 + nameserver: false + topics: + - /1b/hrit-segment/0deg + +The different sections are passed straight on to :py:func:`run_selector`, so check it to have more information about +what to pass to it. + """ +import argparse +import logging import time from contextlib import closing, contextmanager from functools import cache @@ -16,10 +53,13 @@ from subprocess import Popen import redis -from posttroll.message import Message +import yaml from posttroll.publisher import create_publisher_from_dict_config from posttroll.subscriber import create_subscriber_from_dict_config +from pytroll_watchers.main_interface import configure_logging + +logger = logging.getLogger(__name__) @cache def _connect_to_redis(**kwargs): @@ -30,6 +70,12 @@ class TTLDict: """A simple dictionary-like object that discards items older than a time-to-live. Not thread-safe. + + Args: + ttl: the time to live of the stored items in integer seconds or as a timedelta instance. Cannot be less + than 1 second. + redis_params: the keyword arguments to pass to the underlying :py:class:`~redis.Redis` instance. + """ def __init__(self, ttl=300, **redis_params): @@ -62,7 +108,7 @@ def running_selector(selector_config, subscriber_config): Args: selector_config: a dictionary of arguments to pass to the underlying redis instance, see - https://redis.readthedocs.io/en/stable/connections.html#redis.Redis. You can also provide a ttl as an int + :py:class:`~redis.Redis`. You can also provide a ttl as an int (seconds) or timedelta instance. subscriber_config: a dictionary of arguments to pass to :py:func:`~posttroll.subscriber.create_subscriber_from_dict_config`. @@ -75,12 +121,15 @@ def running_selector(selector_config, subscriber_config): with closing(subscriber): sel = TTLDict(**selector_config) for msg in subscriber.recv(): - key = Message.decode(msg).data["uid"] + key = msg.data["uid"] try: _ = sel[key] + logger.info(f"Discarded {str(msg)}") except KeyError: - sel[key] = msg - yield msg + msg_string = str(msg) + sel[key] = msg_string + logger.info(f"New content {str(msg)}") + yield msg_string def _run_selector_with_managed_dict_server(selector_config, subscriber_config, publisher_config): @@ -95,8 +144,6 @@ def _run_selector_with_managed_dict_server(selector_config, subscriber_config, p def run_selector(selector_config, subscriber_config, publisher_config): """Run the selector. - The aim of the selector is to skip messages that refer to already processed files. For example - The aim of the selector is to skip messages that are duplicates to already published messages. Duplicate in this context means messages referring to the same file (even if stored in different locations). @@ -104,8 +151,9 @@ def run_selector(selector_config, subscriber_config, publisher_config): Args: selector_config: a dictionary of arguments to pass to the underlying redis instance, see - https://redis.readthedocs.io/en/stable/connections.html#redis.Redis. You can also provide a ttl as an int - (seconds) or timedelta instance. + :py:class:`~redis.Redis`. You can also provide a *ttl* for the + selector as an int (seconds) or timedelta instance, so that incoming messages are forgotten after that time. + Also, you can provide a *directory* for the underlying datastructure to store the data in. subscriber_config: a dictionary of arguments to pass to :py:func:`~posttroll.subscriber.create_subscriber_from_dict_config`. The subscribtion is used as a source for messages to process. @@ -114,7 +162,7 @@ def run_selector(selector_config, subscriber_config, publisher_config): messages. """ - with _running_redis_server(port=selector_config.get("port")): + with _running_redis_server(port=selector_config.get("port"), directory=selector_config.pop("directory", None)): _run_selector_with_managed_dict_server(selector_config, subscriber_config, publisher_config) @@ -135,3 +183,31 @@ def _running_redis_server(port=None, directory=None): finally: proc.terminate() proc.wait(3) + + +def cli(args=None): + """Command line interface.""" + parser = argparse.ArgumentParser( + prog="pytroll-selector", + description="Selects unique messages from multiple sources.", + epilog="Thanks for using pytroll-selector!") + + parser.add_argument("config", type=str, help="The yaml config file.") + parser.add_argument("-l", "--log-config", type=str, help="The yaml config file for logging.", default=None) + + parsed = parser.parse_args(args) + + + log_config_filename = parsed.log_config + configure_logging(log_config_filename) + + config_file = parsed.config + + with open(config_file) as fd: + config_dict = yaml.safe_load(fd.read()) + + selector_config = config_dict.get("selector_config", {}) + subscriber_config = config_dict.get("subscriber_config", {}) + publisher_config = config_dict.get("publisher_config", {}) + + return run_selector(selector_config, subscriber_config, publisher_config) diff --git a/tests/test_local_watcher.py b/tests/test_local_watcher.py index 34accca..c0fc9b9 100644 --- a/tests/test_local_watcher.py +++ b/tests/test_local_watcher.py @@ -88,8 +88,8 @@ def test_publish_paths(tmp_path, patched_local_events, caplog): # noqa with patched_local_events([filename]): with patched_publisher() as messages: local_watcher.file_publisher(fs_config=local_settings, - publisher_config=publisher_settings, - message_config=message_settings) + publisher_config=publisher_settings, + message_config=message_settings) assert "uri" not in message_settings["data"] assert len(messages) == 1 @@ -115,5 +115,5 @@ def test_publish_paths_forbids_passing_password(tmp_path, patched_local_events, with patched_publisher(): with pytest.raises(SecurityError): local_watcher.file_publisher(fs_config=local_settings, - publisher_config=publisher_settings, - message_config=message_settings) + publisher_config=publisher_settings, + message_config=message_settings) diff --git a/tests/test_selector.py b/tests/test_selector.py index d1fb944..24551d4 100644 --- a/tests/test_selector.py +++ b/tests/test_selector.py @@ -2,11 +2,14 @@ import time import pytest +import yaml +from posttroll.message import Message from posttroll.testing import patched_publisher, patched_subscriber_recv from pytroll_watchers.selector import ( TTLDict, _run_selector_with_managed_dict_server, _running_redis_server, + cli, run_selector, ) @@ -48,7 +51,7 @@ def test_run_selector_that_starts_redis_on_given_port(tmp_path): f'"uid": "{uid}", "uri": "file://{str(sdr_file)}", "path": "{str(sdr_file)}", ' '"filesystem": {"cls": "fsspec.implementations.local.LocalFileSystem", "protocol": "file", "args": []}}') - messages = [msg1] + messages = [Message.decode(msg1)] pipe_in_address = "ipc://" + str(tmp_path / "in.ipc") pipe_out_address = "ipc://" + str(tmp_path / "out.ipc") @@ -67,7 +70,6 @@ def test_run_selector_that_starts_redis_on_given_port(tmp_path): assert len(published_messages) == 1 - @pytest.fixture(scope="module") def _redis_server(): """Start a redis server.""" @@ -84,7 +86,7 @@ def create_data_file(path): @pytest.mark.usefixtures("_redis_server") -def test_run_selector_on_single_file_messages(tmp_path): +def test_run_selector_on_single_file_messages(tmp_path, caplog): """Test running the selector on single file messages.""" uid = "IVCDB_j02_d20240419_t1114110_e1115356_b07465_c20240419113435035578_cspp_dev.h5" sdr_file = tmp_path / "sdr" / uid @@ -110,7 +112,7 @@ def test_run_selector_on_single_file_messages(tmp_path): f'"uid": "{uid2}", "uri": "ssh://someplace.pytroll.org:/{str(sdr_file2)}", "path": "{str(sdr_file2)}", ' '"filesystem": {"cls": "fsspec.implementations.ssh.SFTPFileSystem", "protocol": "ssh", "args": []}}') - messages = [msg1, msg2, msg3] + messages = [Message.decode(msg1), Message.decode(msg2), Message.decode(msg3)] pipe_in_address = "ipc://" + str(tmp_path / "in.ipc") pipe_out_address = "ipc://" + str(tmp_path / "out.ipc") @@ -123,12 +125,17 @@ def test_run_selector_on_single_file_messages(tmp_path): selector_config = dict(ttl=1, host="localhost", port=6379) + caplog.set_level("INFO") with patched_subscriber_recv(messages): with patched_publisher() as published_messages: _run_selector_with_managed_dict_server(selector_config, subscriber_config, publisher_config) assert len(published_messages) == 2 assert published_messages[0] == msg1 assert published_messages[1] == msg3 + assert "New content " + str(msg1) in caplog.text + assert "Discarded " + str(msg2) in caplog.text + assert "New content " + str(msg3) in caplog.text + @pytest.mark.usefixtures("_redis_server") @@ -148,3 +155,40 @@ def test_ttldict(): time.sleep(ttl+1) sel[key] = other_value assert sel[key] == other_value + + +def test_cli(tmp_path): + """Test the command-line interface.""" + uid = "IVCDB_j03_d20240419_t1114110_e1115356_b07465_c20240419113435035578_cspp_dev.h5" + sdr_file = tmp_path / "sdr" / uid + create_data_file(sdr_file) + + msg1 = ('pytroll://segment/viirs/l1b/ info a001673@c22519.ad.smhi.se 2024-04-19T11:35:00.487388 v1.01 ' + 'application/json {"sensor": "viirs", ' + f'"uid": "{uid}", "uri": "file://{str(sdr_file)}", "path": "{str(sdr_file)}", ' + '"filesystem": {"cls": "fsspec.implementations.local.LocalFileSystem", "protocol": "file", "args": []}}') + + messages = [Message.decode(msg1)] + + pipe_in_address = "ipc://" + str(tmp_path / "in.ipc") + pipe_out_address = "ipc://" + str(tmp_path / "out.ipc") + subscriber_config = dict(addresses=[pipe_in_address], + nameserver=False, + port=3000) + + publisher_config = dict(address=pipe_out_address, + nameservers=False) + + redis_dir = tmp_path / "redis_dir" + + selector_config = dict(ttl=1, port=6389, directory=str(redis_dir)) + config = dict(publisher_config=publisher_config, + subscriber_config=subscriber_config, + selector_config=selector_config) + config_file = tmp_path / "selector_config" + with open(config_file, "w") as fd: + fd.write(yaml.dump(config)) + with patched_subscriber_recv(messages): + with patched_publisher() as published_messages: + cli([str(config_file)]) + assert len(published_messages) == 1 From a77b1244d04ce337928584e1d0fb6a629eb4287c Mon Sep 17 00:00:00 2001 From: Martin Raspaud Date: Tue, 14 May 2024 15:42:24 +0200 Subject: [PATCH 05/16] Install redis-server with apt in ci --- .github/workflows/ci.yml | 1 + pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d2b855f..3011881 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -28,6 +28,7 @@ jobs: python-version: ${{ matrix.python-version }} - name: Install dependencies run: | + sudo apt install -y redis-server python -m pip install --upgrade pip python -m pip install ruff pytest pytest-cov freezegun responses python -m pip install git+https://github.com/gorakhargosh/watchdog diff --git a/pyproject.toml b/pyproject.toml index 48590b2..1637a09 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,7 +40,7 @@ ssh = ["paramiko"] dataspace = ["oauthlib", "requests_oauthlib", "s3fs"] datastore = ["oauthlib", "requests_oauthlib"] dhus = ["defusedxml"] -selector = ["redis", "redis-server"] +selector = ["redis"] [build-system] requires = ["hatchling", "hatch-vcs"] From c877fe7e39c7f888158120320bf0704bab8bd02e Mon Sep 17 00:00:00 2001 From: Martin Raspaud Date: Tue, 14 May 2024 15:50:05 +0200 Subject: [PATCH 06/16] Use fixed publishing ports for selector tests --- tests/test_selector.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/test_selector.py b/tests/test_selector.py index 24551d4..39de12b 100644 --- a/tests/test_selector.py +++ b/tests/test_selector.py @@ -60,7 +60,8 @@ def test_run_selector_that_starts_redis_on_given_port(tmp_path): port=3000) publisher_config = dict(address=pipe_out_address, - nameservers=False) + nameservers=False, + port=1999) selector_config = dict(ttl=1, host="localhost", port=6388) @@ -121,6 +122,7 @@ def test_run_selector_on_single_file_messages(tmp_path, caplog): port=3000) publisher_config = dict(address=pipe_out_address, + port=1999, nameservers=False) selector_config = dict(ttl=1, host="localhost", port=6379) @@ -177,7 +179,8 @@ def test_cli(tmp_path): port=3000) publisher_config = dict(address=pipe_out_address, - nameservers=False) + nameservers=False, + port=1999) redis_dir = tmp_path / "redis_dir" From 70a28f4cde21854c937b52e1e939d663024c9884 Mon Sep 17 00:00:00 2001 From: Martin Raspaud Date: Tue, 14 May 2024 15:53:52 +0200 Subject: [PATCH 07/16] Don't use default redis port in tests --- tests/test_selector.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_selector.py b/tests/test_selector.py index 39de12b..f98add4 100644 --- a/tests/test_selector.py +++ b/tests/test_selector.py @@ -125,7 +125,7 @@ def test_run_selector_on_single_file_messages(tmp_path, caplog): port=1999, nameservers=False) - selector_config = dict(ttl=1, host="localhost", port=6379) + selector_config = dict(ttl=1, host="localhost", port=6309) caplog.set_level("INFO") with patched_subscriber_recv(messages): From 5e47ebb6438b23ccdfe4a7ee8feb5e475f1f8a4c Mon Sep 17 00:00:00 2001 From: Martin Raspaud Date: Tue, 14 May 2024 15:58:16 +0200 Subject: [PATCH 08/16] Fix conflicting port --- tests/test_selector.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_selector.py b/tests/test_selector.py index f98add4..e41fe4c 100644 --- a/tests/test_selector.py +++ b/tests/test_selector.py @@ -86,7 +86,6 @@ def create_data_file(path): fd.write("data") -@pytest.mark.usefixtures("_redis_server") def test_run_selector_on_single_file_messages(tmp_path, caplog): """Test running the selector on single file messages.""" uid = "IVCDB_j02_d20240419_t1114110_e1115356_b07465_c20240419113435035578_cspp_dev.h5" @@ -130,7 +129,8 @@ def test_run_selector_on_single_file_messages(tmp_path, caplog): caplog.set_level("INFO") with patched_subscriber_recv(messages): with patched_publisher() as published_messages: - _run_selector_with_managed_dict_server(selector_config, subscriber_config, publisher_config) + with _running_redis_server(port=6309): + _run_selector_with_managed_dict_server(selector_config, subscriber_config, publisher_config) assert len(published_messages) == 2 assert published_messages[0] == msg1 assert published_messages[1] == msg3 @@ -154,7 +154,7 @@ def test_ttldict(): assert sel[key] == value sel[key] = other_value assert sel[key] == value - time.sleep(ttl+1) + time.sleep(ttl) sel[key] = other_value assert sel[key] == other_value From 798a4ed406fca7815d1384d50dea59db9af0afbb Mon Sep 17 00:00:00 2001 From: Martin Raspaud Date: Tue, 14 May 2024 16:02:29 +0200 Subject: [PATCH 09/16] Remove caplog --- tests/test_selector.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/tests/test_selector.py b/tests/test_selector.py index e41fe4c..69730fb 100644 --- a/tests/test_selector.py +++ b/tests/test_selector.py @@ -86,7 +86,7 @@ def create_data_file(path): fd.write("data") -def test_run_selector_on_single_file_messages(tmp_path, caplog): +def test_run_selector_on_single_file_messages(tmp_path): """Test running the selector on single file messages.""" uid = "IVCDB_j02_d20240419_t1114110_e1115356_b07465_c20240419113435035578_cspp_dev.h5" sdr_file = tmp_path / "sdr" / uid @@ -126,7 +126,6 @@ def test_run_selector_on_single_file_messages(tmp_path, caplog): selector_config = dict(ttl=1, host="localhost", port=6309) - caplog.set_level("INFO") with patched_subscriber_recv(messages): with patched_publisher() as published_messages: with _running_redis_server(port=6309): @@ -134,10 +133,6 @@ def test_run_selector_on_single_file_messages(tmp_path, caplog): assert len(published_messages) == 2 assert published_messages[0] == msg1 assert published_messages[1] == msg3 - assert "New content " + str(msg1) in caplog.text - assert "Discarded " + str(msg2) in caplog.text - assert "New content " + str(msg3) in caplog.text - @pytest.mark.usefixtures("_redis_server") From cdca28c298ed58f581f7fce7fa76330cd475cf68 Mon Sep 17 00:00:00 2001 From: Martin Raspaud Date: Tue, 14 May 2024 16:06:38 +0200 Subject: [PATCH 10/16] Give some margin to redis after ttl in tests --- tests/test_selector.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_selector.py b/tests/test_selector.py index 69730fb..11d5aed 100644 --- a/tests/test_selector.py +++ b/tests/test_selector.py @@ -149,7 +149,7 @@ def test_ttldict(): assert sel[key] == value sel[key] = other_value assert sel[key] == value - time.sleep(ttl) + time.sleep(ttl + 0.1) sel[key] = other_value assert sel[key] == other_value From 05585e9271c0a2a5160973d3705aab20a5b8f1d7 Mon Sep 17 00:00:00 2001 From: Martin Raspaud Date: Tue, 14 May 2024 16:30:55 +0200 Subject: [PATCH 11/16] Add warning about redis-server --- docs/source/selector.rst | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/source/selector.rst b/docs/source/selector.rst index a51fdd1..f04b210 100644 --- a/docs/source/selector.rst +++ b/docs/source/selector.rst @@ -1,4 +1,9 @@ Selector -------- + +.. warning:: + For this module and script to work properly, redis-server must be installed! It is available in for most linux + distributions, or in conda-forge. + .. automodule:: pytroll_watchers.selector :members: From 631cd4748c4a7893e937474cfa95f803c9563a92 Mon Sep 17 00:00:00 2001 From: Martin Raspaud Date: Tue, 14 May 2024 16:32:19 +0200 Subject: [PATCH 12/16] Fix typo --- docs/source/selector.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/selector.rst b/docs/source/selector.rst index f04b210..dd142c6 100644 --- a/docs/source/selector.rst +++ b/docs/source/selector.rst @@ -2,7 +2,7 @@ Selector -------- .. warning:: - For this module and script to work properly, redis-server must be installed! It is available in for most linux + For this module and script to work properly, redis-server must be installed! It is available for most linux distributions, or in conda-forge. .. automodule:: pytroll_watchers.selector From 1bc71ca19f0bca6ac988d52b77b20437ea72ffd0 Mon Sep 17 00:00:00 2001 From: Martin Raspaud Date: Tue, 14 May 2024 16:41:04 +0200 Subject: [PATCH 13/16] Fix documentation --- src/pytroll_watchers/selector.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/pytroll_watchers/selector.py b/src/pytroll_watchers/selector.py index 5b77e00..152ddb4 100644 --- a/src/pytroll_watchers/selector.py +++ b/src/pytroll_watchers/selector.py @@ -109,7 +109,7 @@ def running_selector(selector_config, subscriber_config): Args: selector_config: a dictionary of arguments to pass to the underlying redis instance, see :py:class:`~redis.Redis`. You can also provide a ttl as an int - (seconds) or timedelta instance. + (seconds) or timedelta instance, otherwise it defaults to 300 seconds (5 minutes). subscriber_config: a dictionary of arguments to pass to :py:func:`~posttroll.subscriber.create_subscriber_from_dict_config`. @@ -153,6 +153,7 @@ def run_selector(selector_config, subscriber_config, publisher_config): selector_config: a dictionary of arguments to pass to the underlying redis instance, see :py:class:`~redis.Redis`. You can also provide a *ttl* for the selector as an int (seconds) or timedelta instance, so that incoming messages are forgotten after that time. + If not provided, the ttl defaults to 300 seconds (5 minutes). Also, you can provide a *directory* for the underlying datastructure to store the data in. subscriber_config: a dictionary of arguments to pass to :py:func:`~posttroll.subscriber.create_subscriber_from_dict_config`. The subscribtion is used as a source for From 44de5d9406e7f8bd5ba8abb6fb3aaaa613d2b2b0 Mon Sep 17 00:00:00 2001 From: Martin Raspaud Date: Wed, 15 May 2024 08:56:02 +0200 Subject: [PATCH 14/16] Address review comments --- src/pytroll_watchers/selector.py | 36 ++++++++++++++++++++++---------- tests/test_selector.py | 12 +++++------ 2 files changed, 31 insertions(+), 17 deletions(-) diff --git a/src/pytroll_watchers/selector.py b/src/pytroll_watchers/selector.py index 152ddb4..14893a2 100644 --- a/src/pytroll_watchers/selector.py +++ b/src/pytroll_watchers/selector.py @@ -5,6 +5,8 @@ This is useful when multiple source for the same data are sending messages (eg two reception servers for eumetcast) but only one of each file is needed for further processing. +To check if two messages refer to the same data, the *uid* metadata of the messages is used. + At the moment, this module makes use of redis as a refined dictionary for keeping track of the received files. Hence a redis server instance will be started along with some of the functions here, and with the cli. @@ -12,7 +14,7 @@ usage: pytroll-selector [-h] [-l LOG_CONFIG] config - Selects unique messages from multiple sources. + Selects unique messages (based on uid) from multiple sources. positional arguments: config The yaml config file. @@ -99,6 +101,14 @@ def __setitem__(self, key, value): if not res: self._redis.set(key, value, ex=self._ttl) + def __contains__(self, key): + """Check if key is already present.""" + try: + _ = self[key] + return True + except KeyError: + return False + def running_selector(selector_config, subscriber_config): """Generate selected messages. @@ -119,17 +129,21 @@ def running_selector(selector_config, subscriber_config): subscriber = create_subscriber_from_dict_config(subscriber_config) with closing(subscriber): - sel = TTLDict(**selector_config) + ttl_dict = TTLDict(**selector_config) for msg in subscriber.recv(): - key = msg.data["uid"] - try: - _ = sel[key] - logger.info(f"Discarded {str(msg)}") - except KeyError: + key = unique_key(msg) + if key not in ttl_dict: msg_string = str(msg) - sel[key] = msg_string + ttl_dict[key] = msg_string logger.info(f"New content {str(msg)}") yield msg_string + else: + logger.debug(f"Discarded {str(msg)}") + + +def unique_key(msg): + """Identify the content of the message with a unique key.""" + return msg.data["uid"] def _run_selector_with_managed_dict_server(selector_config, subscriber_config, publisher_config): @@ -163,12 +177,12 @@ def run_selector(selector_config, subscriber_config, publisher_config): messages. """ - with _running_redis_server(port=selector_config.get("port"), directory=selector_config.pop("directory", None)): + with _started_redis_server(port=selector_config.get("port"), directory=selector_config.pop("directory", None)): _run_selector_with_managed_dict_server(selector_config, subscriber_config, publisher_config) @contextmanager -def _running_redis_server(port=None, directory=None): +def _started_redis_server(port=None, directory=None): command = ["redis-server"] if port: port = str(int(port)) # using int first here prevents arbitrary strings to be passed to Popen @@ -190,7 +204,7 @@ def cli(args=None): """Command line interface.""" parser = argparse.ArgumentParser( prog="pytroll-selector", - description="Selects unique messages from multiple sources.", + description="Selects unique messages (based on uid) from multiple sources.", epilog="Thanks for using pytroll-selector!") parser.add_argument("config", type=str, help="The yaml config file.") diff --git a/tests/test_selector.py b/tests/test_selector.py index 11d5aed..f8bd250 100644 --- a/tests/test_selector.py +++ b/tests/test_selector.py @@ -8,7 +8,7 @@ from pytroll_watchers.selector import ( TTLDict, _run_selector_with_managed_dict_server, - _running_redis_server, + _started_redis_server, cli, run_selector, ) @@ -21,14 +21,14 @@ def test_ttldict_multiple_redis_instances(tmp_path): value = b"some stuff" other_value = b"some other important stuff" port = 7321 - with _running_redis_server(port=port, directory=tmp_path / "redis_1"): + with _started_redis_server(port=port, directory=tmp_path / "redis_1"): sel = TTLDict(ttl, port=port) sel[key] = value assert sel[key] == value sel[key] = other_value assert sel[key] == value - with _running_redis_server(port=port, directory=tmp_path / "redis_2"): + with _started_redis_server(port=port, directory=tmp_path / "redis_2"): with pytest.raises(KeyError): sel[key] @@ -36,7 +36,7 @@ def test_ttldict_multiple_redis_instances(tmp_path): def test_redis_server_validates_directory(tmp_path): """Test the TTLDict.""" port = 7321 - with _running_redis_server(port=port, directory=str(tmp_path / "redis_1")): + with _started_redis_server(port=port, directory=str(tmp_path / "redis_1")): assert True @@ -74,7 +74,7 @@ def test_run_selector_that_starts_redis_on_given_port(tmp_path): @pytest.fixture(scope="module") def _redis_server(): """Start a redis server.""" - with _running_redis_server(): + with _started_redis_server(): yield @@ -128,7 +128,7 @@ def test_run_selector_on_single_file_messages(tmp_path): with patched_subscriber_recv(messages): with patched_publisher() as published_messages: - with _running_redis_server(port=6309): + with _started_redis_server(port=6309): _run_selector_with_managed_dict_server(selector_config, subscriber_config, publisher_config) assert len(published_messages) == 2 assert published_messages[0] == msg1 From 1ec4760388b5c2c9a4afe78963d93a6a9b204faa Mon Sep 17 00:00:00 2001 From: Martin Raspaud Date: Wed, 15 May 2024 09:12:56 +0200 Subject: [PATCH 15/16] Process only `file` messages --- src/pytroll_watchers/selector.py | 28 +++++++++++++++------- tests/test_selector.py | 41 ++++++++++++++++++++++++++++---- 2 files changed, 55 insertions(+), 14 deletions(-) diff --git a/src/pytroll_watchers/selector.py b/src/pytroll_watchers/selector.py index 14893a2..9bfbca5 100644 --- a/src/pytroll_watchers/selector.py +++ b/src/pytroll_watchers/selector.py @@ -126,19 +126,29 @@ def running_selector(selector_config, subscriber_config): Yields: JSON representations of posttroll messages. """ + ttl_dict = TTLDict(**selector_config) + + for msg in _data_messages(subscriber_config): + key = unique_key(msg) + msg_string = str(msg) + + if key not in ttl_dict: + ttl_dict[key] = msg_string + logger.info(f"New content {msg_string}") + yield msg_string + else: + logger.debug(f"Discarded {msg_string}") + + +def _data_messages(subscriber_config): + """Generate messages referring to new data from subscriber settings.""" subscriber = create_subscriber_from_dict_config(subscriber_config) with closing(subscriber): - ttl_dict = TTLDict(**selector_config) for msg in subscriber.recv(): - key = unique_key(msg) - if key not in ttl_dict: - msg_string = str(msg) - ttl_dict[key] = msg_string - logger.info(f"New content {str(msg)}") - yield msg_string - else: - logger.debug(f"Discarded {str(msg)}") + if msg.type != "file": + continue + yield msg def unique_key(msg): diff --git a/tests/test_selector.py b/tests/test_selector.py index f8bd250..c8e9cd0 100644 --- a/tests/test_selector.py +++ b/tests/test_selector.py @@ -46,7 +46,7 @@ def test_run_selector_that_starts_redis_on_given_port(tmp_path): sdr_file = tmp_path / "sdr" / uid create_data_file(sdr_file) - msg1 = ('pytroll://segment/viirs/l1b/ info a001673@c22519.ad.smhi.se 2024-04-19T11:35:00.487388 v1.01 ' + msg1 = ('pytroll://segment/viirs/l1b/ file a001673@c22519.ad.smhi.se 2024-04-19T11:35:00.487388 v1.01 ' 'application/json {"sensor": "viirs", ' f'"uid": "{uid}", "uri": "file://{str(sdr_file)}", "path": "{str(sdr_file)}", ' '"filesystem": {"cls": "fsspec.implementations.local.LocalFileSystem", "protocol": "file", "args": []}}') @@ -71,6 +71,37 @@ def test_run_selector_that_starts_redis_on_given_port(tmp_path): assert len(published_messages) == 1 +def test_run_selector_ignores_non_file_messages(tmp_path): + """Test running a selector ignore irrelevant messages.""" + uid = "IVCDB_j04_d20240419_t1114110_e1115356_b07465_c20240419113435035578_cspp_dev.h5" + sdr_file = tmp_path / "sdr" / uid + create_data_file(sdr_file) + + msg1 = ('pytroll://segment/viirs/l1b/ del a001673@c22519.ad.smhi.se 2024-04-19T11:35:00.487388 v1.01 ' + 'application/json {"sensor": "viirs", ' + f'"uid": "{uid}", "uri": "file://{str(sdr_file)}", "path": "{str(sdr_file)}", ' + '"filesystem": {"cls": "fsspec.implementations.local.LocalFileSystem", "protocol": "file", "args": []}}') + + messages = [Message.decode(msg1)] + + pipe_in_address = "ipc://" + str(tmp_path / "in.ipc") + pipe_out_address = "ipc://" + str(tmp_path / "out.ipc") + subscriber_config = dict(addresses=[pipe_in_address], + nameserver=False, + port=3000) + + publisher_config = dict(address=pipe_out_address, + nameservers=False, + port=2000) + + selector_config = dict(ttl=1, host="localhost", port=6298) + + with patched_subscriber_recv(messages): + with patched_publisher() as published_messages: + run_selector(selector_config, subscriber_config, publisher_config) + assert len(published_messages) == 0 + + @pytest.fixture(scope="module") def _redis_server(): """Start a redis server.""" @@ -97,17 +128,17 @@ def test_run_selector_on_single_file_messages(tmp_path): create_data_file(sdr_file2) - msg1 = ('pytroll://segment/viirs/l1b/ info a001673@c22519.ad.smhi.se 2024-04-19T11:35:00.487388 v1.01 ' + msg1 = ('pytroll://segment/viirs/l1b/ file a001673@c22519.ad.smhi.se 2024-04-19T11:35:00.487388 v1.01 ' 'application/json {"sensor": "viirs", ' f'"uid": "{uid}", "uri": "file://{str(sdr_file)}", "path": "{str(sdr_file)}", ' '"filesystem": {"cls": "fsspec.implementations.local.LocalFileSystem", "protocol": "file", "args": []}}') - msg2 = ('pytroll://segment/viirs/l1b/ info a001673@c22519.ad.smhi.se 2024-04-19T11:35:00.487388 v1.01 ' + msg2 = ('pytroll://segment/viirs/l1b/ file a001673@c22519.ad.smhi.se 2024-04-19T11:35:00.487388 v1.01 ' 'application/json {"sensor": "viirs", ' f'"uid": "{uid}", "uri": "ssh://someplace.pytroll.org:/{str(sdr_file)}", "path": "{str(sdr_file)}", ' '"filesystem": {"cls": "fsspec.implementations.ssh.SFTPFileSystem", "protocol": "ssh", "args": []}}') - msg3 = ('pytroll://segment/viirs/l1b/ info a001673@c22519.ad.smhi.se 2024-04-19T11:35:00.487388 v1.01 ' + msg3 = ('pytroll://segment/viirs/l1b/ file a001673@c22519.ad.smhi.se 2024-04-19T11:35:00.487388 v1.01 ' 'application/json {"sensor": "viirs", ' f'"uid": "{uid2}", "uri": "ssh://someplace.pytroll.org:/{str(sdr_file2)}", "path": "{str(sdr_file2)}", ' '"filesystem": {"cls": "fsspec.implementations.ssh.SFTPFileSystem", "protocol": "ssh", "args": []}}') @@ -160,7 +191,7 @@ def test_cli(tmp_path): sdr_file = tmp_path / "sdr" / uid create_data_file(sdr_file) - msg1 = ('pytroll://segment/viirs/l1b/ info a001673@c22519.ad.smhi.se 2024-04-19T11:35:00.487388 v1.01 ' + msg1 = ('pytroll://segment/viirs/l1b/ file a001673@c22519.ad.smhi.se 2024-04-19T11:35:00.487388 v1.01 ' 'application/json {"sensor": "viirs", ' f'"uid": "{uid}", "uri": "file://{str(sdr_file)}", "path": "{str(sdr_file)}", ' '"filesystem": {"cls": "fsspec.implementations.local.LocalFileSystem", "protocol": "file", "args": []}}') From ba3b20b6668b20952eeb5694694869e4c94439a9 Mon Sep 17 00:00:00 2001 From: Martin Raspaud Date: Fri, 17 May 2024 14:16:55 +0200 Subject: [PATCH 16/16] Add workaround for upath's limitation on ssh paths --- src/pytroll_watchers/local_watcher.py | 5 ++++- tests/test_main_interface.py | 31 +++++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/src/pytroll_watchers/local_watcher.py b/src/pytroll_watchers/local_watcher.py index b6de7f4..80e0426 100644 --- a/src/pytroll_watchers/local_watcher.py +++ b/src/pytroll_watchers/local_watcher.py @@ -8,10 +8,13 @@ from urllib.parse import urlunparse from upath import UPath +from upath._flavour import WrappedFileSystemFlavour from pytroll_watchers.backends.local import listen_to_local_events from pytroll_watchers.publisher import SecurityError, file_publisher_from_generator, parse_metadata +# This is a workaround for a but in universal_pathlib, see +WrappedFileSystemFlavour.protocol_config["netloc_is_anchor"].add("ssh") logger = logging.getLogger(__name__) @@ -72,7 +75,7 @@ def file_generator(directory, observer_type="os", file_pattern=None, protocol=No except ValueError: continue if protocol is not None: - uri = urlunparse((protocol, None, path, None, None, None)) + uri = urlunparse((protocol, None, str(path), None, None, None)) yield UPath(uri, **storage_options), file_metadata else: yield Path(path), file_metadata diff --git a/tests/test_main_interface.py b/tests/test_main_interface.py index 144a7f0..7da22c5 100644 --- a/tests/test_main_interface.py +++ b/tests/test_main_interface.py @@ -1,9 +1,12 @@ """Tests for the gathered publisher functions.""" +import json import logging +from unittest import mock import pytest import yaml +from posttroll.message import Message from posttroll.testing import patched_publisher from pytroll_watchers.local_watcher import file_generator as local_generator from pytroll_watchers.local_watcher import file_publisher as local_publisher @@ -16,6 +19,7 @@ from pytroll_watchers.minio_notification_watcher import file_generator as minio_generator from pytroll_watchers.minio_notification_watcher import file_publisher as minio_publisher from pytroll_watchers.testing import patched_bucket_listener, patched_local_events # noqa +from upath import UPath def test_getting_right_publisher(): @@ -54,6 +58,33 @@ def test_pass_config_to_file_publisher_for_local_backend(tmp_path, patched_local assert str(filename) in msgs[0] +def test_pass_config_to_file_publisher_for_local_backend_with_protocol(tmp_path, patched_local_events, monkeypatch): # noqa + """Test passing a config to create a file publisher from a local fs with protocol.""" + new_fs = mock.Mock() + host = "myhost.pytroll.org" + fs = dict(cls="fsspec.implementations.sftp.SFTPFileSystem", + protocol="sftp", + args=[], + host=host) + new_fs.to_json.return_value = json.dumps(fs) + monkeypatch.setattr(UPath, "fs", new_fs) + local_settings = dict(directory=tmp_path, protocol="ssh", storage_options=dict(host=host)) + publisher_settings = dict(nameservers=False, port=1979) + message_settings = dict(subject="/segment/viirs/l1b/", atype="file", data=dict(sensor="viirs")) + config = dict(backend="local", + fs_config=local_settings, + publisher_config=publisher_settings, + message_config=message_settings) + with patched_publisher() as msgs: + filename = tmp_path / "bla" + with patched_local_events([filename]): + publish_from_config(config) + assert len(msgs) == 1 + msg = Message.decode(msgs[0]) + assert msg.data["path"] == str(filename) + + + def test_pass_config_to_object_publisher_for_minio_backend(patched_bucket_listener): # noqa """Test passing a config to create an objec publisher from minio bucket.""" s3_settings = dict(endpoint_url="someendpoint",