diff --git a/.github/workflows/python-package.yml b/.github/workflows/ci.yml similarity index 74% rename from .github/workflows/python-package.yml rename to .github/workflows/ci.yml index 44beda6..4aabe25 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/ci.yml @@ -17,6 +17,8 @@ jobs: fail-fast: false matrix: python-version: ["3.10", "3.11", "3.12"] + env: + PYTHON_VERSION: ${{ matrix.python-version }} steps: - uses: actions/checkout@v3 @@ -35,4 +37,11 @@ jobs: ruff check . - name: Test with pytest run: | - pytest + pytest --cov=pytroll_watchers tests --cov-report=xml + - name: Upload coverage reports to Codecov + uses: codecov/codecov-action@v4.0.1 + with: + token: ${{ secrets.CODECOV_TOKEN }} + slug: pytroll/pytroll-watchers + file: ./coverage.xml + env_vars: PYTHON_VERSION diff --git a/docs/source/index.rst b/docs/source/index.rst index 229f610..2a70eba 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -10,6 +10,36 @@ Welcome to pytroll-watchers's documentation! :maxdepth: 2 :caption: Contents: +Pytroll-watchers is a library and command-line tool to detect changes on a local or remote file system. + +At the moment we support local filesystems and Minio S3 buckets through bucket notifications. + +CLI +*** + +The command-line tool can be used by invoking `pytroll-watcher <config-file>`. An example config-file can be:: + + backend: minio + fs_config: + endpoint_url: my_endpoint.pytroll.org + bucket_name: satellite-data-viirs + storage_options: + profile: profile_for_credentials + publisher_config: + name: viirs_watcher + message_config: + subject: /segment/viirs/l1b/ + atype: file + data: + sensor: viirs + aliases: + platform_name: + npp: Suomi-NPP + + +API +*** + Main interface -------------- .. 
automodule:: pytroll_watchers diff --git a/src/pytroll_watchers/local_watcher.py b/src/pytroll_watchers/local_watcher.py index 125061d..08876bd 100644 --- a/src/pytroll_watchers/local_watcher.py +++ b/src/pytroll_watchers/local_watcher.py @@ -2,6 +2,7 @@ Either using OS-based envents (like inotify on linux), or polling. """ +import logging import os from pathlib import Path from urllib.parse import urlunparse @@ -11,6 +12,8 @@ from pytroll_watchers.backends.local import listen_to_local_events from pytroll_watchers.publisher import file_publisher_from_generator, parse_metadata +logger = logging.getLogger(__name__) + def file_publisher(fs_config, publisher_config, message_config): """Publish files coming from local filesystem events. @@ -21,6 +24,7 @@ def file_publisher(fs_config, publisher_config, message_config): message_config: The information needed to complete the posttroll message generation. Will be amended with the file metadata, and passed directly to posttroll's Message constructor. 
""" + logger.info(f"Starting watch on '{fs_config['directory']}'") generator = file_generator(**fs_config) return file_publisher_from_generator(generator, publisher_config, message_config) diff --git a/src/pytroll_watchers/main_interface.py b/src/pytroll_watchers/main_interface.py index 2eff60d..bc04110 100644 --- a/src/pytroll_watchers/main_interface.py +++ b/src/pytroll_watchers/main_interface.py @@ -1,6 +1,7 @@ """Main interface functions.""" import argparse +import logging.config import yaml @@ -60,6 +61,7 @@ def publish_from_config(config): else: raise ValueError(f"Unknown backend {config['backend']}") + def cli(args=None): """Command-line interface for pytroll-watchers.""" parser = argparse.ArgumentParser( @@ -68,12 +70,49 @@ def cli(args=None): epilog="Thanks for using pytroll-watchers!") parser.add_argument("config", type=str, help="The yaml config file.") + parser.add_argument("-l", "--log-config", type=str, help="The yaml config file for logging.", default=None) parsed = parser.parse_args(args) + + log_config_filename = parsed.log_config + configure_logging(log_config_filename) + config_file = parsed.config with open(config_file) as fd: config_dict = yaml.safe_load(fd.read()) return publish_from_config(config_dict) + + +def configure_logging(log_config_filename): + """Configure logging from a yaml file.""" + if log_config_filename is not None: + with open(log_config_filename) as fd: + log_config = yaml.safe_load(fd.read()) + else: + log_config = { + "version": 1, + "formatters": { + "pytroll": { + "format": "[%(asctime)s %(levelname)-8s %(name)s] %(message)s" + } + }, + "handlers": { + "console": { + "class": "logging.StreamHandler", + "level": "INFO", + "formatter": "pytroll", + }, + }, + "disable_existing_loggers": False, + "loggers": { + "": { + "level": "INFO", + "handlers": ["console"], + "propagate": True + }, + }, + } + logging.config.dictConfig(log_config) diff --git a/src/pytroll_watchers/minio_notification_watcher.py 
b/src/pytroll_watchers/minio_notification_watcher.py index 4129f80..aa16fc6 100644 --- a/src/pytroll_watchers/minio_notification_watcher.py +++ b/src/pytroll_watchers/minio_notification_watcher.py @@ -1,9 +1,13 @@ """Publish messages based on Minio bucket notifications.""" +from logging import getLogger + from upath import UPath from pytroll_watchers.publisher import file_publisher_from_generator, parse_metadata +logger = getLogger(__name__) + def file_publisher(fs_config, publisher_config, message_config): """Publish objects coming from bucket notifications. @@ -14,6 +18,7 @@ def file_publisher(fs_config, publisher_config, message_config): message_config: The information needed to complete the posttroll message generation. Will be amended with the file metadata, and passed directly to posttroll's Message constructor. """ + logger.info(f"Starting watch on '{fs_config['bucket_name']}'") generator = file_generator(**fs_config) return file_publisher_from_generator(generator, publisher_config, message_config) @@ -57,13 +62,15 @@ def file_generator(endpoint_url, bucket_name, file_pattern=None, storage_options yield path, object_metadata -def _record_generator(endpoint_url, bucket_name, profile=None): +def _record_generator(endpoint_url, bucket_name, storage_options): """Generate records for new objects in the bucket.""" from minio import Minio from minio.credentials.providers import AWSConfigProvider - if profile is not None: - credentials = AWSConfigProvider(profile=profile) + if "profile" in storage_options: + credentials = AWSConfigProvider(profile=storage_options["profile"]) + else: + credentials = None client = Minio(endpoint_url, credentials=credentials diff --git a/src/pytroll_watchers/publisher.py b/src/pytroll_watchers/publisher.py index 0cb0e6b..ac42c26 100644 --- a/src/pytroll_watchers/publisher.py +++ b/src/pytroll_watchers/publisher.py @@ -27,12 +27,26 @@ def file_publisher_from_generator(generator, publisher_config, message_config): 
amended_message_config["data"]["uri"] = file_item.as_uri() with suppress(AttributeError): amended_message_config["data"]["fs"] = file_item.fs.to_json() - + aliases = amended_message_config.pop("aliases", {}) + apply_aliases(aliases, file_metadata) amended_message_config["data"].update(file_metadata) msg = Message(**amended_message_config) publisher.send(str(msg)) +def apply_aliases(aliases, metadata): + """Apply aliases to the metadata. + + Args: + aliases: a dict containing dicts for each key to be aliased. For example + `{"platform_name": {"npp": "Suomi-NPP"}}` will replace the `platform_name` "npp" with "Suomi-NPP". + metadata: the metadata to fix. + """ + for key, val in metadata.items(): + if key in aliases: + metadata[key] = aliases[key].get(val, val) + + def fix_times(info): """Fix times so that date and time components are combined, and start time is before end time.""" if "start_date" in info: diff --git a/tests/test_bucket_notification_watcher.py b/tests/test_bucket_notification_watcher.py index 23a3aa5..3fd7274 100644 --- a/tests/test_bucket_notification_watcher.py +++ b/tests/test_bucket_notification_watcher.py @@ -1,6 +1,7 @@ """Tests for the bucket notification watcher.""" import datetime +from unittest import mock from posttroll.message import Message from posttroll.testing import patched_publisher @@ -26,6 +27,21 @@ def test_generate_paths(patched_bucket_listener): # noqa assert path == UPath("s3://viirs-data/sdr/SVM13_npp_d20240408_t1006227_e1007469_b64498_c20240408102334392250_cspp_dev.h5", profile=profile) +def test_generate_paths_uses_credentials_from_profile(patched_bucket_listener, monkeypatch): # noqa + """Test that credentials from the given profile are used when generating paths.""" + import minio + fake_minio = mock.MagicMock() + monkeypatch.setattr(minio, "Minio", fake_minio) + + profile="someprofile" + s3_config = dict(endpoint_url="someendpoint", + bucket_name="viirs-data", + storage_options=dict(profile=profile)) + + with patched_bucket_listener(records): + _ = 
list(minio_notification_watcher.file_generator(**s3_config)) + assert fake_minio.mock_calls[0][2]["credentials"] is not None + def test_generate_paths_with_pattern(patched_bucket_listener): # noqa """Test generating paths.""" @@ -71,13 +87,15 @@ def test_fix_times(): assert metadata["start_time"] < metadata["end_time"] -def test_publish_paths(patched_bucket_listener): # noqa +def test_publish_paths(patched_bucket_listener, caplog): # noqa """Test publishing paths.""" s3_config = dict(endpoint_url="someendpoint", bucket_name="viirs-data", storage_options=dict(profile="someprofile")) publisher_settings = dict(nameservers=False, port=1979) message_settings = dict(subject="/segment/viirs/l1b/", atype="file", data=dict(sensor="viirs")) + + caplog.set_level("INFO") with patched_publisher() as messages: with patched_bucket_listener(records): minio_notification_watcher.file_publisher(fs_config=s3_config, @@ -90,6 +108,7 @@ def test_publish_paths(patched_bucket_listener): # noqa assert message.data["sensor"] == "viirs" assert message.data["fs"] == ('{"cls": "s3fs.core.S3FileSystem", "protocol": "s3", "args": [], ' '"profile": "someprofile"}') + assert "Starting watch on 'viirs-data'" in caplog.text def test_publish_paths_with_pattern(patched_bucket_listener): # noqa @@ -110,6 +129,25 @@ def test_publish_paths_with_pattern(patched_bucket_listener): # noqa assert message.data["platform_name"] == "npp" +def test_publish_paths_with_pattern_and_aliases(patched_bucket_listener): # noqa + """Test publishing paths.""" + s3_config = dict(endpoint_url="someendpoint", + bucket_name="viirs-data", + file_pattern=sdr_file_pattern, + storage_options=dict(profile="someprofile")) + publisher_settings = dict(nameservers=False, port=1979) + message_settings = dict(subject="/segment/viirs/l1b/", atype="file", data=dict(sensor="viirs"), + aliases={"platform_name": {"npp": "Suomi-NPP"}}) + with patched_publisher() as messages: + with patched_bucket_listener(records): + 
minio_notification_watcher.file_publisher(fs_config=s3_config, + publisher_config=publisher_settings, + message_config=message_settings) + message = Message(rawstr=messages[0]) + assert message.data["sensor"] == "viirs" + assert message.data["platform_name"] == "Suomi-NPP" + + records = \ [{"Records": [{"awsRegion": "", "eventName": "s3:ObjectCreated:Put", diff --git a/tests/test_local_watcher.py b/tests/test_local_watcher.py index 864423f..730b2d5 100644 --- a/tests/test_local_watcher.py +++ b/tests/test_local_watcher.py @@ -75,7 +75,7 @@ def test_watchdog_generator_with_something_else(tmp_path): next(generator) -def test_publish_paths(tmp_path, patched_local_events): # noqa +def test_publish_paths(tmp_path, patched_local_events, caplog): # noqa """Test publishing paths.""" filename = os.fspath(tmp_path / "foo.txt") @@ -83,6 +83,7 @@ def test_publish_paths(tmp_path, patched_local_events): # noqa publisher_settings = dict(nameservers=False, port=1979) message_settings = dict(subject="/segment/viirs/l1b/", atype="file", data=dict(sensor="viirs")) + caplog.set_level("INFO") with patched_local_events([filename]): with patched_publisher() as messages: local_watcher.file_publisher(fs_config=local_settings, @@ -95,3 +96,4 @@ def test_publish_paths(tmp_path, patched_local_events): # noqa assert message.data["uri"] == f"file://{str(tmp_path)}/foo.txt" assert message.data["sensor"] == "viirs" assert "fs" not in message.data + assert f"Starting watch on '{local_settings['directory']}'" in caplog.text diff --git a/tests/test_main_interface.py b/tests/test_main_interface.py index 50a887f..a55ac43 100644 --- a/tests/test_main_interface.py +++ b/tests/test_main_interface.py @@ -89,6 +89,7 @@ def test_pass_config_to_file_publisher_for_spurious_backend(): with pytest.raises(ValueError, match="Unknown backend"): publish_from_config(config) + def test_cli(tmp_path, patched_local_events): # noqa """Test the command-line interface.""" local_settings = dict(directory=str(tmp_path)) @@ 
-109,3 +110,48 @@ def test_cli(tmp_path, patched_local_events): # noqa cli([str(config_file)]) assert len(msgs) == 1 assert str(filename) in msgs[0] + + +def test_cli_with_logging(tmp_path, patched_local_events): # noqa + """Test the command-line interface with logging.""" + local_settings = dict(directory=str(tmp_path)) + publisher_settings = dict(nameservers=False, port=1979) + message_settings = dict(subject="/segment/viirs/l1b/", atype="file", data=dict(sensor="viirs")) + config = dict(backend="local", + fs_config=local_settings, + publisher_config=publisher_settings, + message_config=message_settings) + + config_file = tmp_path / "config.yaml" + with open(config_file, "w") as fd: + fd.write(yaml.dump(config)) + + log_config_file = tmp_path / "log_config.yaml" + handler_name = "console123" + log_config = { + "version": 1, + "handlers": { + handler_name: { + "class": "logging.StreamHandler", + "level": "INFO", + }, + }, + "loggers": { + "": { + "level": "INFO", + "handlers": [handler_name], + }, + }, + } + with open(log_config_file, "w") as fd: + fd.write(yaml.dump(log_config)) + + import logging + + with patched_publisher(): + filename = tmp_path / "bla" + with patched_local_events([filename]): + cli([str(config_file), "-l", str(log_config_file)]) + root = logging.getLogger() + assert len(root.handlers) == 1 + assert root.handlers[0].name == handler_name