From b4c1c48ea6438b20500a9e390f07d44eb6a1e9ca Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Tue, 12 Nov 2024 10:31:12 +0200 Subject: [PATCH 1/6] Add SIGTERM handling to geographic gatherer --- pytroll_collectors/geographic_gatherer.py | 17 +++++++++++++ .../tests/test_geographic_gatherer.py | 24 +++++++++++++++++++ 2 files changed, 41 insertions(+) diff --git a/pytroll_collectors/geographic_gatherer.py b/pytroll_collectors/geographic_gatherer.py index 3312d227..672e025a 100644 --- a/pytroll_collectors/geographic_gatherer.py +++ b/pytroll_collectors/geographic_gatherer.py @@ -25,6 +25,7 @@ """Geographic segment gathering.""" import logging +import signal import time from configparser import NoOptionError, ConfigParser @@ -56,6 +57,8 @@ def __init__(self, opts): self.triggers = [] self.return_status = 0 + self._sigterm_caught = False + self._clean_config() self._setup_publisher() try: @@ -103,8 +106,10 @@ def _setup_triggers(self): def run(self): """Run granule triggers.""" + signal.signal(signal.SIGTERM, self._handle_sigterm) try: while True: + self._check_sigterm() time.sleep(1) for trigger in self.triggers: if not trigger.is_alive(): @@ -119,6 +124,18 @@ def run(self): return self.return_status + def _handle_sigterm(self, signum, frame): + logger.info("Caught SIGTERM, shutting down when all collections are finished.") + self._sigterm_caught = True + + def _check_sigterm(self): + if self._sigterm_caught: + for t in self.triggers: + for c in t.collectors: + if c.granules: + return + raise KeyboardInterrupt("No ongoing collections.") + def stop(self): """Stop the gatherer.""" logger.info('Ending publication the gathering of granules...') diff --git a/pytroll_collectors/tests/test_geographic_gatherer.py b/pytroll_collectors/tests/test_geographic_gatherer.py index 077d2bd2..915ed8fe 100644 --- a/pytroll_collectors/tests/test_geographic_gatherer.py +++ b/pytroll_collectors/tests/test_geographic_gatherer.py @@ -527,3 +527,27 @@ def test_full_pass(self, sub_factory, monkeypatch, tmp_tle): assert snd_msg.data == expected_msg.data finally: gatherer.stop() + + +def test_sigterm(tmp_config_file, tmp_config_parser): + """Test that SIGTERM signal is handled.""" + import os + import signal + import time + from multiprocessing import Process + + from pytroll_collectors.geographic_gatherer import GeographicGatherer + + with open(tmp_config_file, mode="w") as fp: + tmp_config_parser.write(fp) + + opts = arg_parse(["-c", "minimal_config", "-p", "40000", "-n", "false", "-i", "localhost:12345", + str(tmp_config_file)]) + gatherer = GeographicGatherer(opts) + proc = Process(target=gatherer.run) + proc.start() + time.sleep(1) + os.kill(proc.pid, signal.SIGTERM) + proc.join() + + assert proc.exitcode == 0 From b0ff6ab35c625ba8056e05b0df3fedb45abc313b Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Tue, 12 Nov 2024 10:32:16 +0200 Subject: [PATCH 2/6] Clarify log message --- pytroll_collectors/geographic_gatherer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pytroll_collectors/geographic_gatherer.py b/pytroll_collectors/geographic_gatherer.py index 672e025a..f16a810b 100644 --- a/pytroll_collectors/geographic_gatherer.py +++ b/pytroll_collectors/geographic_gatherer.py @@ -138,7 +138,7 @@ def _check_sigterm(self): def stop(self): """Stop the gatherer.""" - logger.info('Ending publication the gathering of granules...') + logger.info('Ending the gathering of granules...') for trigger in self.triggers: trigger.stop() self.publisher.stop() From 43b8cc73039b87da43c6252dd83bdf5c861a9b97 Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Tue, 12 Nov 2024 14:42:01 +0200 Subject: [PATCH 3/6] Make SIGTERM checking return boolean depending on the status --- pytroll_collectors/geographic_gatherer.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pytroll_collectors/geographic_gatherer.py b/pytroll_collectors/geographic_gatherer.py index f16a810b..0dcebef2 100644 --- a/pytroll_collectors/geographic_gatherer.py +++ b/pytroll_collectors/geographic_gatherer.py @@ -108,8 +108,7 @@ def run(self): """Run granule triggers.""" signal.signal(signal.SIGTERM, self._handle_sigterm) try: - while True: - self._check_sigterm() + while self._keep_running(): time.sleep(1) for trigger in self.triggers: if not trigger.is_alive(): @@ -128,13 +127,15 @@ def _handle_sigterm(self, signum, frame): logger.info("Caught SIGTERM, shutting down when all collections are finished.") self._sigterm_caught = True - def _check_sigterm(self): + def _keep_running(self): + keep_running = True if self._sigterm_caught: + keep_running = False for t in self.triggers: for c in t.collectors: if c.granules: - return - raise KeyboardInterrupt("No ongoing collections.") + keep_running = True + return keep_running def stop(self): """Stop the gatherer.""" From 962e875596646b436b322cc53d736a25f2d8b28c Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Wed, 13 Nov 2024 13:12:02 +0200 Subject: [PATCH 4/6] Patch the trigger creation --- pytroll_collectors/tests/test_geographic_gatherer.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pytroll_collectors/tests/test_geographic_gatherer.py b/pytroll_collectors/tests/test_geographic_gatherer.py index 915ed8fe..1616ebe9 100644 --- a/pytroll_collectors/tests/test_geographic_gatherer.py +++ b/pytroll_collectors/tests/test_geographic_gatherer.py @@ -543,7 +543,9 @@ def test_sigterm(tmp_config_file, tmp_config_parser): opts = arg_parse(["-c", "minimal_config", "-p", "40000", "-n", "false", "-i", "localhost:12345", str(tmp_config_file)]) - gatherer = GeographicGatherer(opts) + # We don't need the triggers here. They also interfere with completing the test (the test never exits) + with patch("pytroll_collectors.geographic_gatherer.TriggerFactory.create"): + gatherer = GeographicGatherer(opts) proc = Process(target=gatherer.run) proc.start() time.sleep(1) From ce5a0d78c019a0cac04f099432c11622524f0f21 Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Wed, 13 Nov 2024 13:20:16 +0200 Subject: [PATCH 5/6] Refactor _keep_running() --- pytroll_collectors/geographic_gatherer.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/pytroll_collectors/geographic_gatherer.py b/pytroll_collectors/geographic_gatherer.py index 0dcebef2..f446ac6f 100644 --- a/pytroll_collectors/geographic_gatherer.py +++ b/pytroll_collectors/geographic_gatherer.py @@ -130,13 +130,16 @@ def _handle_sigterm(self, signum, frame): def _keep_running(self): keep_running = True if self._sigterm_caught: - keep_running = False - for t in self.triggers: - for c in t.collectors: - if c.granules: - keep_running = True + keep_running = self._trigger_collectors_have_granules() return keep_running + def _trigger_collectors_have_granules(self): + for t in self.triggers: + for c in t.collectors: + if c.granules: + return True + return False + def stop(self): """Stop the gatherer.""" logger.info('Ending the gathering of granules...') From e866900bce5e5075a7d616eaefada426bc9f742a Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Wed, 13 Nov 2024 14:21:58 +0200 Subject: [PATCH 6/6] Add SIGTERM test for ongoing collection --- .../tests/test_geographic_gatherer.py | 60 +++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/pytroll_collectors/tests/test_geographic_gatherer.py b/pytroll_collectors/tests/test_geographic_gatherer.py index 1616ebe9..44548f4e 100644 --- a/pytroll_collectors/tests/test_geographic_gatherer.py +++ b/pytroll_collectors/tests/test_geographic_gatherer.py @@ -553,3 +553,63 @@ def test_sigterm(tmp_config_file, tmp_config_parser): proc.join() assert proc.exitcode == 0 + + +def test_sigterm_with_collection(tmp_config_file, tmp_config_parser): + """Test that SIGTERM signal is handled when there is collection ongoing.""" + import os + import signal + import time + from multiprocessing import Process + + from pytroll_collectors.geographic_gatherer import GeographicGatherer + + with open(tmp_config_file, mode="w") as fp: + tmp_config_parser.write(fp) + + opts = arg_parse(["-c", "posttroll_section", "-p", "40000", "-n", "false", "-i", "localhost:12345", + str(tmp_config_file)]) + # Use a fake trigger that initially sets some granules and after a while clears them + with patch("pytroll_collectors.geographic_gatherer.PostTrollTrigger", + new=FakeTriggerWithGranules): + gatherer = GeographicGatherer(opts) + proc = Process(target=gatherer.run) + proc.start() + time.sleep(1) + os.kill(proc.pid, signal.SIGTERM) + proc.join() + + assert proc.exitcode == 0 + + +class FakeTriggerWithGranules: + """Fake trigger class used in testing SIGTERM handling. + + At creation, adds "foo" to collector granules. When is_alive() is called the second time, it clears the granules. + """ + + def __init__(self, collectors, *args, **kwargs): + """Initialize the trigger class.""" + self.collectors = collectors + for col in self.collectors: + col.granules.append("foo") + self._args = args + self._kwargs = kwargs + self._counter = 0 + + def is_alive(self): + """Return True for alive thread.""" + if self._counter > 0: + # On the second call clear the granules + for col in self.collectors: + col.granules = [] + self._counter += 1 + return True + + def start(self): + """Start the trigger.""" + pass + + def stop(self): + """Stop the trigger.""" + pass