From 170e68f6988fc131d81994a2045b9d9db73baa73 Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Mon, 11 Nov 2024 12:56:01 +0200 Subject: [PATCH 1/4] Add SIGTERM handling to segment gatherer --- pytroll_collectors/segments.py | 12 +++++- pytroll_collectors/tests/test_segments.py | 51 +++++++++++++++++++++++ 2 files changed, 62 insertions(+), 1 deletion(-) diff --git a/pytroll_collectors/segments.py b/pytroll_collectors/segments.py index a9b68ffa..896a6207 100644 --- a/pytroll_collectors/segments.py +++ b/pytroll_collectors/segments.py @@ -37,10 +37,11 @@ import datetime as dt import logging.handlers +import os +import signal from abc import ABCMeta, abstractmethod from collections import OrderedDict from enum import Enum -import os import trollsift from posttroll import message as pmessage @@ -610,6 +611,7 @@ def __init__(self, config): self._group_by_minutes = self._config.get('group_by_minutes', None) self._loop = False + self._sigterm_caught = False self._providing_server = self._config.get('providing_server') self._is_first_message_after_start = True @@ -711,9 +713,13 @@ def _collect_publisher_config(self): def run(self): """Run SegmentGatherer.""" self._setup_messaging() + signal.signal(signal.SIGTERM, self._handle_sigterm) self._loop = True while self._loop: + if self._sigterm_caught and not self.slots: + self.stop() + self.triage_slots() # Check listener for new messages @@ -734,6 +740,10 @@ def run(self): logger.info("New message received: %s", str(msg)) self.process(msg) + def _handle_sigterm(self, signum, frame): + logging.info("Caught SIGTERM, shutting down when all collections are finished.") + self._sigterm_caught = True + def triage_slots(self): """Check if there are slots ready for publication.""" slots = self.slots.copy() diff --git a/pytroll_collectors/tests/test_segments.py b/pytroll_collectors/tests/test_segments.py index c56874c9..de10f06d 100644 --- a/pytroll_collectors/tests/test_segments.py +++ b/pytroll_collectors/tests/test_segments.py @@ -747,6 +747,57 @@ def test_listener_use_first_nameserver(self): self.msg0deg._setup_listener() assert_messaging(None, None, None, 'localhost', None, ListenerContainer) + def test_sigterm(self): + """Test that SIGTERM signal is handled.""" + import os + import signal + import time + from multiprocessing import Process + + with patch('pytroll_collectors.segments.ListenerContainer'): + col = SegmentGatherer(CONFIG_SINGLE) + proc = Process(target=col.run) + proc.start() + time.sleep(1) + os.kill(proc.pid, signal.SIGTERM) + proc.join() + + assert proc.exitcode == 0 + + def test_sigterm_nonempty_slots(self): + """Test that SIGTERM signal is handled properly when there are active slots present.""" + import os + import signal + import time + from multiprocessing import Process + + with patch('pytroll_collectors.segments.ListenerContainer'): + with patch('pytroll_collectors.segments.SegmentGatherer.triage_slots', + new=_fake_triage_slots): + col = SegmentGatherer(CONFIG_SINGLE) + proc = Process(target=col.run) + proc.start() + time.sleep(1) + tic = time.time() + os.kill(proc.pid, signal.SIGTERM) + proc.join() + + assert proc.exitcode == 0 + # Triage after the kill signal takes 1 s + assert time.time() - tic > 1. + + +def _fake_triage_slots(self): + """Fake the triage_slots() method. + + The fake triage adds a new slot if SIGTERM has not been caught, and removes it when the signal comes. + """ + import time + self.slots["foo"] = "bar" + if self._sigterm_caught: + del self.slots["foo"] + time.sleep(1) + def _get_message_from_metadata_and_patterns(mda, patterns): fake_message = FakeMessage(mda) From 8fcf29f59af530eb5e04e6392ca667dba4218927 Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Tue, 12 Nov 2024 15:24:15 +0200 Subject: [PATCH 2/4] Fix fsspec tar and zip filesystem tests --- .../tests/test_fsspec_to_message.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pytroll_collectors/tests/test_fsspec_to_message.py b/pytroll_collectors/tests/test_fsspec_to_message.py index 0444fbd2..8e3edb3b 100644 --- a/pytroll_collectors/tests/test_fsspec_to_message.py +++ b/pytroll_collectors/tests/test_fsspec_to_message.py @@ -121,8 +121,8 @@ def create_files_to_pack(self, tmp_path): @pytest.mark.parametrize( ("packing", "create_packfile", "filesystem_class"), [ - ("tar", create_tar_file, "fsspec.implementations.tar.TarFileSystem"), - ("zip", create_zip_file, "fsspec.implementations.zip.ZipFileSystem"), + ("tar", create_tar_file, "fsspec.implementations.tar:TarFileSystem"), + ("zip", create_zip_file, "fsspec.implementations.zip:ZipFileSystem"), ] ) def test_pack_file_extract(self, packing, create_packfile, filesystem_class, tmp_path): @@ -153,8 +153,8 @@ def test_pack_file_extract(self, packing, create_packfile, filesystem_class, tmp @pytest.mark.parametrize( ("packing", "create_packfile", "filesystem_class"), [ - ("tar", create_tar_file, "fsspec.implementations.tar.TarFileSystem"), - ("zip", create_zip_file, "fsspec.implementations.zip.ZipFileSystem"), + ("tar", create_tar_file, "fsspec.implementations.tar:TarFileSystem"), + ("zip", create_zip_file, "fsspec.implementations.zip:ZipFileSystem"), ] ) def test_pack_local_file_extract(self, packing, create_packfile, filesystem_class, tmp_path): @@ -184,8 +184,8 @@ def test_pack_local_file_extract(self, packing, create_packfile, filesystem_clas @pytest.mark.parametrize( ("packing", "create_packfile", "filesystem_class"), [ - ("tar", create_tar_file, "fsspec.implementations.tar.TarFileSystem"), - ("zip", create_zip_file, "fsspec.implementations.zip.ZipFileSystem"), + ("tar", create_tar_file, "fsspec.implementations.tar:TarFileSystem"), + ("zip", create_zip_file, "fsspec.implementations.zip:ZipFileSystem"), ] ) def test_pack_local_file_extract_filesystem(self, packing, create_packfile, filesystem_class, tmp_path): @@ -211,8 +211,8 @@ def check_filesystem_is_understood_by_fsspec(self, filesystem_info): @pytest.mark.parametrize( ("packing", "create_packfile", "filesystem_class"), [ - ("tar", create_tar_file, "fsspec.implementations.tar.TarFileSystem"), - ("zip", create_zip_file, "fsspec.implementations.zip.ZipFileSystem"), + ("tar", create_tar_file, "fsspec.implementations.tar:TarFileSystem"), + ("zip", create_zip_file, "fsspec.implementations.zip:ZipFileSystem"), ] ) def test_pack_local_file_extract_with_custom_options(self, packing, create_packfile, filesystem_class, tmp_path): From fd8dcdf2ec774e562200ec35a301d2fb967ccfa1 Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Tue, 12 Nov 2024 15:56:12 +0200 Subject: [PATCH 3/4] Add relative_files = True for Coveralls --- setup.cfg | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.cfg b/setup.cfg index 2bf78ea0..4d02abc0 100644 --- a/setup.cfg +++ b/setup.cfg @@ -19,3 +19,4 @@ tag_prefix = v omit = pytroll_collectors/_version.py versioneer.py +relative_files = True From 340937898df3a221a0ac3fb901c9dafa603dff5e Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Wed, 13 Nov 2024 12:42:44 +0200 Subject: [PATCH 4/4] Stylize the while loop in SegmentGatherer.run --- pytroll_collectors/segments.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/pytroll_collectors/segments.py b/pytroll_collectors/segments.py index 896a6207..fabfab87 100644 --- a/pytroll_collectors/segments.py +++ b/pytroll_collectors/segments.py @@ -716,10 +716,7 @@ def run(self): signal.signal(signal.SIGTERM, self._handle_sigterm) self._loop = True - while self._loop: - if self._sigterm_caught and not self.slots: - self.stop() - + while self._keep_running(): self.triage_slots() # Check listener for new messages @@ -728,8 +725,7 @@ def run(self): except AttributeError: msg = self._listener.queue.get(True, 1) except KeyboardInterrupt: - self.stop() - continue + break except Empty: continue @@ -739,11 +735,17 @@ def run(self): continue logger.info("New message received: %s", str(msg)) self.process(msg) + self.stop() def _handle_sigterm(self, signum, frame): logging.info("Caught SIGTERM, shutting down when all collections are finished.") self._sigterm_caught = True + def _keep_running(self): + if not self._loop or (self._sigterm_caught and not self.slots): + return False + return True + def triage_slots(self): """Check if there are slots ready for publication.""" slots = self.slots.copy()