diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index e0ca6558..266632c0 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -1,36 +1,86 @@
-name: Run tests
+name: CI
on:
- push
- pull_request
jobs:
- build:
- runs-on: ubuntu-latest
+ test:
+ runs-on: ${{ matrix.os }}
+ continue-on-error: ${{ matrix.experimental }}
strategy:
fail-fast: true
matrix:
- python-version: ["3.8", "3.9", "3.10"]
+ os: ["ubuntu-latest"]
+ python-version: ["3.9", "3.10", "3.11"]
experimental: [false]
+ include:
+ - python-version: "3.11"
+ os: "ubuntu-latest"
+ experimental: true
+
+ env:
+ PYTHON_VERSION: ${{ matrix.python-version }}
+ OS: ${{ matrix.os }}
+ UNSTABLE: ${{ matrix.experimental }}
+      # ACTIONS_ALLOW_UNSECURE_COMMANDS removed: the set-env/add-path commands it enabled were disabled by GitHub in 2020 and re-enabling them is a security risk
+
steps:
- name: Checkout source
uses: actions/checkout@v3
- - name: Set up Python ${{ matrix.python-version }}
- uses: actions/setup-python@v4
+
+ - name: Setup Conda Environment
+ uses: conda-incubator/setup-miniconda@v2
with:
+ miniconda-version: "latest"
python-version: ${{ matrix.python-version }}
- - name: Install dependencies
+ mamba-version: "*"
+ channels: conda-forge,defaults
+ environment-file: continuous_integration/environment.yaml
+ activate-environment: test-environment
+
+ - name: Install unstable dependencies
+ if: matrix.experimental == true
+ shell: bash -l {0}
run: |
- pip install -U pytest pytest-cov trollsift netifaces watchdog posttroll pyyaml pyinotify pyresample \
- pytroll-schedule s3fs fsspec python-dateutil paramiko
+ python -m pip install \
+ --no-deps --pre --upgrade \
+            numpy
+          python -m pip install \
+            --no-deps --upgrade \
+            git+https://github.com/pytroll/posttroll \
+            git+https://github.com/pytroll/pytroll-schedule \
+            git+https://github.com/pytroll/trollsift
+
- name: Install pytroll-collectors
run: |
pip install --no-deps -e .
- - name: Run tests
+
+ - name: Run unit tests
+ shell: bash -l {0}
run: |
pytest --cov=pytroll_collectors pytroll_collectors/tests --cov-report=xml
- - name: Upload coverage to Codecov
+
+ - name: Upload unittest coverage to Codecov
uses: codecov/codecov-action@v3
with:
+ flags: unittests
file: ./coverage.xml
- env_vars: PYTHON_VERSION
+ env_vars: OS,PYTHON_VERSION,UNSTABLE
+ fail_ci_if_error: false
+
+ - name: Coveralls Parallel
+ uses: AndreMiras/coveralls-python-action@develop
+ with:
+        flag-name: run-${{ matrix.python-version }}-${{ matrix.experimental }}
+ parallel: true
+ if: runner.os == 'Linux'
+
+ coveralls:
+ needs: [test]
+ runs-on: ubuntu-latest
+ steps:
+ - name: Coveralls Finished
+ uses: AndreMiras/coveralls-python-action@develop
+ with:
+ parallel-finished: true
diff --git a/bin/s3stalker.py b/bin/s3stalker.py
index bfc51f83..b749fd39 100644
--- a/bin/s3stalker.py
+++ b/bin/s3stalker.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
-# Copyright (c) 2020 Martin Raspaud
+# Copyright (c) 2020, 2023 Martin Raspaud
# Author(s):
@@ -27,38 +27,17 @@
with a cronjob.
"""
-import argparse
import logging.config
-import yaml
-
-from pytroll_collectors.helper_functions import read_yaml
-from pytroll_collectors.s3stalker import publish_new_files
-
-
-def arg_parse():
- """Handle input arguments."""
- parser = argparse.ArgumentParser()
- parser.add_argument("bucket", help="The bucket to retrieve from.")
- parser.add_argument("config", help="Config file to be used")
- parser.add_argument("-l", "--log",
- help="Log configuration file",
- default=None)
-
- return parser.parse_args()
+from pytroll_collectors.s3stalker import publish_new_files, get_configs_from_command_line
def main():
"""Stalk an s3 bucket."""
- args = arg_parse()
-
- bucket = args.bucket
- config = read_yaml(args.config)
+ bucket, config, log_config = get_configs_from_command_line()
- if args.log is not None:
- with open(args.log) as fd:
- log_dict = yaml.safe_load(fd.read())
- logging.config.dictConfig(log_dict)
+ if log_config:
+ logging.config.dictConfig(log_config)
try:
publish_new_files(bucket, config)
diff --git a/bin/s3stalker_daemon.py b/bin/s3stalker_daemon.py
new file mode 100644
index 00000000..96f08055
--- /dev/null
+++ b/bin/s3stalker_daemon.py
@@ -0,0 +1,73 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2022 Pytroll developers
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+"""S3stalker daemon/runner.
+
+This is a daemon supposed to stay up and running "forever". It will regularly
+fetch fresh filenames from an S3 object store and publish the urls of those new
+filenames. It is a daemon version of the s3stalker.py script which needs to be
+run as a cronjob.
+
+"""
+
+import argparse
+import logging
+import logging.config
+import sys
+
+from pytroll_collectors.s3stalker import get_configs_from_command_line
+from pytroll_collectors.s3stalker_daemon_runner import S3StalkerRunner
+
+logger = logging.getLogger(__name__)
+
+
+def arg_parse():
+ """Handle input arguments."""
+ parser = argparse.ArgumentParser()
+ parser.add_argument("bucket", help="The bucket to retrieve from.")
+ parser.add_argument("config", help="Config file to be used")
+ parser.add_argument("-l", "--log",
+ help="Log configuration file",
+ default=None)
+
+ return parser.parse_args()
+
+
+def main():
+ """Stalk an s3 bucket."""
+ bucket, config, log_config = get_configs_from_command_line()
+
+ if log_config:
+ logging.config.dictConfig(log_config)
+
+    logger.info("Trying to start the s3-stalker runner.")
+    s3runner = S3StalkerRunner(bucket, config)
+    try:
+ s3runner.start()
+ s3runner.join()
+ except KeyboardInterrupt:
+ logger.debug("Interrupting")
+ except Exception as err:
+ logger.error('The S3 Stalker Runner crashed: %s', str(err))
+ sys.exit(1)
+ finally:
+ s3runner.close()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/continuous_integration/environment.yaml b/continuous_integration/environment.yaml
new file mode 100644
index 00000000..5caf386f
--- /dev/null
+++ b/continuous_integration/environment.yaml
@@ -0,0 +1,38 @@
+name: test-environment
+channels:
+ - conda-forge
+dependencies:
+ - distributed
+ - toolz
+ - sphinx
+ - pyyaml
+ - pyproj
+ - coveralls
+ - coverage
+ - codecov
+ - behave
+ - mock
+ - numpy
+ - pyresample
+ - pyorbital
+ - pycrs
+ - cartopy
+ - geopy
+ - freezegun
+ - paramiko
+ - responses
+ - netifaces
+ - watchdog
+ - s3fs
+ - pyinotify
+ - requests
+ - openssl
+ - pytz
+ - python-dateutil
+ - pytest
+ - pytest-cov
+ - pip
+ - pip:
+ - trollsift
+ - posttroll
+ - pytroll-schedule
diff --git a/doc/source/index.rst b/doc/source/index.rst
index 190082f3..a22264a0 100644
--- a/doc/source/index.rst
+++ b/doc/source/index.rst
@@ -475,6 +475,26 @@ trollstalker2
New, alternative implementation of trollstalker. Not really needed,
as trollstalker works fine and is actively maintained.
+
+s3stalker
+^^^^^^^^^
+
+A counterpart to trollstalker for polling for new files on an s3 bucket.
+It is meant to be run regularly, e.g. from cron. For a daemon version of
+this, check the next item.
+Example configuration:
+https://github.com/pytroll/pytroll-collectors/blob/main/examples/s3stalker.yaml
+
+s3stalker_daemon
+^^^^^^^^^^^^^^^^
+
+The daemon version of s3stalker, that stays on and polls until stopped
+(preferably with a SIGTERM).
+Example configuration:
+https://github.com/pytroll/pytroll-collectors/blob/main/examples/s3stalker_runner.yaml_template
+
+See also https://s3fs.readthedocs.io/en/latest/#credentials on options how to define the S3 credentials.
+
zipcollector_runner
^^^^^^^^^^^^^^^^^^^
diff --git a/examples/geographic_gatherer_config.ini_template b/examples/geographic_gatherer_config.ini_template
index 145b798a..bf8b70e1 100644
--- a/examples/geographic_gatherer_config.ini_template
+++ b/examples/geographic_gatherer_config.ini_template
@@ -20,7 +20,9 @@ sensor = viirs
timeliness = 10
# duration of a granule in SECONDS
duration = 180
-publish_topic =
+publish_topic =
+# The topics to listen for:
+topics = /viirs/sdr/1
[ears_viirs]
pattern = /data/prod/satellit/ears/viirs/SVMC_{platform}_d{start_date:%Y%m%d}_t{start_time:%H%M%S%f}_e{end_time:%H%M%S%f}_b{orbit_number:5d}_c{proctime:%Y%m%d%H%M%S%f}_eum_ops.h5.bz2
@@ -32,7 +34,9 @@ sensor = viirs
timeliness = 30
duration = 85.4
variant = EARS
-publish_topic =
+publish_topic =
+# The topics to listen for:
+topics = /ears/viirs/sdr/1
[ears_noaa18_avhrr]
pattern = /data/prod/satellit/ears/avhrr/avhrr_{start_time:%Y%m%d_%H%M%S}_noaa18.hrp.bz2
@@ -44,7 +48,8 @@ platform_name = NOAA-18
sensor = avhrr/3
timeliness = 15
variant = EARS
-publish_topic =
+publish_topic =
+topics = /ears/avhrr/hrpt/1
[ears_noaa19_avhrr]
pattern = /data/prod/satellit/ears/avhrr/avhrr_{start_time:%Y%m%d_%H%M%S}_noaa18.hrp.bz2
@@ -56,7 +61,8 @@ platform_name = NOAA-18
sensor = avhrr/3
timeliness = 15
variant = EARS
-publish_topic =
+publish_topic =
+topics = /ears/avhrr/hrpt/1
[ears_metop-b]
pattern = /data/prod/satellit/ears/avhrr/AVHR_HRP_{data_processing_level:2s}_M01_{start_time:%Y%m%d%H%M%S}Z_{end_time:%Y%m%d%H%M%S}Z_N_O_{proc_time:%Y%m%d%H%M%S}Z.bz2
@@ -67,7 +73,8 @@ sensor = avhrr/3
timeliness = 15
level = 0
variant = EARS
-publish_topic =
+publish_topic =
+topics = /ears/avhrr/metop/eps/1
[ears_metop-a]
pattern = /data/prod/satellit/ears/avhrr/AVHR_HRP_{data_processing_level:2s}_M02_{start_time:%Y%m%d%H%M%S}Z_{end_time:%Y%m%d%H%M%S}Z_N_O_{proc_time:%Y%m%d%H%M%S}Z.bz2
@@ -79,6 +86,7 @@ timeliness = 15
level = 0
variant = EARS
publish_topic = /EARS/Metop-B
+topics = /ears/avhrr/metop/eps/1
[gds_metop-b]
pattern = /data/prod/satellit/metop2/AVHR_xxx_{data_processing_level:2s}_M01_{start_time:%Y%m%d%H%M%S}Z_{end_time:%Y%m%d%H%M%S}Z_N_O_{proc_time:%Y%m%d%H%M%S}Z
@@ -90,6 +98,7 @@ timeliness = 100
variant = GDS
orbit_type = polar
publish_topic = /GDS/Metop-B
+topics = /gds/avhrr/metop/eps/1
[gds_metop-a]
pattern = /data/prod/satellit/metop2/AVHR_xxx_{level:2s}_M02_{start_time:%Y%m%d%H%M%S}Z_{end_time:%Y%m%d%H%M%S}Z_N_O_{proc_time:%Y%m%d%H%M%S}Z
@@ -100,6 +109,7 @@ sensor = avhrr/3
timeliness = 100
variant = GDS
publish_topic = /GDS/Metop-A
+topics = /gds/avhrr/metop/eps/1
[EARS_terra]
pattern = /data/prod/satellit/modis/lvl1/thin_MOD021KM.A{start_time:%Y%j.%H%M}.005.{proc_time:%Y%j%H%M%S}.NRT.hdf
@@ -111,6 +121,7 @@ sensor = modis
timeliness = 180
duration = 300
variant = EARS
+topics = /ears/modis/hdf4/1
[EARS_aqua]
pattern = /data/prod/satellit/modis/lvl1/thin_MYD021KM.A{start_time:%Y%j.%H%M}.005.{proc_time:%Y%j%H%M%S}.NRT.hdf
@@ -122,3 +133,4 @@ sensor = modis
timeliness = 180
duration = 300
variant = EARS
+topics = /ears/modis/hdf4/1
diff --git a/examples/s3stalker_log.yaml_template b/examples/s3stalker_log.yaml_template
new file mode 100644
index 00000000..af08e00a
--- /dev/null
+++ b/examples/s3stalker_log.yaml_template
@@ -0,0 +1,17 @@
+version: 1
+disable_existing_loggers: false
+formatters:
+ pytroll:
+ format: '[%(asctime)s %(levelname)-8s %(name)s] %(message)s'
+handlers:
+ console:
+ class: logging.StreamHandler
+ level: DEBUG
+ formatter: pytroll
+ stream: ext://sys.stdout
+loggers:
+ posttroll:
+ level: INFO
+root:
+ level: DEBUG
+ handlers: [console]
\ No newline at end of file
diff --git a/examples/s3stalker_runner.yaml_template b/examples/s3stalker_runner.yaml_template
new file mode 100644
index 00000000..79f47f95
--- /dev/null
+++ b/examples/s3stalker_runner.yaml_template
@@ -0,0 +1,16 @@
+s3_kwargs:
+  anon: false
+  client_kwargs:
+    endpoint_url: 'https://zzz.yyy.xx'
+    aws_access_key_id: 'my_access_key'
+    aws_secret_access_key: 'my_secret_access_key'
+# How far back to look for files on startup (datetime.timedelta keyword arguments):
+fetch_back_to:
+  hours: 2
+# How often to poll the bucket (datetime.timedelta keyword arguments):
+polling_interval:
+  minutes: 2
+subject: /atms/sdr/mystation
+file_pattern: GATMO_{platform_name:3s}_d{start_time:%Y%m%d_t%H%M%S}{frac:1s}_e{end_time:%H%M%S}{frac_end:1s}_b{orbit_number:5s}_c{process_time:20s}_cspp_dev.h5
+publisher:
+  name: s3stalker_runner
diff --git a/pytroll_collectors/s3stalker.py b/pytroll_collectors/s3stalker.py
index 31000d76..c2d2aad7 100644
--- a/pytroll_collectors/s3stalker.py
+++ b/pytroll_collectors/s3stalker.py
@@ -1,11 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
-# Copyright (c) 2020 Martin Raspaud
-
-# Author(s):
-
-# Martin Raspaud
+# Copyright (c) 2020 - 2023 Pytroll developers
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -19,19 +15,36 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
-"""Module to find new files on an s3 bucket."""
-
+"""Module to find new files on an s3 bucket.
+
+The contents of the yaml configuration file should look like this::
+
+ s3_kwargs:
+ anon: false
+ client_kwargs:
+ aws_access_key_id: my_accesskey
+ aws_secret_access_key: my_secret_key
+ endpoint_url: https://xxx.yyy.zz
+ fetch_back_to:
+ hours: 20
+ file_pattern: '{platform_name:3s}_OL_2_{datatype_id:_<6s}_{start_time:%Y%m%dT%H%M%S}_{end_time:%Y%m%dT%H%M%S}_{creation_time:%Y%m%dT%H%M%S}_{duration:4d}_{cycle:3d}_{relative_orbit:3d}_{frame:4d}_{centre:3s}_{mode:1s}_{timeliness:2s}_{collection:3s}.zip'
+ subject: /segment/2/safe-olci/S3/
+""" # noqa
+import argparse
import logging
import posixpath
-from datetime import datetime, timedelta
import time
from contextlib import contextmanager
+from datetime import datetime, timedelta
+
import s3fs
+import yaml
from dateutil import tz
from posttroll.publisher import Publish
from trollsift import Parser
from pytroll_collectors.fsspec_to_message import filelist_unzip_to_messages
+from pytroll_collectors.helper_functions import read_yaml
logger = logging.getLogger(__name__)
@@ -43,6 +56,7 @@ def sleeper(duration):
yield
end_time = datetime.utcnow()
waiting_time = duration - (end_time - start_time).total_seconds()
+ logger.debug('waiting time: %f', waiting_time)
time.sleep(max(waiting_time, 0))
@@ -52,9 +66,19 @@ class DatetimeHolder:
last_fetch = datetime.now(tz.UTC) - timedelta(hours=12)
-def get_last_files(path, *args, pattern=None, **kwargs):
+def set_last_fetch(timestamp):
+ """Set the last fetch time."""
+ DatetimeHolder.last_fetch = timestamp
+
+
+def get_last_fetch():
+ """Get the last fetch time."""
+ return DatetimeHolder.last_fetch
+
+
+def get_last_files(path, pattern=None, **s3_kwargs):
"""Get the last files from path (s3 bucket and directory)."""
- fs = s3fs.S3FileSystem(*args, **kwargs)
+ fs = s3fs.S3FileSystem(**s3_kwargs)
files = _get_files_since_last_fetch(fs, path)
files = _match_files_to_pattern(files, path, pattern)
_reset_last_fetch_from_file_list(files)
@@ -69,7 +93,8 @@ def _reset_last_fetch_from_file_list(files):
def _get_files_since_last_fetch(fs, path):
files = fs.ls(path, detail=True)
- files = list(filter((lambda x: x['LastModified'] > DatetimeHolder.last_fetch), files))
+    logger.debug("Get files since %s", get_last_fetch())
+ files = list(filter((lambda x: x['LastModified'] > get_last_fetch()), files))
return files
@@ -88,23 +113,54 @@ def _match_files_to_pattern(files, path, pattern):
return files
-def set_last_fetch(timestamp):
- """Set the last fetch time."""
- DatetimeHolder.last_fetch = timestamp
-
-
-def publish_new_files(bucket, config):
+def publish_new_files(bucket, config, publisher_ready_time=2.5):
"""Publish files newly arrived in bucket."""
+ time_back = config.pop('fetch_back_to')
+ set_last_fetch(datetime.now(tz.UTC) - timedelta(**time_back))
with Publish("s3_stalker") as pub:
- time_back = config['timedelta']
- subject = config['subject']
- pattern = config.get('file_pattern')
- with sleeper(2.5):
- set_last_fetch(datetime.now(tz.UTC) - timedelta(**time_back))
- s3_kwargs = config['s3_kwargs']
- fs, files = get_last_files(bucket, pattern=pattern, **s3_kwargs)
- messages = filelist_unzip_to_messages(fs, files, subject)
-
+ with sleeper(publisher_ready_time):
+ messages = create_messages_for_recent_files(bucket, config)
for message in messages:
logger.info("Publishing %s", str(message))
pub.send(str(message))
+
+
+def create_messages_for_recent_files(bucket, config):
+ """Create messages for recent files and return."""
+ logger.debug("Create messages for recent files...")
+
+ pattern = config.get('file_pattern')
+ s3_kwargs = config['s3_kwargs']
+ fs_, files = get_last_files(bucket, pattern=pattern, **s3_kwargs)
+
+ subject = config['subject']
+ messages = filelist_unzip_to_messages(fs_, files, subject)
+ return messages
+
+
+def arg_parse(args=None):
+ """Handle input arguments."""
+ parser = argparse.ArgumentParser()
+ parser.add_argument("bucket", nargs="?", help="The bucket to retrieve from.")
+ parser.add_argument("config", help="Config file to be used")
+ parser.add_argument("-l", "--log",
+ help="Log configuration file",
+ default=None)
+
+ return parser.parse_args(args)
+
+
+def get_configs_from_command_line(command_line_args=None):
+ """Split command line arguments to parameters."""
+ args = arg_parse(command_line_args)
+ config = read_yaml(args.config)
+
+ bucket = args.bucket or config["s3_bucket"]
+
+ log_config_file = args.log
+ if log_config_file is not None:
+ with open(log_config_file) as fd:
+ log_config = yaml.safe_load(fd.read())
+ else:
+ log_config = {}
+ return bucket, config, log_config
diff --git a/pytroll_collectors/s3stalker_daemon_runner.py b/pytroll_collectors/s3stalker_daemon_runner.py
new file mode 100644
index 00000000..29875045
--- /dev/null
+++ b/pytroll_collectors/s3stalker_daemon_runner.py
@@ -0,0 +1,96 @@
+# Copyright (c) 2020 - 2023 Pytroll developers
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+"""S3stalker daemon.
+
+The contents of the yaml configuration file should look like this::
+
+ s3_kwargs:
+ anon: false
+ client_kwargs:
+ aws_access_key_id: my_accesskey
+ aws_secret_access_key: my_secret_key
+ endpoint_url: https://xxx.yyy.zz
+ fetch_back_to:
+ hours: 20
+ polling_interval:
+ minutes: 2
+ file_pattern: '{platform_name:3s}_OL_2_{datatype_id:_<6s}_{start_time:%Y%m%dT%H%M%S}_{end_time:%Y%m%dT%H%M%S}_{creation_time:%Y%m%dT%H%M%S}_{duration:4d}_{cycle:3d}_{relative_orbit:3d}_{frame:4d}_{centre:3s}_{mode:1s}_{timeliness:2s}_{collection:3s}.zip'
+ subject: /segment/2/safe-olci/S3/
+""" # noqa
+import signal
+from datetime import timedelta, datetime
+from threading import Thread
+
+from dateutil.tz import UTC
+
+from posttroll.publisher import create_publisher_from_dict_config
+
+from pytroll_collectors.s3stalker import logger, create_messages_for_recent_files, set_last_fetch, sleeper
+
+
+class S3StalkerRunner(Thread):
+ """Runner for stalking for new files in an S3 object store."""
+
+ def __init__(self, bucket, config, publisher_ready_time=2.5):
+ """Initialize the S3Stalker runner class."""
+ super().__init__()
+
+ self.bucket = bucket
+ fetch_back_time = timedelta(**config.pop("fetch_back_to"))
+
+ self._wait_seconds = timedelta(**config.pop('polling_interval')).total_seconds()
+
+ self.config = config
+
+ self._publisher_ready_time = publisher_ready_time
+ self._publisher = None
+ self.loop = True
+ self._set_signal_shutdown()
+
+ last_fetch_time = datetime.now(UTC) - fetch_back_time
+ set_last_fetch(last_fetch_time)
+
+ def _set_signal_shutdown(self):
+ """Set a signal to handle shutdown."""
+ signal.signal(signal.SIGTERM, self.close)
+
+ def _start_communication(self):
+ """Set up the Posttroll communication and start the publisher."""
+ self._publisher = create_publisher_from_dict_config(self.config['publisher'])
+ with sleeper(self._publisher_ready_time):
+ self._publisher.start()
+
+ def run(self):
+ """Start the s3-stalker daemon/runner in a thread."""
+ logger.info("Starting up s3stalker.")
+ self._start_communication()
+
+ while self.loop:
+ with sleeper(self._wait_seconds):
+ self._fetch_bucket_content_and_publish_new_files()
+
+ def _fetch_bucket_content_and_publish_new_files(self):
+ """Go through all messages in list and publish them one after the other."""
+ messages = create_messages_for_recent_files(self.bucket, self.config)
+ for message in messages:
+ logger.info("Publishing %s", str(message))
+ self._publisher.send(str(message))
+
+ def close(self, *args, **kwargs):
+ """Shutdown the S3Stalker runner."""
+ logger.info('Terminating the S3 Stalker daemon/runner.')
+ self.loop = False
+ if self._publisher:
+ self._publisher.stop()
diff --git a/pytroll_collectors/scisys.py b/pytroll_collectors/scisys.py
index a6d26be8..24a82e74 100755
--- a/pytroll_collectors/scisys.py
+++ b/pytroll_collectors/scisys.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
-# Copyright (c) 2012 - 2021 Pytroll developers
+# Copyright (c) 2012 - 2022 Pytroll developers
#
# Author(s):
#
@@ -140,9 +140,9 @@ def get(self, key, default=None):
"""Get a pass."""
utctime, satellite = key
for (rectime, recsat), val in self.iter():
- if(recsat == satellite and
- (abs(rectime - utctime)).seconds < 30 * 60 and
- (abs(rectime - utctime)).days == 0):
+ if (recsat == satellite and
+ (abs(rectime - utctime)).seconds < 30 * 60 and
+ (abs(rectime - utctime)).days == 0):
return val
return default
@@ -522,13 +522,13 @@ def receive_from_zmq(host, port, station, environment, excluded_platforms,
continue
if topic_postfix is not None:
subject = "/".join(("", to_send['format'],
- to_send['data_processing_level'],
- topic_postfix))
+ to_send['data_processing_level'],
+ topic_postfix))
else:
subject = "/".join(("", to_send['format'],
- to_send['data_processing_level'],
- station, environment,
- "polar", "direct_readout"))
+ to_send['data_processing_level'],
+ station, environment,
+ "polar", "direct_readout"))
logger.debug("Subject: %s", str(subject))
msg = Message(subject,
"file",
diff --git a/pytroll_collectors/tests/test_fsspec_to_message.py b/pytroll_collectors/tests/test_fsspec_to_message.py
index fbd342dc..2190f4e9 100644
--- a/pytroll_collectors/tests/test_fsspec_to_message.py
+++ b/pytroll_collectors/tests/test_fsspec_to_message.py
@@ -296,6 +296,7 @@ def test_remote_message_cannot_contain_password(self, tmp_path):
"password": password,
"protocol": protocol,
"port": port}
+
with pytest.raises(RuntimeError):
_ = extract_local_files_to_message_for_remote_use(filename, topic,
target_options=target_options)
diff --git a/pytroll_collectors/tests/test_s3stalker.py b/pytroll_collectors/tests/test_s3stalker.py
index d29b7c2d..bd97b36b 100644
--- a/pytroll_collectors/tests/test_s3stalker.py
+++ b/pytroll_collectors/tests/test_s3stalker.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
-# Copyright (c) 2020 Martin Raspaud
+# Copyright (c) 2020, 2022, 2023 Martin Raspaud
# Author(s):
@@ -22,13 +22,19 @@
"""Tests for s3stalker."""
import datetime
-import unittest
-from unittest import mock
+from contextlib import contextmanager
from copy import deepcopy
+from unittest import mock
+import os
+import pytest
+import yaml
from dateutil.tz import tzutc
+from freezegun import freeze_time
import pytroll_collectors.fsspec_to_message
+from pytroll_collectors.s3stalker import arg_parse
+from pytroll_collectors.s3stalker import get_configs_from_command_line
subject = "/my/great/subject/"
@@ -115,26 +121,29 @@
{
'Key': 'sentinel-s3-ol2wfr-zips/2020/11/21/S3B_OL_2_WFR____20201121T104211_20201121T104448_20201121T131528_0157_046_051_1980_MAR_O_NR_002.zip', # noqa
'LastModified': datetime.datetime(2020, 11, 21, 14, 1, 8, 996000, tzinfo=tzutc()),
- 'ETag': '"0251f232014b24bacc3d45c10c4c0df2-38"', 'Size': 315207067, 'StorageClass': 'STANDARD',
+ 'ETag': '"0251f232014b24bacc3d45c10c4c0df2-38"', 'Size': 315207067,
+ 'StorageClass': 'STANDARD',
'type': 'file', 'size': 315207067,
'name': 'sentinel-s3-ol2wfr-zips/2020/11/21/S3B_OL_2_WFR____20201121T104211_20201121T104448_20201121T131528_0157_046_051_1980_MAR_O_NR_002.zip'}, # noqa
{
'Key': 'sentinel-s3-ol2wfr-zips/2020/11/21/S3B_OL_2_WFR____20201121T104211_20201121T104448_20201122T192710_0157_046_051_1980_MAR_O_NT_002.md5', # noqa
'LastModified': datetime.datetime(2020, 11, 22, 21, 1, 5, 181000, tzinfo=tzutc()),
- 'ETag': '"c5e3e19f37b3670d4b4792d430e2a3a6"', 'Size': 32, 'StorageClass': 'STANDARD', 'type': 'file',
+ 'ETag': '"c5e3e19f37b3670d4b4792d430e2a3a6"', 'Size': 32,
+ 'StorageClass': 'STANDARD', 'type': 'file',
'size': 32,
'name': 'sentinel-s3-ol2wfr-zips/2020/11/21/S3B_OL_2_WFR____20201121T104211_20201121T104448_20201122T192710_0157_046_051_1980_MAR_O_NT_002.md5'}, # noqa
{
'Key': 'sentinel-s3-ol2wfr-zips/2020/11/21/S3B_OL_2_WFR____20201121T104211_20201121T104448_20201122T192710_0157_046_051_1980_MAR_O_NT_002.zip', # noqa
'LastModified': datetime.datetime(2020, 11, 22, 21, 1, 5, 99000, tzinfo=tzutc()),
- 'ETag': '"d043d88fe0f2b27f58e4993fef8017d1-38"', 'Size': 314382569, 'StorageClass': 'STANDARD',
+ 'ETag': '"d043d88fe0f2b27f58e4993fef8017d1-38"', 'Size': 314382569,
+ 'StorageClass': 'STANDARD',
'type': 'file', 'size': 314382569,
'name': 'sentinel-s3-ol2wfr-zips/2020/11/21/S3B_OL_2_WFR____20201121T104211_20201121T104448_20201122T192710_0157_046_051_1980_MAR_O_NT_002.zip'}] # noqa
fs_json = '{"cls": "s3fs.core.S3FileSystem", "protocol": "s3", "args": [], "anon": true}'
-zip_json = '{"cls": "fsspec.implementations.zip.ZipFileSystem", "protocol": "abstract", "args": ["sentinel-s3-ol2wfr-zips/2020/11/21/S3A_OL_2_WFR____20201121T075933_20201121T080210_20201121T103050_0157_065_192_1980_MAR_O_NR_002.zip"], "target_protocol": "s3", "target_options": {"anon": true, "client_kwargs": {}}}' # noqa
-zip_json_fo = '{"cls": "fsspec.implementations.zip.ZipFileSystem", "protocol": "abstract", "fo": "sentinel-s3-ol2wfr-zips/2020/11/21/S3A_OL_2_WFR____20201121T075933_20201121T080210_20201121T103050_0157_065_192_1980_MAR_O_NR_002.zip", "target_protocol": "s3", "target_options": {"anon": true, "client_kwargs": {}}}' # noqa
+zip_json = '{"cls": "fsspec.implementations.zip.ZipFileSystem", "protocol": "abstract", "args": ["sentinel-s3-ol2wfr-zips/2020/11/21/S3A_OL_2_WFR____20201121T075933_20201121T080210_20201121T103050_0157_065_192_1980_MAR_O_NR_002.zip"], "target_protocol": "s3", "target_options": {"anon": true, "client_kwargs": {}}}' # noqa
+zip_json_fo = '{"cls": "fsspec.implementations.zip.ZipFileSystem", "protocol": "abstract", "fo": "sentinel-s3-ol2wfr-zips/2020/11/21/S3A_OL_2_WFR____20201121T075933_20201121T080210_20201121T103050_0157_065_192_1980_MAR_O_NR_002.zip", "target_protocol": "s3", "target_options": {"anon": true, "client_kwargs": {}}}' # noqa
zip_content = {
'S3A_OL_2_WFR____20201121T075933_20201121T080210_20201121T103050_0157_065_192_1980_MAR_O_NR_002.SEN3/Oa01_reflectance.nc': { # noqa
@@ -907,37 +916,33 @@
'type': 'file'}}
-class TestLastFilesGetter(unittest.TestCase):
+class TestLastFilesGetter:
"""Test case for files getter."""
- def setUp(self):
+ def setup_method(self):
"""Set up the test case."""
from pytroll_collectors import s3stalker
s3stalker.set_last_fetch(datetime.datetime(2000, 1, 1, 0, 0, tzinfo=tzutc()))
- self.ls_output = deepcopy(ls_output)
@mock.patch('s3fs.S3FileSystem')
def test_get_last_files_returns_files(self, s3_fs):
"""Test files are returned."""
- from pytroll_collectors import s3stalker
- path = "sentinel-s3-ol2wfr-zips/2020/11/21"
- s3_fs.return_value.ls.return_value = self.ls_output
- fs, files = s3stalker.get_last_files(path, anon=True)
- assert list(files) == self.ls_output
+ fs, files = get_last_files_from_stalker()
+ assert list(files) == ls_output
@mock.patch('s3fs.S3FileSystem')
def test_get_last_files_returns_incrementally(self, s3_fs):
"""Test files are newer than epoch."""
from pytroll_collectors import s3stalker
path = "sentinel-s3-ol2wfr-zips/2020/11/21"
- sorted_output = sorted(self.ls_output, key=(lambda x: x['LastModified']))
+ sorted_output = sorted(ls_output, key=(lambda x: x['LastModified']))
s3_fs.return_value.ls.return_value = sorted_output[:8]
fs, files = s3stalker.get_last_files(path, anon=True)
assert len(files) == 8
fetch_date = sorted_output[7]['LastModified']
- s3_fs.return_value.ls.return_value = self.ls_output
+ s3_fs.return_value.ls.return_value = ls_output
fs, newer_files = s3stalker.get_last_files(path, anon=True)
assert all(new_file['LastModified'] > fetch_date for new_file in newer_files)
assert len(newer_files) == 8
@@ -945,36 +950,37 @@ def test_get_last_files_returns_incrementally(self, s3_fs):
@mock.patch('s3fs.S3FileSystem')
def test_get_last_files_returns_fs(self, s3_fs):
"""Test fs is returned."""
- from pytroll_collectors import s3stalker
- path = "sentinel-s3-ol2wfr-zips/2020/11/21"
- s3_fs.return_value.to_json.return_value = fs_json
- fs, files = s3stalker.get_last_files(path, anon=True)
+ fs, files = get_last_files_from_stalker()
assert fs.to_json() == fs_json
@mock.patch('s3fs.S3FileSystem')
def test_get_last_files_filters_according_to_pattern(self, s3_fs):
"""Test fs is returned."""
- from pytroll_collectors import s3stalker
- path = "sentinel-s3-ol2wfr-zips/2020/11/21/"
- s3_fs.return_value.ls.return_value = self.ls_output
- fs, files = s3stalker.get_last_files(path, pattern=zip_pattern, anon=True)
+ fs, files = get_last_files_from_stalker(pattern=zip_pattern)
assert len(list(files)) == len(ls_output) / 2
- @mock.patch('s3fs.S3FileSystem')
- def test_get_last_files_with_pattern_add_metadata(self, s3_fs):
+ def test_get_last_files_with_pattern_add_metadata(self):
"""Test fs is returned."""
- from pytroll_collectors import s3stalker
- path = "sentinel-s3-ol2wfr-zips/2020/11/21/"
- s3_fs.return_value.ls.return_value = self.ls_output
- fs, files = s3stalker.get_last_files(path, pattern=zip_pattern, anon=True)
+ fs, files = get_last_files_from_stalker(pattern=zip_pattern)
assert 'metadata' in files[0]
assert files[0]['metadata']['platform_name'] == 'S3A'
-class TestFileListToMessages(unittest.TestCase):
+def get_last_files_from_stalker(**s3_kwargs):
+ """Get the last files using an instantiated stalker."""
+ with mock.patch('s3fs.S3FileSystem') as s3_fs:
+ from pytroll_collectors import s3stalker
+ path = "sentinel-s3-ol2wfr-zips/2020/11/21/"
+ s3_fs.return_value.to_json.return_value = fs_json
+ s3_fs.return_value.ls.return_value = deepcopy(ls_output)
+ fs, files = s3stalker.get_last_files(path, anon=True, **s3_kwargs)
+ return fs, files
+
+
+class TestFileListToMessages:
"""Test case for filelist_to_messages."""
- def setUp(self):
+ def setup_method(self):
"""Set up the test case."""
from pytroll_collectors import s3stalker
s3stalker.set_last_fetch(datetime.datetime(2000, 1, 1, 0, 0, tzinfo=tzutc()))
@@ -982,131 +988,219 @@ def setUp(self):
@mock.patch('s3fs.S3FileSystem')
def test_file_list_to_messages_returns_right_number_of_messages(self, s3_fs):
"""Test the right number of messages is returned."""
- from pytroll_collectors import s3stalker
- path = "sentinel-s3-ol2wfr-zips/2020/11/21"
- s3_fs.return_value.ls.return_value = ls_output
- s3_fs.return_value.to_json.return_value = fs_json
- fs, files = s3stalker.get_last_files(path, anon=True)
+ fs, files = get_last_files_from_stalker()
message_list = pytroll_collectors.fsspec_to_message.filelist_to_messages(fs, files, subject)
assert len(message_list) == len(files)
@mock.patch('s3fs.S3FileSystem')
def test_file_list_to_messages_returns_messages_containing_uris(self, s3_fs):
"""Test uris are in the messages."""
- from pytroll_collectors import s3stalker
- path = "sentinel-s3-ol2wfr-zips/2020/11/21"
- s3_fs.return_value.ls.return_value = ls_output
- s3_fs.return_value.to_json.return_value = fs_json
- fs, files = s3stalker.get_last_files(path, anon=True)
+ fs, files = get_last_files_from_stalker()
message_list = pytroll_collectors.fsspec_to_message.filelist_to_messages(fs, files, subject)
assert 'uri' in message_list[0].data
-class TestFileListUnzipToMessages(unittest.TestCase):
+class TestFileListUnzipToMessages:
"""Test filelist_unzip_to_messages."""
- def setUp(self):
+ def setup_method(self):
"""Set up the test case."""
from pytroll_collectors import s3stalker
s3stalker.set_last_fetch(datetime.datetime(2000, 1, 1, 0, 0, tzinfo=tzutc()))
- @mock.patch('s3fs.S3FileSystem')
- @mock.patch('pytroll_collectors.fsspec_to_message.get_filesystem_class')
- def test_file_list_unzip_to_messages_returns_right_number_of_messages(self, zip_fs, s3_fs):
+ def test_file_list_unzip_to_messages_returns_right_number_of_messages(self):
"""Test there are as many messages as files."""
- from pytroll_collectors import s3stalker
- path = "sentinel-s3-ol2wfr-zips/2020/11/21"
- s3_fs.return_value.ls.return_value = ls_output
- s3_fs.return_value.to_json.return_value = fs_json
- fs, files = s3stalker.get_last_files(path, anon=True)
- message_list = pytroll_collectors.fsspec_to_message.filelist_unzip_to_messages(fs, files, subject)
+ fs, files = get_last_files_from_stalker()
+ message_list = filelist_unzip_to_messages(fs, files)
assert len(message_list) == len(files)
- @mock.patch('s3fs.S3FileSystem')
- @mock.patch('pytroll_collectors.fsspec_to_message.get_filesystem_class')
- def test_file_list_unzip_to_messages_returns_messages_with_datasets_when_zip_file_is_source(self, zip_fs, s3_fs):
+ def test_file_list_unzip_to_messages_returns_messages_with_datasets_when_zip_file_is_source(self):
"""Test messages are of type dataset."""
- from pytroll_collectors import s3stalker
- path = "sentinel-s3-ol2wfr-zips/2020/11/21"
- s3_fs.return_value.ls.return_value = ls_output
- s3_fs.return_value.to_json.return_value = fs_json
- fs, files = s3stalker.get_last_files(path, anon=True)
- message_list = pytroll_collectors.fsspec_to_message.filelist_unzip_to_messages(fs, files, subject)
+ fs, files = get_last_files_from_stalker()
+ message_list = filelist_unzip_to_messages(fs, files)
assert 'dataset' in message_list[1].data
- @mock.patch('s3fs.S3FileSystem')
- @mock.patch('pytroll_collectors.fsspec_to_message.get_filesystem_class')
- def test_file_list_unzip_to_messages_returns_messages_with_right_amount_of_files(self, zip_fs, s3_fs):
+ def test_file_list_unzip_to_messages_returns_messages_with_right_amount_of_files(self):
"""Test right amount of files is present in messages."""
- from pytroll_collectors import s3stalker
- path = "sentinel-s3-ol2wfr-zips/2020/11/21"
- s3_fs.return_value.ls.return_value = ls_output
- s3_fs.return_value.to_json.return_value = fs_json
- zip_fs.return_value.return_value.find.return_value = zip_content
- zip_fs.return_value.return_value.to_json.return_value = zip_json
- fs, files = s3stalker.get_last_files(path, anon=True)
- message_list = pytroll_collectors.fsspec_to_message.filelist_unzip_to_messages(fs, files, subject)
+ fs, files = get_last_files_from_stalker()
+ message_list = filelist_unzip_to_messages(fs, files)
exp_file_list = [file['name'] for file in zip_content.values()]
file_list = [file['uri'] for file in message_list[1].data['dataset']]
assert len(file_list) == len(exp_file_list)
- @mock.patch('s3fs.S3FileSystem')
- @mock.patch('pytroll_collectors.fsspec_to_message.get_filesystem_class')
- def test_file_list_unzip_to_messages_returns_messages_with_list_of_zip_content_in_uid(self, zip_fs, s3_fs):
+ def test_file_list_unzip_to_messages_returns_messages_with_list_of_zip_content_in_uid(self):
"""Test zip content is included in messages."""
- from pytroll_collectors import s3stalker
- path = "sentinel-s3-ol2wfr-zips/2020/11/21"
- s3_fs.return_value.ls.return_value = ls_output
- s3_fs.return_value.to_json.return_value = fs_json
- zip_fs.return_value.return_value.find.return_value = zip_content
- zip_fs.return_value.return_value.to_json.return_value = zip_json
- fs, files = s3stalker.get_last_files(path, anon=True)
- message_list = pytroll_collectors.fsspec_to_message.filelist_unzip_to_messages(fs, files, subject)
+ fs, files = get_last_files_from_stalker()
+ message_list = filelist_unzip_to_messages(fs, files)
exp_file_list = ['zip://' + file['name'] for file in zip_content.values()]
file_list = [file['uid'] for file in message_list[1].data['dataset']]
assert file_list == exp_file_list
- @mock.patch('s3fs.S3FileSystem')
- @mock.patch('pytroll_collectors.fsspec_to_message.get_filesystem_class')
- def test_file_list_unzip_to_messages_returns_messages_with_list_of_zip_content_in_uri(self, zip_fs, s3_fs):
+ def test_file_list_unzip_to_messages_returns_messages_with_list_of_zip_content_in_uri(self):
"""Test zip content is included in messages."""
- from pytroll_collectors import s3stalker
- path = "sentinel-s3-ol2wfr-zips/2020/11/21"
- s3_fs.return_value.ls.return_value = ls_output
- s3_fs.return_value.to_json.return_value = fs_json
- zip_fs.return_value.return_value.find.return_value = zip_content
- zip_fs.return_value.return_value.to_json.return_value = zip_json
- fs, files = s3stalker.get_last_files(path, anon=True)
- message_list = pytroll_collectors.fsspec_to_message.filelist_unzip_to_messages(fs, files, subject)
+ fs, files = get_last_files_from_stalker()
+ message_list = filelist_unzip_to_messages(fs, files)
zip_file = ls_output[1]['name']
exp_file_list = ['zip://' + file['name'] + '::s3://' + zip_file for file in zip_content.values()]
file_list = [file['uri'] for file in message_list[1].data['dataset']]
assert file_list == exp_file_list
- @mock.patch('s3fs.S3FileSystem')
- @mock.patch('pytroll_collectors.fsspec_to_message.get_filesystem_class')
- def test_file_list_unzip_to_messages_has_correct_subject(self, zip_fs, s3_fs):
+ def test_file_list_unzip_to_messages_has_correct_subject(self):
"""Test filelist_unzip_to_messages has correct subject."""
- from pytroll_collectors import s3stalker
- path = "sentinel-s3-ol2wfr-zips/2020/11/21"
- s3_fs.return_value.ls.return_value = ls_output
- s3_fs.return_value.to_json.return_value = fs_json
- zip_fs.return_value.return_value.find.return_value = zip_content
- zip_fs.return_value.return_value.to_json.return_value = zip_json
- fs, files = s3stalker.get_last_files(path, anon=True)
- message_list = pytroll_collectors.fsspec_to_message.filelist_unzip_to_messages(fs, files, subject)
+ fs, files = get_last_files_from_stalker()
+ message_list = filelist_unzip_to_messages(fs, files)
assert message_list[1].subject == subject
- @mock.patch('s3fs.S3FileSystem')
- @mock.patch('pytroll_collectors.fsspec_to_message.get_filesystem_class')
- def test_file_list_unzip_to_messages_has_metadata(self, zip_fs, s3_fs):
+ def test_file_list_unzip_to_messages_has_metadata(self):
"""Test filelist_unzip_to_messages has correct subject."""
- from pytroll_collectors import s3stalker
- path = "sentinel-s3-ol2wfr-zips/2020/11/21"
- s3_fs.return_value.ls.return_value = ls_output
- s3_fs.return_value.to_json.return_value = fs_json
+ fs, files = get_last_files_from_stalker(pattern=zip_pattern)
+ message_list = filelist_unzip_to_messages(fs, files)
+ assert message_list[0].data['platform_name'] == 'S3A'
+
+
+def filelist_unzip_to_messages(fs, files):
+ """Get the message from the list of zipped files."""
+ with mock.patch('pytroll_collectors.fsspec_to_message.get_filesystem_class') as zip_fs:
zip_fs.return_value.return_value.find.return_value = zip_content
zip_fs.return_value.return_value.to_json.return_value = zip_json
- fs, files = s3stalker.get_last_files(path, pattern=zip_pattern, anon=True)
- message_list = pytroll_collectors.fsspec_to_message.filelist_unzip_to_messages(fs, files, subject)
- assert message_list[0].data['platform_name'] == 'S3A'
+ return pytroll_collectors.fsspec_to_message.filelist_unzip_to_messages(fs, files, subject)
+
+
+S3_STALKER_CONFIG = {'s3_kwargs': {'anon': False, 'client_kwargs': {'endpoint_url': 'https://xxx.yyy.zz',
+ 'aws_access_key_id': 'my_accesskey',
+ 'aws_secret_access_key': 'my_secret_key'}},
+ "s3_bucket": "s3://bucket_from_file/",
+ "fetch_back_to": {"hours": 20},
+ "subject": "/segment/2/safe-olci/S3/",
+ "file_pattern": ("{platform_name:3s}_OL_2_{datatype_id:_<6s}_{start_time:%Y%m%dT%H%M%S}_"
+ "{end_time:%Y%m%dT%H%M%S}_{creation_time:%Y%m%dT%H%M%S}_{duration:4d}_"
+ "{cycle:3d}_{relative_orbit:3d}_{frame:4d}_{centre:3s}_{mode:1s}_{timeliness:2s}_"
+ "{collection:3s}.zip")}
+
+
+@contextmanager
+def FakePublish(topic, publisher):
+ """Make a fake Publish context."""
+ yield publisher
+
+
+@mock.patch('s3fs.S3FileSystem')
+def test_publish_new_files(s3_fs):
+ """Test that publish_new_files actually publishes files."""
+ publisher = FakePublisher("fake publisher")
+ from functools import partial
+ fake_publish = partial(FakePublish, publisher=publisher)
+ with mock.patch('pytroll_collectors.s3stalker.Publish', new=fake_publish):
+ s3_fs.return_value.to_json.return_value = fs_json
+ s3_fs.return_value.ls.return_value = deepcopy(ls_output)
+ with mock.patch('pytroll_collectors.fsspec_to_message.get_filesystem_class') as zip_fs:
+ zip_fs.return_value.return_value.find.return_value = zip_content
+ zip_fs.return_value.return_value.to_json.return_value = zip_json
+ from pytroll_collectors.s3stalker import publish_new_files
+ with freeze_time('2020-11-21 14:00:00'):
+ publish_new_files("sentinel-s3-ol2wfr-zips/2020/11/21",
+ S3_STALKER_CONFIG.copy(),
+ publisher_ready_time=0)
+ assert len(publisher.messages_sent) == 8
+ with freeze_time('2020-11-24 14:00:00'):
+ publisher.clear_sent_messages()
+ publish_new_files("sentinel-s3-ol2wfr-zips/2020/11/21",
+ S3_STALKER_CONFIG.copy(),
+ publisher_ready_time=0)
+ assert len(publisher.messages_sent) == 0
+
+
+class FakePublisher:
+ """A fake publish class with a dummy send method."""
+
+ def __init__(self, _dummy):
+ """Initialize the fake publisher class."""
+ self.messages_sent = []
+
+ def send(self, msg):
+ """Faking the sending of a message."""
+ self.messages_sent.append(msg)
+ return msg
+
+ def __call__(self, msg):
+ """Faking a call method."""
+ return self.send(msg)
+
+ def clear_sent_messages(self):
+ """Clear the sent messages."""
+ self.messages_sent = []
+
+ def start(self):
+ """Start the publisher."""
+
+ def stop(self):
+ """Stop the publisher."""
+
+
+def test_arg_parse():
+ """Test the arg parsing."""
+ res = arg_parse(["s3://some_bucket", "my_config_file", "-l", "my_log_config"])
+ assert res.bucket == "s3://some_bucket"
+ assert res.config == "my_config_file"
+ assert res.log == "my_log_config"
+
+
+LOG_CONFIG = """version: 1
+formatters:
+ simple:
+ format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+handlers:
+ console:
+ class: logging.StreamHandler
+ level: DEBUG
+ formatter: simple
+ stream: ext://sys.stdout
+loggers:
+ simpleExample:
+ level: DEBUG
+ handlers: [console]
+ propagate: no
+root:
+ level: DEBUG
+ handlers: [console]
+"""
+
+
+def test_get_configs_from_command_line(fake_config_file, fake_log_config):
+ """Test get_configs_from_command_line."""
+ config_filename = fake_config_file
+ log_config_filename = fake_log_config
+ command_line = ["s3://some_bucket", os.fspath(config_filename), "-l", os.fspath(log_config_filename)]
+ bucket, config, log_config = get_configs_from_command_line(command_line)
+ assert config == S3_STALKER_CONFIG
+ assert bucket == "s3://some_bucket"
+ assert log_config["version"] == 1
+
+
+@pytest.fixture()
+def fake_log_config(tmp_path):
+ """Create a fake log config file."""
+ log_config_filename = tmp_path / "my_log_config"
+ with open(log_config_filename, "w") as fd:
+ fd.write(LOG_CONFIG)
+ return log_config_filename
+
+
+@pytest.fixture
+def fake_config_file(tmp_path):
+ """Create a fake config file."""
+ config_filename = tmp_path / "my_config_file"
+ with open(config_filename, "w") as fd:
+ fd.write(yaml.dump(S3_STALKER_CONFIG))
+ return config_filename
+
+
+def test_get_configs_from_command_line_gets_bucket_from_config_when_not_provided(fake_config_file):
+ """Test the function gets the bucket from the config file."""
+ config_filename = fake_config_file
+
+ command_line = [os.fspath(config_filename)]
+ bucket, config, log_config = get_configs_from_command_line(command_line)
+ assert config == S3_STALKER_CONFIG
+ assert bucket == "s3://bucket_from_file/"
+ assert log_config == {}
diff --git a/pytroll_collectors/tests/test_s3stalker_daemon_runner.py b/pytroll_collectors/tests/test_s3stalker_daemon_runner.py
new file mode 100644
index 00000000..0bc59df4
--- /dev/null
+++ b/pytroll_collectors/tests/test_s3stalker_daemon_runner.py
@@ -0,0 +1,147 @@
+# Copyright (c) 2020 - 2023 Pytroll developers
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+"""Test s3stalker daemon."""
+import datetime
+from copy import deepcopy
+from unittest import mock
+import time
+
+from dateutil.tz import tzutc, UTC
+from freezegun import freeze_time
+
+from pytroll_collectors.s3stalker import set_last_fetch, _match_files_to_pattern, create_messages_for_recent_files
+from pytroll_collectors.s3stalker_daemon_runner import S3StalkerRunner
+from pytroll_collectors.tests.test_s3stalker import fs_json, FakePublisher
+
+S3_STALKER_CONFIG = {'s3_kwargs': {'anon': False, 'client_kwargs': {'endpoint_url': 'https://xxx.yyy.zz',
+ 'aws_access_key_id': 'my_accesskey',
+ 'aws_secret_access_key': 'my_secret_key'}},
+ 'fetch_back_to': {'hours': 1},
+ 'polling_interval': {'minutes': 2},
+ 'subject': '/yuhu',
+ 'file_pattern': '{channel:5s}_{platform_name:3s}_d{start_time:%Y%m%d_t%H%M%S}{frac:1s}_e{end_time:%H%M%S}{frac_end:1s}_b{orbit_number:5s}_c{process_time:20s}_cspp_dev.h5', # noqa
+ 'publisher': {'name': 's3stalker_runner'}}
+
+ATMS_FILES = [{'Key': 'atms-sdr/GATMO_j01_d20221220_t1230560_e1231276_b26363_c20221220124753607778_cspp_dev.h5',
+ 'LastModified': datetime.datetime(2022, 12, 20, 12, 48, 25, 173000, tzinfo=tzutc()),
+ 'ETag': '"bb037828c47d28a30ce6d49e719b6c64"',
+ 'Size': 155964,
+ 'StorageClass': 'STANDARD',
+ 'type': 'file',
+ 'size': 155964,
+ 'name': 'atms-sdr/GATMO_j01_d20221220_t1230560_e1231276_b26363_c20221220124753607778_cspp_dev.h5'},
+ {'Key': 'atms-sdr/GATMO_j01_d20221220_t1231280_e1231596_b26363_c20221220124754976465_cspp_dev.h5',
+ 'LastModified': datetime.datetime(2022, 12, 20, 12, 48, 25, 834000, tzinfo=tzutc()),
+ 'ETag': '"327b7e1300700f55268cc1f4dc133459"',
+ 'Size': 156172,
+ 'StorageClass': 'STANDARD',
+ 'type': 'file',
+ 'size': 156172,
+ 'name': 'atms-sdr/GATMO_j01_d20221220_t1231280_e1231596_b26363_c20221220124754976465_cspp_dev.h5'},
+ {'Key': 'atms-sdr/SATMS_npp_d20221220_t1330400_e1331116_b57761_c20221220133901538622_cspp_dev.h5',
+ 'LastModified': datetime.datetime(2022, 12, 20, 13, 39, 33, 86000, tzinfo=tzutc()),
+ 'ETag': '"2fe59174e29627acd82a28716b18d92a"',
+ 'Size': 168096,
+ 'StorageClass': 'STANDARD',
+ 'type': 'file',
+ 'size': 168096,
+ 'name': 'atms-sdr/SATMS_npp_d20221220_t1330400_e1331116_b57761_c20221220133901538622_cspp_dev.h5'},
+ {'Key': 'atms-sdr/SATMS_npp_d20221220_t1331120_e1331436_b57761_c20221220133902730925_cspp_dev.h5',
+ 'LastModified': datetime.datetime(2022, 12, 20, 13, 39, 33, 798000, tzinfo=tzutc()),
+ 'ETag': '"ffff983cdf767ab635a7ae51dc7d0626"',
+ 'Size': 167928,
+ 'StorageClass': 'STANDARD',
+ 'type': 'file',
+ 'size': 167928,
+ 'name': 'atms-sdr/SATMS_npp_d20221220_t1331120_e1331436_b57761_c20221220133902730925_cspp_dev.h5'}]
+
+
+def test_match_files_to_pattern():
+ """Test matching files to pattern."""
+ path = 'atms-sdr'
+ pattern = 'GATMO_{platform_name:3s}_d{start_time:%Y%m%d_t%H%M%S}{frac:1s}_e{end_time:%H%M%S}{frac_end:1s}_b{orbit_number:5s}_c{process_time:20s}_cspp_dev.h5' # noqa
+
+ res_files = _match_files_to_pattern(ATMS_FILES, path, pattern)
+
+ assert res_files == ATMS_FILES[0:2]
+
+
+class TestS3StalkerRunner:
+ """Test the S3 Stalker Runner functionalities."""
+
+ def setup_method(self):
+ """Set up the test case."""
+ self.ls_output = deepcopy(ATMS_FILES)
+ start_time = datetime.datetime(2022, 12, 20, 12, 0)
+ now = datetime.datetime.utcnow()
+ self.delta_sec = (now - start_time).total_seconds()
+ self.bucket = 'atms-sdr'
+ self.config = S3_STALKER_CONFIG
+
+ @mock.patch('s3fs.S3FileSystem')
+ def test_create_messages_for_recent_files(self, s3_fs):
+ """Test create messages for recent files arriving in the bucket."""
+ s3_fs.return_value.ls.return_value = self.ls_output
+ s3_fs.return_value.to_json.return_value = fs_json
+
+ s3runner = S3StalkerRunner(self.bucket, S3_STALKER_CONFIG.copy())
+
+ s3runner._timedelta = {'seconds': self.delta_sec}
+ last_fetch_time = datetime.datetime.now(UTC) - datetime.timedelta(**s3runner._timedelta)
+ set_last_fetch(last_fetch_time)
+ result_msgs = create_messages_for_recent_files(s3runner.bucket, s3runner.config)
+
+ assert len(result_msgs) == 4
+ msg = result_msgs[0]
+ assert msg.data['orbit_number'] == '26363'
+ assert msg.data['platform_name'] == 'j01'
+ assert msg.data['frac_end'] == '6'
+ assert msg.data['start_time'] == datetime.datetime(2022, 12, 20, 12, 30, 56)
+ assert msg.data['end_time'] == datetime.datetime(1900, 1, 1, 12, 31, 27)
+ assert msg.data["process_time"] == "20221220124753607778"
+
+ @mock.patch('s3fs.S3FileSystem')
+ @mock.patch('pytroll_collectors.s3stalker_daemon_runner.create_publisher_from_dict_config')
+ def test_fetch_new_files_publishes_messages(self, create_publisher, s3_fs):
+ """Test that fetching new files publishes the corresponding messages."""
+ s3_fs.return_value.ls.return_value = self.ls_output[:2]
+ s3_fs.return_value.to_json.return_value = fs_json
+
+ publisher = FakePublisher("fake_publisher")
+ create_publisher.return_value = publisher
+ before_files_arrived = datetime.datetime(2022, 12, 20, 12, 0, 0, tzinfo=UTC)
+
+ stalker_config = S3_STALKER_CONFIG.copy()
+ stalker_config["polling_interval"] = dict(seconds=.2)
+
+ s3runner = None
+ try:
+
+ with freeze_time(before_files_arrived + datetime.timedelta(hours=1)):
+ s3runner = S3StalkerRunner(self.bucket, stalker_config, 0)
+ s3runner.start()
+ time.sleep(.1)
+ assert len(publisher.messages_sent) == 2
+ first_messages_sent = publisher.messages_sent
+ publisher.clear_sent_messages()
+
+ with freeze_time(before_files_arrived + datetime.timedelta(hours=2)):
+ s3_fs.return_value.ls.return_value = self.ls_output
+ time.sleep(.1)
+ assert len(publisher.messages_sent) == 2
+ assert publisher.messages_sent != first_messages_sent
+ finally:
+ if s3runner:
+ s3runner.close()
diff --git a/setup.py b/setup.py
index 9216eb04..093c5463 100644
--- a/setup.py
+++ b/setup.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
-# Copyright (c) 2014 - 2021 Pytroll developers
+# Copyright (c) 2014 - 2022 Pytroll developers
#
# Author(s):
#
@@ -92,8 +92,9 @@
install_requires=['posttroll>=1.3.0',
'trollsift',
'pyyaml'],
- tests_require=['trollsift', 'netifaces', 'watchdog', 'posttroll', 'pyyaml', 'pyinotify', 's3fs',
+ tests_require=['trollsift', 'netifaces', 'watchdog', 'posttroll', 'pyyaml',
+ 'pyinotify', 's3fs', 'freezegun',
'pyresample', 'python-dateutil', 'posttroll', 'pytest'],
extras_require=extras_require,
- python_requires='>=3.7',
+ python_requires='>=3.9',
)