From 170e68f6988fc131d81994a2045b9d9db73baa73 Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Mon, 11 Nov 2024 12:56:01 +0200 Subject: [PATCH 01/12] Add SIGTERM handling to segment gatherer --- pytroll_collectors/segments.py | 12 +++++- pytroll_collectors/tests/test_segments.py | 51 +++++++++++++++++++++++ 2 files changed, 62 insertions(+), 1 deletion(-) diff --git a/pytroll_collectors/segments.py b/pytroll_collectors/segments.py index a9b68ffa..896a6207 100644 --- a/pytroll_collectors/segments.py +++ b/pytroll_collectors/segments.py @@ -37,10 +37,11 @@ import datetime as dt import logging.handlers +import os +import signal from abc import ABCMeta, abstractmethod from collections import OrderedDict from enum import Enum -import os import trollsift from posttroll import message as pmessage @@ -610,6 +611,7 @@ def __init__(self, config): self._group_by_minutes = self._config.get('group_by_minutes', None) self._loop = False + self._sigterm_caught = False self._providing_server = self._config.get('providing_server') self._is_first_message_after_start = True @@ -711,9 +713,13 @@ def _collect_publisher_config(self): def run(self): """Run SegmentGatherer.""" self._setup_messaging() + signal.signal(signal.SIGTERM, self._handle_sigterm) self._loop = True while self._loop: + if self._sigterm_caught and not self.slots: + self.stop() + self.triage_slots() # Check listener for new messages @@ -734,6 +740,10 @@ def run(self): logger.info("New message received: %s", str(msg)) self.process(msg) + def _handle_sigterm(self, signum, frame): + logging.info("Caught SIGTERM, shutting down when all collections are finished.") + self._sigterm_caught = True + def triage_slots(self): """Check if there are slots ready for publication.""" slots = self.slots.copy() diff --git a/pytroll_collectors/tests/test_segments.py b/pytroll_collectors/tests/test_segments.py index c56874c9..de10f06d 100644 --- a/pytroll_collectors/tests/test_segments.py +++ b/pytroll_collectors/tests/test_segments.py @@ -747,6 +747,57 @@ def test_listener_use_first_nameserver(self): self.msg0deg._setup_listener() assert_messaging(None, None, None, 'localhost', None, ListenerContainer) + def test_sigterm(self): + """Test that SIGTERM signal is handled.""" + import os + import signal + import time + from multiprocessing import Process + + with patch('pytroll_collectors.segments.ListenerContainer'): + col = SegmentGatherer(CONFIG_SINGLE) + proc = Process(target=col.run) + proc.start() + time.sleep(1) + os.kill(proc.pid, signal.SIGTERM) + proc.join() + + assert proc.exitcode == 0 + + def test_sigterm_nonempty_slots(self): + """Test that SIGTERM signal is handled properly when there are active slots present.""" + import os + import signal + import time + from multiprocessing import Process + + with patch('pytroll_collectors.segments.ListenerContainer'): + with patch('pytroll_collectors.segments.SegmentGatherer.triage_slots', + new=_fake_triage_slots): + col = SegmentGatherer(CONFIG_SINGLE) + proc = Process(target=col.run) + proc.start() + time.sleep(1) + tic = time.time() + os.kill(proc.pid, signal.SIGTERM) + proc.join() + + assert proc.exitcode == 0 + # Triage after the kill signal takes 1 s + assert time.time() - tic > 1. + + +def _fake_triage_slots(self): + """Fake the triage_slots() method. + + The fake triage adds a new slot if SIGTERM has not been caught, and removes it when the signal comes. + """ + import time + self.slots["foo"] = "bar" + if self._sigterm_caught: + del self.slots["foo"] + time.sleep(1) + def _get_message_from_metadata_and_patterns(mda, patterns): fake_message = FakeMessage(mda) From b4c1c48ea6438b20500a9e390f07d44eb6a1e9ca Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Tue, 12 Nov 2024 10:31:12 +0200 Subject: [PATCH 02/12] Add SIGTERM handling to geographic gatherer --- pytroll_collectors/geographic_gatherer.py | 17 +++++++++++++ .../tests/test_geographic_gatherer.py | 24 +++++++++++++++++++ 2 files changed, 41 insertions(+) diff --git a/pytroll_collectors/geographic_gatherer.py b/pytroll_collectors/geographic_gatherer.py index 3312d227..672e025a 100644 --- a/pytroll_collectors/geographic_gatherer.py +++ b/pytroll_collectors/geographic_gatherer.py @@ -25,6 +25,7 @@ """Geographic segment gathering.""" import logging +import signal import time from configparser import NoOptionError, ConfigParser @@ -56,6 +57,8 @@ def __init__(self, opts): self.triggers = [] self.return_status = 0 + self._sigterm_caught = False + self._clean_config() self._setup_publisher() try: @@ -103,8 +106,10 @@ def _setup_triggers(self): def run(self): """Run granule triggers.""" + signal.signal(signal.SIGTERM, self._handle_sigterm) try: while True: + self._check_sigterm() time.sleep(1) for trigger in self.triggers: if not trigger.is_alive(): @@ -119,6 +124,18 @@ def run(self): return self.return_status + def _handle_sigterm(self, signum, frame): + logger.info("Caught SIGTERM, shutting down when all collections are finished.") + self._sigterm_caught = True + + def _check_sigterm(self): + if self._sigterm_caught: + for t in self.triggers: + for c in t.collectors: + if c.granules: + return + raise KeyboardInterrupt("No ongoing collections.") + def stop(self): """Stop the gatherer.""" logger.info('Ending publication the gathering of granules...') diff --git a/pytroll_collectors/tests/test_geographic_gatherer.py b/pytroll_collectors/tests/test_geographic_gatherer.py index 077d2bd2..915ed8fe 100644 --- a/pytroll_collectors/tests/test_geographic_gatherer.py +++ b/pytroll_collectors/tests/test_geographic_gatherer.py @@ -527,3 +527,27 @@ def test_full_pass(self, sub_factory, monkeypatch, tmp_tle): assert snd_msg.data == expected_msg.data finally: gatherer.stop() + + +def test_sigterm(tmp_config_file, tmp_config_parser): + """Test that SIGTERM signal is handled.""" + import os + import signal + import time + from multiprocessing import Process + + from pytroll_collectors.geographic_gatherer import GeographicGatherer + + with open(tmp_config_file, mode="w") as fp: + tmp_config_parser.write(fp) + + opts = arg_parse(["-c", "minimal_config", "-p", "40000", "-n", "false", "-i", "localhost:12345", + str(tmp_config_file)]) + gatherer = GeographicGatherer(opts) + proc = Process(target=gatherer.run) + proc.start() + time.sleep(1) + os.kill(proc.pid, signal.SIGTERM) + proc.join() + + assert proc.exitcode == 0 From b0ff6ab35c625ba8056e05b0df3fedb45abc313b Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Tue, 12 Nov 2024 10:32:16 +0200 Subject: [PATCH 03/12] Clarify log message --- pytroll_collectors/geographic_gatherer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pytroll_collectors/geographic_gatherer.py b/pytroll_collectors/geographic_gatherer.py index 672e025a..f16a810b 100644 --- a/pytroll_collectors/geographic_gatherer.py +++ b/pytroll_collectors/geographic_gatherer.py @@ -138,7 +138,7 @@ def _check_sigterm(self): def stop(self): """Stop the gatherer.""" - logger.info('Ending publication the gathering of granules...') + logger.info('Ending the gathering of granules...') for trigger in self.triggers: trigger.stop() self.publisher.stop() From 43b8cc73039b87da43c6252dd83bdf5c861a9b97 Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Tue, 12 Nov 2024 14:42:01 +0200 Subject: [PATCH 04/12] Make SIGTERM checking return boolean depending on the status --- pytroll_collectors/geographic_gatherer.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pytroll_collectors/geographic_gatherer.py b/pytroll_collectors/geographic_gatherer.py index f16a810b..0dcebef2 100644 --- a/pytroll_collectors/geographic_gatherer.py +++ b/pytroll_collectors/geographic_gatherer.py @@ -108,8 +108,7 @@ def run(self): """Run granule triggers.""" signal.signal(signal.SIGTERM, self._handle_sigterm) try: - while True: - self._check_sigterm() + while self._keep_running(): time.sleep(1) for trigger in self.triggers: if not trigger.is_alive(): @@ -128,13 +127,15 @@ def _handle_sigterm(self, signum, frame): logger.info("Caught SIGTERM, shutting down when all collections are finished.") self._sigterm_caught = True - def _check_sigterm(self): + def _keep_running(self): + keep_running = True if self._sigterm_caught: + keep_running = False for t in self.triggers: for c in t.collectors: if c.granules: - return - raise KeyboardInterrupt("No ongoing collections.") + keep_running = True + return keep_running def stop(self): """Stop the gatherer.""" From 8fcf29f59af530eb5e04e6392ca667dba4218927 Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Tue, 12 Nov 2024 15:24:15 +0200 Subject: [PATCH 05/12] Fix fsspec tar and zip filesystem tests --- .../tests/test_fsspec_to_message.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pytroll_collectors/tests/test_fsspec_to_message.py b/pytroll_collectors/tests/test_fsspec_to_message.py index 0444fbd2..8e3edb3b 100644 --- a/pytroll_collectors/tests/test_fsspec_to_message.py +++ b/pytroll_collectors/tests/test_fsspec_to_message.py @@ -121,8 +121,8 @@ def create_files_to_pack(self, tmp_path): @pytest.mark.parametrize( ("packing", "create_packfile", "filesystem_class"), [ - ("tar", create_tar_file, "fsspec.implementations.tar.TarFileSystem"), - ("zip", create_zip_file, "fsspec.implementations.zip.ZipFileSystem"), + ("tar", create_tar_file, "fsspec.implementations.tar:TarFileSystem"), + ("zip", create_zip_file, "fsspec.implementations.zip:ZipFileSystem"), ] ) def test_pack_file_extract(self, packing, create_packfile, filesystem_class, tmp_path): @@ -153,8 +153,8 @@ def test_pack_file_extract(self, packing, create_packfile, filesystem_class, tmp @pytest.mark.parametrize( ("packing", "create_packfile", "filesystem_class"), [ - ("tar", create_tar_file, "fsspec.implementations.tar.TarFileSystem"), - ("zip", create_zip_file, "fsspec.implementations.zip.ZipFileSystem"), + ("tar", create_tar_file, "fsspec.implementations.tar:TarFileSystem"), + ("zip", create_zip_file, "fsspec.implementations.zip:ZipFileSystem"), ] ) def test_pack_local_file_extract(self, packing, create_packfile, filesystem_class, tmp_path): @@ -184,8 +184,8 @@ def test_pack_local_file_extract(self, packing, create_packfile, filesystem_clas @pytest.mark.parametrize( ("packing", "create_packfile", "filesystem_class"), [ - ("tar", create_tar_file, "fsspec.implementations.tar.TarFileSystem"), - ("zip", create_zip_file, "fsspec.implementations.zip.ZipFileSystem"), + ("tar", create_tar_file, "fsspec.implementations.tar:TarFileSystem"), + ("zip", create_zip_file, "fsspec.implementations.zip:ZipFileSystem"), ] ) def test_pack_local_file_extract_filesystem(self, packing, create_packfile, filesystem_class, tmp_path): @@ -211,8 +211,8 @@ def check_filesystem_is_understood_by_fsspec(self, filesystem_info): @pytest.mark.parametrize( ("packing", "create_packfile", "filesystem_class"), [ - ("tar", create_tar_file, "fsspec.implementations.tar.TarFileSystem"), - ("zip", create_zip_file, "fsspec.implementations.zip.ZipFileSystem"), + ("tar", create_tar_file, "fsspec.implementations.tar:TarFileSystem"), + ("zip", create_zip_file, "fsspec.implementations.zip:ZipFileSystem"), ] ) def test_pack_local_file_extract_with_custom_options(self, packing, create_packfile, filesystem_class, tmp_path): From fd8dcdf2ec774e562200ec35a301d2fb967ccfa1 Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Tue, 12 Nov 2024 15:56:12 +0200 Subject: [PATCH 06/12] Add relative_files = True for Coveralls --- setup.cfg | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.cfg b/setup.cfg index 2bf78ea0..4d02abc0 100644 --- a/setup.cfg +++ b/setup.cfg @@ -19,3 +19,4 @@ tag_prefix = v omit = pytroll_collectors/_version.py versioneer.py +relative_files = True From 340937898df3a221a0ac3fb901c9dafa603dff5e Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Wed, 13 Nov 2024 12:42:44 +0200 Subject: [PATCH 07/12] Stylize the while loop in SegmentGatherer.run --- pytroll_collectors/segments.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/pytroll_collectors/segments.py b/pytroll_collectors/segments.py index 896a6207..fabfab87 100644 --- a/pytroll_collectors/segments.py +++ b/pytroll_collectors/segments.py @@ -716,10 +716,7 @@ def run(self): signal.signal(signal.SIGTERM, self._handle_sigterm) self._loop = True - while self._loop: - if self._sigterm_caught and not self.slots: - self.stop() - + while self._keep_running(): self.triage_slots() # Check listener for new messages @@ -728,8 +725,7 @@ def run(self): except AttributeError: msg = self._listener.queue.get(True, 1) except KeyboardInterrupt: - self.stop() - continue + break except Empty: continue @@ -739,11 +735,17 @@ def run(self): continue logger.info("New message received: %s", str(msg)) self.process(msg) + self.stop() def _handle_sigterm(self, signum, frame): logging.info("Caught SIGTERM, shutting down when all collections are finished.") self._sigterm_caught = True + def _keep_running(self): + if not self._loop or (self._sigterm_caught and not self.slots): + return False + return True + def triage_slots(self): """Check if there are slots ready for publication.""" slots = self.slots.copy() From 962e875596646b436b322cc53d736a25f2d8b28c Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Wed, 13 Nov 2024 13:12:02 +0200 Subject: [PATCH 08/12] Patch the trigger creation --- pytroll_collectors/tests/test_geographic_gatherer.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pytroll_collectors/tests/test_geographic_gatherer.py b/pytroll_collectors/tests/test_geographic_gatherer.py index 915ed8fe..1616ebe9 100644 --- a/pytroll_collectors/tests/test_geographic_gatherer.py +++ b/pytroll_collectors/tests/test_geographic_gatherer.py @@ -543,7 +543,9 @@ def test_sigterm(tmp_config_file, tmp_config_parser): opts = arg_parse(["-c", "minimal_config", "-p", "40000", "-n", "false", "-i", "localhost:12345", str(tmp_config_file)]) - gatherer = GeographicGatherer(opts) + # We don't need the triggers here. They also interfere with completing the test (the test never exits) + with patch("pytroll_collectors.geographic_gatherer.TriggerFactory.create"): + gatherer = GeographicGatherer(opts) proc = Process(target=gatherer.run) proc.start() time.sleep(1) From ce5a0d78c019a0cac04f099432c11622524f0f21 Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Wed, 13 Nov 2024 13:20:16 +0200 Subject: [PATCH 09/12] Refactor _keep_running() --- pytroll_collectors/geographic_gatherer.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/pytroll_collectors/geographic_gatherer.py b/pytroll_collectors/geographic_gatherer.py index 0dcebef2..f446ac6f 100644 --- a/pytroll_collectors/geographic_gatherer.py +++ b/pytroll_collectors/geographic_gatherer.py @@ -130,13 +130,16 @@ def _handle_sigterm(self, signum, frame): def _keep_running(self): keep_running = True if self._sigterm_caught: - keep_running = False - for t in self.triggers: - for c in t.collectors: - if c.granules: - keep_running = True + keep_running = self._trigger_collectors_have_granules() return keep_running + def _trigger_collectors_have_granules(self): + for t in self.triggers: + for c in t.collectors: + if c.granules: + return True + return False + def stop(self): """Stop the gatherer.""" logger.info('Ending the gathering of granules...') From e866900bce5e5075a7d616eaefada426bc9f742a Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Wed, 13 Nov 2024 14:21:58 +0200 Subject: [PATCH 10/12] Add SIGTERM test for ongoing collection --- .../tests/test_geographic_gatherer.py | 60 +++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/pytroll_collectors/tests/test_geographic_gatherer.py b/pytroll_collectors/tests/test_geographic_gatherer.py index 1616ebe9..44548f4e 100644 --- a/pytroll_collectors/tests/test_geographic_gatherer.py +++ b/pytroll_collectors/tests/test_geographic_gatherer.py @@ -553,3 +553,63 @@ def test_sigterm(tmp_config_file, tmp_config_parser): proc.join() assert proc.exitcode == 0 + + +def test_sigterm_with_collection(tmp_config_file, tmp_config_parser): + """Test that SIGTERM signal is handled when there is collection ongoing.""" + import os + import signal + import time + from multiprocessing import Process + + from pytroll_collectors.geographic_gatherer import GeographicGatherer + + with open(tmp_config_file, mode="w") as fp: + tmp_config_parser.write(fp) + + opts = arg_parse(["-c", "posttroll_section", "-p", "40000", "-n", "false", "-i", "localhost:12345", + str(tmp_config_file)]) + # Use a fake trigger that initially sets some granules and after a while clears them + with patch("pytroll_collectors.geographic_gatherer.PostTrollTrigger", + new=FakeTriggerWithGranules): + gatherer = GeographicGatherer(opts) + proc = Process(target=gatherer.run) + proc.start() + time.sleep(1) + os.kill(proc.pid, signal.SIGTERM) + proc.join() + + assert proc.exitcode == 0 + + +class FakeTriggerWithGranules: + """Fake trigger class used in testing SIGTERM handling. + + At creation, adds "foo" to collector granules. When is_alive() is called the second time, it clears the granules. + """ + + def __init__(self, collectors, *args, **kwargs): + """Initialize the trigger class.""" + self.collectors = collectors + for col in self.collectors: + col.granules.append("foo") + self._args = args + self._kwargs = kwargs + self._counter = 0 + + def is_alive(self): + """Return True for alive thread.""" + if self._counter > 0: + # On the second call clear the granules + for col in self.collectors: + col.granules = [] + self._counter += 1 + return True + + def start(self): + """Start the trigger.""" + pass + + def stop(self): + """Stop the trigger.""" + pass From 3884f6e9f3793d60a25d14b8f385eff79c8cf06c Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Wed, 13 Nov 2024 15:01:36 +0200 Subject: [PATCH 11/12] Update changelog --- CHANGELOG.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index fc26c94b..49cac7f2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,21 @@ +############################################################################### +## Version 0.17.0 (2024/11/13) + +### Issues Closed + +* [Issue 149](https://github.com/pytroll/pytroll-collectors/issues/149) - trollstalker seems to do nothing in current main + +In this release 1 issue was closed. + +### Pull Requests Merged + +#### Features added + +* [PR 157](https://github.com/pytroll/pytroll-collectors/pull/157) - Add SIGTERM handling to geographic gatherer +* [PR 156](https://github.com/pytroll/pytroll-collectors/pull/156) - Add SIGTERM handling to segment gatherer + +In this release 2 pull requests were closed. + ############################################################################### ## Version 0.16.0 (2024/02/16) From afb6cae608fbae061d72002ce81ec17cf8a778f6 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 18 Nov 2024 18:34:15 +0000 Subject: [PATCH 12/12] Bump codecov/codecov-action from 4 to 5 Bumps [codecov/codecov-action](https://github.com/codecov/codecov-action) from 4 to 5. - [Release notes](https://github.com/codecov/codecov-action/releases) - [Changelog](https://github.com/codecov/codecov-action/blob/main/CHANGELOG.md) - [Commits](https://github.com/codecov/codecov-action/compare/v4...v5) --- updated-dependencies: - dependency-name: codecov/codecov-action dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] --- .github/workflows/ci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 591eaf24..416de4b3 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -62,7 +62,7 @@ jobs: pytest --cov=pytroll_collectors pytroll_collectors/tests --cov-report=xml - name: Upload unittest coverage to Codecov - uses: codecov/codecov-action@v4 + uses: codecov/codecov-action@v5 with: flags: unittests file: ./coverage.xml