Merge pull request #3 from mraspaud/add-codecov
Add codecov, logging and aliasing
mraspaud authored Apr 15, 2024
2 parents b4d79df + b6224e8 commit 99a40da
Showing 9 changed files with 196 additions and 7 deletions.
@@ -17,6 +17,8 @@ jobs:
fail-fast: false
matrix:
python-version: ["3.10", "3.11", "3.12"]
env:
PYTHON_VERSION: ${{ matrix.python-version }}

steps:
- uses: actions/checkout@v3
@@ -35,4 +37,11 @@ jobs:
ruff check .
- name: Test with pytest
run: |
pytest
pytest --cov=pytroll_watchers tests --cov-report=xml
- name: Upload coverage reports to Codecov
uses: codecov/[email protected]
with:
token: ${{ secrets.CODECOV_TOKEN }}
slug: pytroll/pytroll-watchers
file: ./coverage.xml
env_vars: PYTHON_VERSION
30 changes: 30 additions & 0 deletions docs/source/index.rst
@@ -10,6 +10,36 @@ Welcome to pytroll-watchers's documentation!
:maxdepth: 2
:caption: Contents:

Pytroll-watchers is a library and command-line tool to detect changes on a local or remote file system.

At the moment we support local filesystems and Minio S3 buckets through bucket notifications.

CLI
***

The command-line tool can be used by invoking `pytroll-watcher <config-file>`. An example config file looks like this::

backend: minio
fs_config:
endpoint_url: my_endpoint.pytroll.org
bucket_name: satellite-data-viirs
storage_options:
profile: profile_for_credentials
publisher_config:
name: viirs_watcher
message_config:
subject: /segment/viirs/l1b/
atype: file
data:
sensor: viirs
aliases:
platform_name:
npp: Suomi-NPP
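
The same configuration can also be used from Python instead of the CLI. A minimal sketch,
assuming the config above is saved as `my_config.yaml` (the filename is only an example)
and using `publish_from_config` from `pytroll_watchers.main_interface`::

    import yaml

    from pytroll_watchers.main_interface import publish_from_config

    with open("my_config.yaml") as fd:
        config = yaml.safe_load(fd)

    # Blocks and keeps publishing posttroll messages as new files or objects appear.
    publish_from_config(config)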


API
***

Main interface
--------------
.. automodule:: pytroll_watchers
4 changes: 4 additions & 0 deletions src/pytroll_watchers/local_watcher.py
@@ -2,6 +2,7 @@
Either using OS-based events (like inotify on Linux), or polling.
"""
import logging
import os
from pathlib import Path
from urllib.parse import urlunparse
@@ -11,6 +12,8 @@
from pytroll_watchers.backends.local import listen_to_local_events
from pytroll_watchers.publisher import file_publisher_from_generator, parse_metadata

logger = logging.getLogger(__name__)


def file_publisher(fs_config, publisher_config, message_config):
"""Publish files coming from local filesystem events.
@@ -21,6 +24,7 @@ def file_publisher(fs_config, publisher_config, message_config):
message_config: The information needed to complete the posttroll message generation. Will be amended
with the file metadata, and passed directly to posttroll's Message constructor.
"""
logger.info(f"Starting watch on '{fs_config['directory']}'")
generator = file_generator(**fs_config)
return file_publisher_from_generator(generator, publisher_config, message_config)

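
For reference, a usage sketch of the local watcher (the directory path and publisher name below
are only examples; the config keys follow the tests in this commit):

    from pytroll_watchers import local_watcher

    fs_config = dict(directory="/data/incoming")  # example directory to watch
    publisher_config = dict(name="local_watcher", nameservers=False, port=1979)
    message_config = dict(subject="/segment/viirs/l1b/", atype="file", data=dict(sensor="viirs"))

    # Logs "Starting watch on '/data/incoming'", then blocks and publishes a
    # posttroll message for every new file appearing in the directory.
    local_watcher.file_publisher(fs_config=fs_config,
                                 publisher_config=publisher_config,
                                 message_config=message_config)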
39 changes: 39 additions & 0 deletions src/pytroll_watchers/main_interface.py
@@ -1,6 +1,7 @@
"""Main interface functions."""

import argparse
import logging.config

import yaml

@@ -60,6 +61,7 @@ def publish_from_config(config):
else:
raise ValueError(f"Unknown backend {config['backend']}")


def cli(args=None):
"""Command-line interface for pytroll-watchers."""
parser = argparse.ArgumentParser(
@@ -68,12 +70,49 @@
epilog="Thanks for using pytroll-watchers!")

parser.add_argument("config", type=str, help="The yaml config file.")
parser.add_argument("-l", "--log-config", type=str, help="The yaml config file for logging.", default=None)

parsed = parser.parse_args(args)


log_config_filename = parsed.log_config
configure_logging(log_config_filename)

config_file = parsed.config

with open(config_file) as fd:
config_dict = yaml.safe_load(fd.read())

return publish_from_config(config_dict)


def configure_logging(log_config_filename):
"""Configure logging from a yaml file."""
if log_config_filename is not None:
with open(log_config_filename) as fd:
log_config = yaml.safe_load(fd.read())
else:
log_config = {
"version": 1,
"formatters": {
"pytroll": {
"format": "[%(asctime)s %(levelname)-8s %(name)s] %(message)s"
}
},
"handlers": {
"console": {
"class": "logging.StreamHandler",
"level": "INFO",
"formatter": "pytroll",
},
},
"disable_existing_loggers": False,
"loggers": {
"": {
"level": "INFO",
"handlers": ["console"],
"propagate": True
},
},
}
logging.config.dictConfig(log_config)
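
A sketch of how the new logging setup behaves, assuming configure_logging is imported from
pytroll_watchers.main_interface as defined above; passing None applies the default console
configuration:

    import logging

    from pytroll_watchers.main_interface import configure_logging

    configure_logging(None)  # falls back to the default dictConfig shown above
    logging.getLogger("pytroll_watchers").info("watchers ready")
    # Approximate output: [2024-04-15 10:00:00,000 INFO     pytroll_watchers] watchers ready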
13 changes: 10 additions & 3 deletions src/pytroll_watchers/minio_notification_watcher.py
@@ -1,9 +1,13 @@
"""Publish messages based on Minio bucket notifications."""

from logging import getLogger

from upath import UPath

from pytroll_watchers.publisher import file_publisher_from_generator, parse_metadata

logger = getLogger(__name__)


def file_publisher(fs_config, publisher_config, message_config):
"""Publish objects coming from bucket notifications.
@@ -14,6 +18,7 @@ def file_publisher(fs_config, publisher_config, message_config):
message_config: The information needed to complete the posttroll message generation. Will be amended
with the file metadata, and passed directly to posttroll's Message constructor.
"""
logger.info(f"Starting watch on '{fs_config['bucket_name']}'")
generator = file_generator(**fs_config)
return file_publisher_from_generator(generator, publisher_config, message_config)

@@ -57,13 +62,15 @@ def file_generator(endpoint_url, bucket_name, file_pattern=None, storage_options
yield path, object_metadata


def _record_generator(endpoint_url, bucket_name, profile=None):
def _record_generator(endpoint_url, bucket_name, storage_options):
"""Generate records for new objects in the bucket."""
from minio import Minio
from minio.credentials.providers import AWSConfigProvider

if profile is not None:
credentials = AWSConfigProvider(profile=profile)
if "profile" in storage_options:
credentials = AWSConfigProvider(profile=storage_options["profile"])
else:
credentials = None

client = Minio(endpoint_url,
credentials=credentials
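
What the storage_options change means for callers, as a sketch (the endpoint, bucket and profile
names are only examples, taken from the documentation above): a "profile" key makes the record
generator build AWSConfigProvider credentials from the local AWS config; without it, the Minio
client is created with no explicit credentials.

    from pytroll_watchers import minio_notification_watcher

    s3_config = dict(endpoint_url="my_endpoint.pytroll.org",
                     bucket_name="satellite-data-viirs",
                     storage_options=dict(profile="profile_for_credentials"))

    # Yields (UPath, metadata) pairs for every new object announced by bucket notifications.
    for path, metadata in minio_notification_watcher.file_generator(**s3_config):
        print(path, metadata)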
16 changes: 15 additions & 1 deletion src/pytroll_watchers/publisher.py
@@ -27,12 +27,26 @@ def file_publisher_from_generator(generator, publisher_config, message_config):
amended_message_config["data"]["uri"] = file_item.as_uri()
with suppress(AttributeError):
amended_message_config["data"]["fs"] = file_item.fs.to_json()

aliases = amended_message_config.pop("aliases", {})
apply_aliases(aliases, file_metadata)
amended_message_config["data"].update(file_metadata)
msg = Message(**amended_message_config)
publisher.send(str(msg))


def apply_aliases(aliases, metadata):
"""Apply aliases to the metadata.
Args:
aliases: a dict containing a dict for each key to be aliased. For example,
`{"platform_name": {"npp": "Suomi-NPP"}}` will replace the `platform_name` "npp" with "Suomi-NPP".
metadata: the metadata to fix in place
"""
for key, val in metadata.items():
if key in aliases:
metadata[key] = aliases[key].get(val, val)
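
A quick sketch of what apply_aliases does, assuming it is imported from pytroll_watchers.publisher
where it is defined:

    from pytroll_watchers.publisher import apply_aliases

    metadata = {"platform_name": "npp", "sensor": "viirs"}
    aliases = {"platform_name": {"npp": "Suomi-NPP"}}

    apply_aliases(aliases, metadata)  # modifies metadata in place
    assert metadata == {"platform_name": "Suomi-NPP", "sensor": "viirs"}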


def fix_times(info):
"""Fix times so that date and time components are combined, and start time is before end time."""
if "start_date" in info:
40 changes: 39 additions & 1 deletion tests/test_bucket_notification_watcher.py
@@ -1,6 +1,7 @@
"""Tests for the bucket notification watcher."""

import datetime
from unittest import mock

from posttroll.message import Message
from posttroll.testing import patched_publisher
@@ -26,6 +27,21 @@ def test_generate_paths(patched_bucket_listener): # noqa
assert path == UPath("s3://viirs-data/sdr/SVM13_npp_d20240408_t1006227_e1007469_b64498_c20240408102334392250_cspp_dev.h5",
profile=profile)

def test_generate_paths_uses_credentials_from_profile(patched_bucket_listener, monkeypatch): # noqa
"""Test generating paths."""
import minio
fake_minio = mock.MagicMock()
monkeypatch.setattr(minio, "Minio", fake_minio)

profile="someprofile"
s3_config = dict(endpoint_url="someendpoint",
bucket_name="viirs-data",
storage_options=dict(profile=profile))

with patched_bucket_listener(records):
_ = list(minio_notification_watcher.file_generator(**s3_config))
assert fake_minio.mock_calls[0][2]["credentials"] is not None


def test_generate_paths_with_pattern(patched_bucket_listener): # noqa
"""Test generating paths."""
@@ -71,13 +87,15 @@ def test_fix_times():
assert metadata["start_time"] < metadata["end_time"]


def test_publish_paths(patched_bucket_listener): # noqa
def test_publish_paths(patched_bucket_listener, caplog): # noqa
"""Test publishing paths."""
s3_config = dict(endpoint_url="someendpoint",
bucket_name="viirs-data",
storage_options=dict(profile="someprofile"))
publisher_settings = dict(nameservers=False, port=1979)
message_settings = dict(subject="/segment/viirs/l1b/", atype="file", data=dict(sensor="viirs"))

caplog.set_level("INFO")
with patched_publisher() as messages:
with patched_bucket_listener(records):
minio_notification_watcher.file_publisher(fs_config=s3_config,
@@ -90,6 +108,7 @@ def test_publish_paths_with_pattern(patched_bucket_listener): # noqa
assert message.data["sensor"] == "viirs"
assert message.data["fs"] == ('{"cls": "s3fs.core.S3FileSystem", "protocol": "s3", "args": [], '
'"profile": "someprofile"}')
assert "Starting watch on 'viirs-data'" in caplog.text


def test_publish_paths_with_pattern(patched_bucket_listener): # noqa
Expand All @@ -110,6 +129,25 @@ def test_publish_paths_with_pattern(patched_bucket_listener): # noqa
assert message.data["platform_name"] == "npp"


def test_publish_paths_with_pattern_and_aliases(patched_bucket_listener): # noqa
"""Test publishing paths."""
s3_config = dict(endpoint_url="someendpoint",
bucket_name="viirs-data",
file_pattern=sdr_file_pattern,
storage_options=dict(profile="someprofile"))
publisher_settings = dict(nameservers=False, port=1979)
message_settings = dict(subject="/segment/viirs/l1b/", atype="file", data=dict(sensor="viirs"),
aliases={"platform_name": {"npp": "Suomi-NPP"}})
with patched_publisher() as messages:
with patched_bucket_listener(records):
minio_notification_watcher.file_publisher(fs_config=s3_config,
publisher_config=publisher_settings,
message_config=message_settings)
message = Message(rawstr=messages[0])
assert message.data["sensor"] == "viirs"
assert message.data["platform_name"] == "Suomi-NPP"


records = \
[{"Records": [{"awsRegion": "",
"eventName": "s3:ObjectCreated:Put",
4 changes: 3 additions & 1 deletion tests/test_local_watcher.py
@@ -75,14 +75,15 @@ def test_watchdog_generator_with_something_else(tmp_path):
next(generator)


def test_publish_paths(tmp_path, patched_local_events): # noqa
def test_publish_paths(tmp_path, patched_local_events, caplog): # noqa
"""Test publishing paths."""
filename = os.fspath(tmp_path / "foo.txt")

local_settings = dict(directory=tmp_path)
publisher_settings = dict(nameservers=False, port=1979)
message_settings = dict(subject="/segment/viirs/l1b/", atype="file", data=dict(sensor="viirs"))

caplog.set_level("INFO")
with patched_local_events([filename]):
with patched_publisher() as messages:
local_watcher.file_publisher(fs_config=local_settings,
@@ -95,3 +96,4 @@ def test_publish_paths(tmp_path, patched_local_events): # noqa
assert message.data["uri"] == f"file://{str(tmp_path)}/foo.txt"
assert message.data["sensor"] == "viirs"
assert "fs" not in message.data
assert f"Starting watch on '{local_settings['directory']}'" in caplog.text
46 changes: 46 additions & 0 deletions tests/test_main_interface.py
@@ -89,6 +89,7 @@ def test_pass_config_to_file_publisher_for_spurious_backend():
with pytest.raises(ValueError, match="Unknown backend"):
publish_from_config(config)


def test_cli(tmp_path, patched_local_events): # noqa
"""Test the command-line interface."""
local_settings = dict(directory=str(tmp_path))
@@ -109,3 +110,48 @@ def test_cli(tmp_path, patched_local_events): # noqa
cli([str(config_file)])
assert len(msgs) == 1
assert str(filename) in msgs[0]


def test_cli_with_logging(tmp_path, patched_local_events): # noqa
"""Test the command-line interface with logging."""
local_settings = dict(directory=str(tmp_path))
publisher_settings = dict(nameservers=False, port=1979)
message_settings = dict(subject="/segment/viirs/l1b/", atype="file", data=dict(sensor="viirs"))
config = dict(backend="local",
fs_config=local_settings,
publisher_config=publisher_settings,
message_config=message_settings)

config_file = tmp_path / "config.yaml"
with open(config_file, "w") as fd:
fd.write(yaml.dump(config))

log_config_file = tmp_path / "log_config.yaml"
handler_name = "console123"
log_config = {
"version": 1,
"handlers": {
handler_name: {
"class": "logging.StreamHandler",
"level": "INFO",
},
},
"loggers": {
"": {
"level": "INFO",
"handlers": [handler_name],
},
},
}
with open(log_config_file, "w") as fd:
fd.write(yaml.dump(log_config))

import logging

with patched_publisher():
filename = tmp_path / "bla"
with patched_local_events([filename]):
cli([str(config_file), "-l", str(log_config_file)])
root = logging.getLogger()
assert len(root.handlers) == 1
assert root.handlers[0].name == handler_name
