From b4c1c48ea6438b20500a9e390f07d44eb6a1e9ca Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Tue, 12 Nov 2024 10:31:12 +0200 Subject: [PATCH] Add SIGTERM handling to geographic gatherer --- pytroll_collectors/geographic_gatherer.py | 17 +++++++++++++ .../tests/test_geographic_gatherer.py | 24 +++++++++++++++++++ 2 files changed, 41 insertions(+) diff --git a/pytroll_collectors/geographic_gatherer.py b/pytroll_collectors/geographic_gatherer.py index 3312d227..672e025a 100644 --- a/pytroll_collectors/geographic_gatherer.py +++ b/pytroll_collectors/geographic_gatherer.py @@ -25,6 +25,7 @@ """Geographic segment gathering.""" import logging +import signal import time from configparser import NoOptionError, ConfigParser @@ -56,6 +57,8 @@ def __init__(self, opts): self.triggers = [] self.return_status = 0 + self._sigterm_caught = False + self._clean_config() self._setup_publisher() try: @@ -103,8 +106,10 @@ def _setup_triggers(self): def run(self): """Run granule triggers.""" + signal.signal(signal.SIGTERM, self._handle_sigterm) try: while True: + self._check_sigterm() time.sleep(1) for trigger in self.triggers: if not trigger.is_alive(): @@ -119,6 +124,18 @@ def run(self): return self.return_status + def _handle_sigterm(self, signum, frame): + logger.info("Caught SIGTERM, shutting down when all collections are finished.") + self._sigterm_caught = True + + def _check_sigterm(self): + if self._sigterm_caught: + for t in self.triggers: + for c in t.collectors: + if c.granules: + return + raise KeyboardInterrupt("No ongoing collections.") + def stop(self): """Stop the gatherer.""" logger.info('Ending publication the gathering of granules...') diff --git a/pytroll_collectors/tests/test_geographic_gatherer.py b/pytroll_collectors/tests/test_geographic_gatherer.py index 077d2bd2..915ed8fe 100644 --- a/pytroll_collectors/tests/test_geographic_gatherer.py +++ b/pytroll_collectors/tests/test_geographic_gatherer.py @@ -527,3 +527,27 @@ def test_full_pass(self, sub_factory, monkeypatch, tmp_tle): assert snd_msg.data == expected_msg.data finally: gatherer.stop() + + +def test_sigterm(tmp_config_file, tmp_config_parser): + """Test that SIGTERM signal is handled.""" + import os + import signal + import time + from multiprocessing import Process + + from pytroll_collectors.geographic_gatherer import GeographicGatherer + + with open(tmp_config_file, mode="w") as fp: + tmp_config_parser.write(fp) + + opts = arg_parse(["-c", "minimal_config", "-p", "40000", "-n", "false", "-i", "localhost:12345", + str(tmp_config_file)]) + gatherer = GeographicGatherer(opts) + proc = Process(target=gatherer.run) + proc.start() + time.sleep(1) + os.kill(proc.pid, signal.SIGTERM) + proc.join() + + assert proc.exitcode == 0