Skip to content

Commit

Permalink
Merge branch 'main' into fix_sensor_list_in_topic
Browse files Browse the repository at this point in the history
  • Loading branch information
ninahakansson committed Dec 19, 2024
2 parents f9d9b11 + 292e745 commit 49d5fe0
Show file tree
Hide file tree
Showing 8 changed files with 204 additions and 15 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ jobs:
pytest --cov=pytroll_collectors pytroll_collectors/tests --cov-report=xml
- name: Upload unittest coverage to Codecov
uses: codecov/codecov-action@v4
uses: codecov/codecov-action@v5
with:
flags: unittests
file: ./coverage.xml
Expand Down
18 changes: 18 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
###############################################################################
## Version 0.17.0 (2024/11/13)

### Issues Closed

* [Issue 149](https://github.com/pytroll/pytroll-collectors/issues/149) - trollstalker seems to do nothing in current main

In this release 1 issue was closed.

### Pull Requests Merged

#### Features added

* [PR 157](https://github.com/pytroll/pytroll-collectors/pull/157) - Add SIGTERM handling to geographic gatherer
* [PR 156](https://github.com/pytroll/pytroll-collectors/pull/156) - Add SIGTERM handling to segment gatherer

In this release 2 pull requests were closed.

###############################################################################
## Version 0.16.0 (2024/02/16)

Expand Down
25 changes: 23 additions & 2 deletions pytroll_collectors/geographic_gatherer.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"""Geographic segment gathering."""

import logging
import signal
import time

from configparser import NoOptionError, ConfigParser
Expand Down Expand Up @@ -56,6 +57,8 @@ def __init__(self, opts):
self.triggers = []
self.return_status = 0

self._sigterm_caught = False

self._clean_config()
self._setup_publisher()
try:
Expand Down Expand Up @@ -103,8 +106,9 @@ def _setup_triggers(self):

def run(self):
"""Run granule triggers."""
signal.signal(signal.SIGTERM, self._handle_sigterm)
try:
while True:
while self._keep_running():
time.sleep(1)
for trigger in self.triggers:
if not trigger.is_alive():
Expand All @@ -119,9 +123,26 @@ def run(self):

return self.return_status

def _handle_sigterm(self, signum, frame):
logger.info("Caught SIGTERM, shutting down when all collections are finished.")
self._sigterm_caught = True

def _keep_running(self):
keep_running = True
if self._sigterm_caught:
keep_running = self._trigger_collectors_have_granules()
return keep_running

def _trigger_collectors_have_granules(self):
for t in self.triggers:
for c in t.collectors:
if c.granules:
return True
return False

def stop(self):
"""Stop the gatherer."""
logger.info('Ending publication the gathering of granules...')
logger.info('Ending the gathering of granules...')
for trigger in self.triggers:
trigger.stop()
self.publisher.stop()
Expand Down
20 changes: 16 additions & 4 deletions pytroll_collectors/segments.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@

import datetime as dt
import logging.handlers
import os
import signal
from abc import ABCMeta, abstractmethod
from collections import OrderedDict
from enum import Enum
import os

import trollsift
from posttroll import message as pmessage
Expand Down Expand Up @@ -610,6 +611,7 @@ def __init__(self, config):
self._group_by_minutes = self._config.get('group_by_minutes', None)

self._loop = False
self._sigterm_caught = False
self._providing_server = self._config.get('providing_server')
self._is_first_message_after_start = True

Expand Down Expand Up @@ -711,9 +713,10 @@ def _collect_publisher_config(self):
def run(self):
"""Run SegmentGatherer."""
self._setup_messaging()
signal.signal(signal.SIGTERM, self._handle_sigterm)

self._loop = True
while self._loop:
while self._keep_running():
self.triage_slots()

# Check listener for new messages
Expand All @@ -722,8 +725,7 @@ def run(self):
except AttributeError:
msg = self._listener.queue.get(True, 1)
except KeyboardInterrupt:
self.stop()
continue
break
except Empty:
continue

Expand All @@ -733,6 +735,16 @@ def run(self):
continue
logger.info("New message received: %s", str(msg))
self.process(msg)
self.stop()

def _handle_sigterm(self, signum, frame):
logging.info("Caught SIGTERM, shutting down when all collections are finished.")
self._sigterm_caught = True

def _keep_running(self):
if not self._loop or (self._sigterm_caught and not self.slots):
return False
return True

def triage_slots(self):
"""Check if there are slots ready for publication."""
Expand Down
16 changes: 8 additions & 8 deletions pytroll_collectors/tests/test_fsspec_to_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ def create_files_to_pack(self, tmp_path):
@pytest.mark.parametrize(
("packing", "create_packfile", "filesystem_class"),
[
("tar", create_tar_file, "fsspec.implementations.tar.TarFileSystem"),
("zip", create_zip_file, "fsspec.implementations.zip.ZipFileSystem"),
("tar", create_tar_file, "fsspec.implementations.tar:TarFileSystem"),
("zip", create_zip_file, "fsspec.implementations.zip:ZipFileSystem"),
]
)
def test_pack_file_extract(self, packing, create_packfile, filesystem_class, tmp_path):
Expand Down Expand Up @@ -153,8 +153,8 @@ def test_pack_file_extract(self, packing, create_packfile, filesystem_class, tmp
@pytest.mark.parametrize(
("packing", "create_packfile", "filesystem_class"),
[
("tar", create_tar_file, "fsspec.implementations.tar.TarFileSystem"),
("zip", create_zip_file, "fsspec.implementations.zip.ZipFileSystem"),
("tar", create_tar_file, "fsspec.implementations.tar:TarFileSystem"),
("zip", create_zip_file, "fsspec.implementations.zip:ZipFileSystem"),
]
)
def test_pack_local_file_extract(self, packing, create_packfile, filesystem_class, tmp_path):
Expand Down Expand Up @@ -184,8 +184,8 @@ def test_pack_local_file_extract(self, packing, create_packfile, filesystem_clas
@pytest.mark.parametrize(
("packing", "create_packfile", "filesystem_class"),
[
("tar", create_tar_file, "fsspec.implementations.tar.TarFileSystem"),
("zip", create_zip_file, "fsspec.implementations.zip.ZipFileSystem"),
("tar", create_tar_file, "fsspec.implementations.tar:TarFileSystem"),
("zip", create_zip_file, "fsspec.implementations.zip:ZipFileSystem"),
]
)
def test_pack_local_file_extract_filesystem(self, packing, create_packfile, filesystem_class, tmp_path):
Expand All @@ -211,8 +211,8 @@ def check_filesystem_is_understood_by_fsspec(self, filesystem_info):
@pytest.mark.parametrize(
("packing", "create_packfile", "filesystem_class"),
[
("tar", create_tar_file, "fsspec.implementations.tar.TarFileSystem"),
("zip", create_zip_file, "fsspec.implementations.zip.ZipFileSystem"),
("tar", create_tar_file, "fsspec.implementations.tar:TarFileSystem"),
("zip", create_zip_file, "fsspec.implementations.zip:ZipFileSystem"),
]
)
def test_pack_local_file_extract_with_custom_options(self, packing, create_packfile, filesystem_class, tmp_path):
Expand Down
86 changes: 86 additions & 0 deletions pytroll_collectors/tests/test_geographic_gatherer.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,3 +527,89 @@ def test_full_pass(self, sub_factory, monkeypatch, tmp_tle):
assert snd_msg.data == expected_msg.data
finally:
gatherer.stop()


def test_sigterm(tmp_config_file, tmp_config_parser):
"""Test that SIGTERM signal is handled."""
import os
import signal
import time
from multiprocessing import Process

from pytroll_collectors.geographic_gatherer import GeographicGatherer

with open(tmp_config_file, mode="w") as fp:
tmp_config_parser.write(fp)

opts = arg_parse(["-c", "minimal_config", "-p", "40000", "-n", "false", "-i", "localhost:12345",
str(tmp_config_file)])
# We don't need the triggers here. They also interfere with completing the test (the test never exits)
with patch("pytroll_collectors.geographic_gatherer.TriggerFactory.create"):
gatherer = GeographicGatherer(opts)
proc = Process(target=gatherer.run)
proc.start()
time.sleep(1)
os.kill(proc.pid, signal.SIGTERM)
proc.join()

assert proc.exitcode == 0


def test_sigterm_with_collection(tmp_config_file, tmp_config_parser):
"""Test that SIGTERM signal is handled when there is collection ongoing."""
import os
import signal
import time
from multiprocessing import Process

from pytroll_collectors.geographic_gatherer import GeographicGatherer

with open(tmp_config_file, mode="w") as fp:
tmp_config_parser.write(fp)

opts = arg_parse(["-c", "posttroll_section", "-p", "40000", "-n", "false", "-i", "localhost:12345",
str(tmp_config_file)])
# Use a fake trigger that initially sets some granules and after a while clears them
with patch("pytroll_collectors.geographic_gatherer.PostTrollTrigger",
new=FakeTriggerWithGranules):
gatherer = GeographicGatherer(opts)
proc = Process(target=gatherer.run)
proc.start()
time.sleep(1)
os.kill(proc.pid, signal.SIGTERM)
proc.join()

assert proc.exitcode == 0


class FakeTriggerWithGranules:
"""Fake trigger class used in testing SIGTERM handling.
At creation, adds "foo" to collector granules. When is_alive() is called the second time, it clears the granules.
"""

def __init__(self, collectors, *args, **kwargs):
"""Initialize the trigger class."""
self.collectors = collectors
for col in self.collectors:
col.granules.append("foo")
self._args = args
self._kwargs = kwargs
self._counter = 0

def is_alive(self):
"""Return True for alive thread."""
if self._counter > 0:
# On the second call clear the granules
for col in self.collectors:
col.granules = []
self._counter += 1
return True

def start(self):
"""Start the trigger."""
pass

def stop(self):
"""Stop the trigger."""
pass
51 changes: 51 additions & 0 deletions pytroll_collectors/tests/test_segments.py
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,57 @@ def test_listener_use_first_nameserver(self):
self.msg0deg._setup_listener()
assert_messaging(None, None, None, 'localhost', None, ListenerContainer)

def test_sigterm(self):
"""Test that SIGTERM signal is handled."""
import os
import signal
import time
from multiprocessing import Process

with patch('pytroll_collectors.segments.ListenerContainer'):
col = SegmentGatherer(CONFIG_SINGLE)
proc = Process(target=col.run)
proc.start()
time.sleep(1)
os.kill(proc.pid, signal.SIGTERM)
proc.join()

assert proc.exitcode == 0

def test_sigterm_nonempty_slots(self):
"""Test that SIGTERM signal is handled properly when there are active slots present."""
import os
import signal
import time
from multiprocessing import Process

with patch('pytroll_collectors.segments.ListenerContainer'):
with patch('pytroll_collectors.segments.SegmentGatherer.triage_slots',
new=_fake_triage_slots):
col = SegmentGatherer(CONFIG_SINGLE)
proc = Process(target=col.run)
proc.start()
time.sleep(1)
tic = time.time()
os.kill(proc.pid, signal.SIGTERM)
proc.join()

assert proc.exitcode == 0
# Triage after the kill signal takes 1 s
assert time.time() - tic > 1.


def _fake_triage_slots(self):
"""Fake the triage_slots() method.
The fake triage adds a new slot if SIGTERM has not been caught, and removes it when the signal comes.
"""
import time
self.slots["foo"] = "bar"
if self._sigterm_caught:
del self.slots["foo"]
time.sleep(1)


def _get_message_from_metadata_and_patterns(mda, patterns):
fake_message = FakeMessage(mda)
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ tag_prefix = v
omit =
pytroll_collectors/_version.py
versioneer.py
relative_files = True

0 comments on commit 49d5fe0

Please sign in to comment.