Skip to content

Commit

Permalink
Add SIGTERM handling to segment gatherer
Browse files Browse the repository at this point in the history
  • Loading branch information
lahtinep committed Nov 11, 2024
1 parent acc2631 commit 170e68f
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 1 deletion.
12 changes: 11 additions & 1 deletion pytroll_collectors/segments.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@

import datetime as dt
import logging.handlers
import os
import signal
from abc import ABCMeta, abstractmethod
from collections import OrderedDict
from enum import Enum
import os

import trollsift
from posttroll import message as pmessage
Expand Down Expand Up @@ -610,6 +611,7 @@ def __init__(self, config):
self._group_by_minutes = self._config.get('group_by_minutes', None)

self._loop = False
self._sigterm_caught = False
self._providing_server = self._config.get('providing_server')
self._is_first_message_after_start = True

Expand Down Expand Up @@ -711,9 +713,13 @@ def _collect_publisher_config(self):
def run(self):
"""Run SegmentGatherer."""
self._setup_messaging()
signal.signal(signal.SIGTERM, self._handle_sigterm)

self._loop = True
while self._loop:
if self._sigterm_caught and not self.slots:
self.stop()

self.triage_slots()

# Check listener for new messages
Expand All @@ -734,6 +740,10 @@ def run(self):
logger.info("New message received: %s", str(msg))
self.process(msg)

def _handle_sigterm(self, signum, frame):
logging.info("Caught SIGTERM, shutting down when all collections are finished.")
self._sigterm_caught = True

def triage_slots(self):
"""Check if there are slots ready for publication."""
slots = self.slots.copy()
Expand Down
51 changes: 51 additions & 0 deletions pytroll_collectors/tests/test_segments.py
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,57 @@ def test_listener_use_first_nameserver(self):
self.msg0deg._setup_listener()
assert_messaging(None, None, None, 'localhost', None, ListenerContainer)

def test_sigterm(self):
"""Test that SIGTERM signal is handled."""
import os
import signal
import time
from multiprocessing import Process

with patch('pytroll_collectors.segments.ListenerContainer'):
col = SegmentGatherer(CONFIG_SINGLE)
proc = Process(target=col.run)
proc.start()
time.sleep(1)
os.kill(proc.pid, signal.SIGTERM)
proc.join()

assert proc.exitcode == 0

def test_sigterm_nonempty_slots(self):
"""Test that SIGTERM signal is handled properly when there are active slots present."""
import os
import signal
import time
from multiprocessing import Process

with patch('pytroll_collectors.segments.ListenerContainer'):
with patch('pytroll_collectors.segments.SegmentGatherer.triage_slots',
new=_fake_triage_slots):
col = SegmentGatherer(CONFIG_SINGLE)
proc = Process(target=col.run)
proc.start()
time.sleep(1)
tic = time.time()
os.kill(proc.pid, signal.SIGTERM)
proc.join()

assert proc.exitcode == 0
# Triage after the kill signal takes 1 s
assert time.time() - tic > 1.


def _fake_triage_slots(self):
"""Fake the triage_slots() method.
The fake triage adds a new slot if SIGTERM has not been caught, and removes it when the signal comes.
"""
import time
self.slots["foo"] = "bar"
if self._sigterm_caught:
del self.slots["foo"]
time.sleep(1)


def _get_message_from_metadata_and_patterns(mda, patterns):
fake_message = FakeMessage(mda)
Expand Down

0 comments on commit 170e68f

Please sign in to comment.