From 645824879a59bef1bbef605efb9eb4506872c4f4 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Fri, 21 May 2021 11:09:44 +0200 Subject: [PATCH 1/6] Improve region collector unit test In preparation for adding a timeout, improve the region collector unit test. --- pytroll_collectors/tests/test_region_collector.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pytroll_collectors/tests/test_region_collector.py b/pytroll_collectors/tests/test_region_collector.py index 9764aa51..63c1965d 100644 --- a/pytroll_collectors/tests/test_region_collector.py +++ b/pytroll_collectors/tests/test_region_collector.py @@ -110,12 +110,14 @@ def test_adjust_timeout(europe, caplog): "uri": "file://alt/0"} alt_europe_collector = RegionCollector( europe, + timeliness=datetime.timedelta(seconds=600), granule_duration=datetime.timedelta(seconds=180)) with caplog.at_level(logging.DEBUG): alt_europe_collector.collect( {**granule_metadata, "start_time": datetime.datetime(2021, 4, 11, 10, 0)}) + assert alt_europe_collector.timeout == datetime.datetime(2021, 4, 11, 10, 28) alt_europe_collector.collect( {**granule_metadata, "start_time": datetime.datetime(2021, 4, 11, 10, 15)}) @@ -123,6 +125,7 @@ def test_adjust_timeout(europe, caplog): {**granule_metadata, "start_time": datetime.datetime(2021, 4, 11, 10, 12)}) assert "Adjusted timeout" in caplog.text + assert alt_europe_collector.timeout == datetime.datetime(2021, 4, 11, 10, 22) @pytest.mark.skip(reason="test never finishes") From 2fc041096bcf83bca4adb9be36d6ac1d781a3e65 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Fri, 21 May 2021 11:17:06 +0200 Subject: [PATCH 2/6] Added unit test to test silence timeout Added a unit test to test that monitor for silence is working as specified. --- .../tests/test_region_collector.py | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/pytroll_collectors/tests/test_region_collector.py b/pytroll_collectors/tests/test_region_collector.py index 63c1965d..894e1e20 100644 --- a/pytroll_collectors/tests/test_region_collector.py +++ b/pytroll_collectors/tests/test_region_collector.py @@ -128,6 +128,36 @@ def test_adjust_timeout(europe, caplog): assert alt_europe_collector.timeout == datetime.datetime(2021, 4, 11, 10, 22) +@unittest.mock.patch("pyorbital.tlefile.urlopen", new=_fakeopen) +def test_silence_timeout(europe, caplog): + """Test that the monitor for silence timeout is working.""" + from pytroll_collectors.region_collector import RegionCollector + granule_metadata = { + "sensor": "avhrr", + "tle_platform_name": "Metop-C", + "uri": "file://alt/0"} + alt_europe_collector = RegionCollector( + europe, + timeliness=datetime.timedelta(seconds=60), + granule_duration=datetime.timedelta(seconds=180), + silence=datetime.timedelta(seconds=300)) + alt_europe_collector.collect( + {**granule_metadata, + "start_time": datetime.datetime(2021, 4, 11, 10, 0)}) + # earliest timeout is due to monitor for silence + assert alt_europe_collector.timeout == datetime.datetime(2021, 4, 11, 10, 5) + alt_europe_collector.collect( + {**granule_metadata, + "start_time": datetime.datetime(2021, 4, 11, 10, 3)}) + # earliest timeout is due to monitor for silence + assert alt_europe_collector.timeout == datetime.datetime(2021, 4, 11, 10, 8) + alt_europe_collector.collect( + {**granule_metadata, + "start_time": datetime.datetime(2021, 4, 11, 10, 15)}) + # earliest timeout is due to duration + timelines + assert alt_europe_collector.timeout == datetime.datetime(2021, 4, 11, 10, 16) + + @pytest.mark.skip(reason="test never finishes") @unittest.mock.patch("pyorbital.tlefile.urlopen", new=_fakeopen) def test_faulty_end_time(europe_collector, caplog): From 72faaa8bea34442fd5700f3fc395ffc9b676ce93 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Fri, 21 May 2021 11:40:51 +0200 Subject: [PATCH 3/6] Add geographic gatherer test using silence Add a test to the geographic gatherer to test that it's initiating the region collector with the silence keyword when such a silence is included in the configuration file. --- .../tests/test_geographic_gatherer.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/pytroll_collectors/tests/test_geographic_gatherer.py b/pytroll_collectors/tests/test_geographic_gatherer.py index 905bd749..d3f0f50e 100644 --- a/pytroll_collectors/tests/test_geographic_gatherer.py +++ b/pytroll_collectors/tests/test_geographic_gatherer.py @@ -79,6 +79,8 @@ def setUp(self): 'publish_topic': '/topic', 'watcher': 'Observer', } + self.config["silence_section"] = {**self.config["minimal_config"], + "silence": 300} self.RegionCollector = self._patch_and_add_cleanup( 'pytroll_collectors.geographic_gatherer.RegionCollector') @@ -185,6 +187,17 @@ def test_init_observer(self): self._watchdog_test( sections, gatherer, self.publisher, self.PostTrollTrigger, self.WatchDogTrigger, self.RegionCollector) + def test_init_silence(self): + """Test initialisation of GeographicGatherer with silence. + + Test that the GeographicGatherer is correctly initiated when monitoring + for silence is included in the configuration. + """ + from pytroll_collectors.geographic_gatherer import GeographicGatherer + sections = ["silence_config"] + opts = FakeOpts(sections) + GeographicGatherer(self.config, opts) + def _watchdog_test(self, sections, gatherer, publisher, PostTrollTrigger, WatchDogTrigger, RegionCollector): # There's one trigger assert len(gatherer.triggers) == 1 From a8f3f68d2e5dd8b763d74f177845b8f92a324307 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Fri, 21 May 2021 14:33:47 +0200 Subject: [PATCH 4/6] Add functionality for geographic gatherer silence Add functionality to monitor for silence in the geographic gatherer. --- pytroll_collectors/region_collector.py | 32 ++++++++++++++--- .../tests/test_geographic_gatherer.py | 4 +-- .../tests/test_region_collector.py | 36 ++++++++++--------- 3 files changed, 49 insertions(+), 23 deletions(-) diff --git a/pytroll_collectors/region_collector.py b/pytroll_collectors/region_collector.py index e1382fa8..a5617c64 100644 --- a/pytroll_collectors/region_collector.py +++ b/pytroll_collectors/region_collector.py @@ -48,8 +48,22 @@ class RegionCollector(object): def __init__(self, region, timeliness=None, - granule_duration=None): - """Initialize the region collector.""" + granule_duration=None, + silence=timedelta(days=9999)): + """Initialize the region collector. + + Args: + region (AreaDefinition): Area for which to collect granules. + timeliness (datetime.timedelta): Timeout after latest expected + granule. This timeout will be reached if the clock time + exceeds the time for the latest expected granule + + granule_duration + timeliness. + granule_duration (datetime.timedelta): Duration for a granule. + This will be used to calculate expected granules and to + estimate the timeliness-timeout (see above). + silence (datetime.timedelta): Regardless of what we expect, abort + collection if nothing has been received for this duration. + """ self.region = region # area def self.granule_times = set() self.granules = [] @@ -58,6 +72,7 @@ def __init__(self, region, self.timeout = None self.granule_duration = granule_duration self.last_file_added = False + self.silence = silence def __call__(self, granule_metadata): """Perform the collection on the granule.""" @@ -146,6 +161,7 @@ def _adjust_timeout(self): self.granule_times) + self.granule_duration + self.timeliness) + silence_timeout = datetime.now() + self.silence except ValueError: logger.error("Calculation of new timeout failed, " "keeping previous timeout.") @@ -155,6 +171,10 @@ def _adjust_timeout(self): if new_timeout < self.timeout: self.timeout = new_timeout logger.info("Adjusted timeout: %s", self.timeout.isoformat()) + elif silence_timeout > self.timeout: + self.timeout = min(new_timeout, silence_timeout) + if silence_timeout > new_timeout: + logger.debug(f"Silence timeout: {self.timeout:%Y-%m-%d %H:%M:%S}") def cleanup(self): """Clear members.""" @@ -200,9 +220,11 @@ def _predict_pass_granules(self, granule_metadata): _get_platform_name(granule_metadata), self.region.description, str(sorted(self.planned_granule_times))) - self.timeout = (max(self.planned_granule_times) + - self.granule_duration + - self.timeliness) + self.timeout = min( + datetime.now() + self.silence, + (max(self.planned_granule_times) + + self.granule_duration + + self.timeliness)) logger.info("Planned timeout for %s: %s", self.region.description, self.timeout.isoformat()) else: diff --git a/pytroll_collectors/tests/test_geographic_gatherer.py b/pytroll_collectors/tests/test_geographic_gatherer.py index d3f0f50e..0619ae38 100644 --- a/pytroll_collectors/tests/test_geographic_gatherer.py +++ b/pytroll_collectors/tests/test_geographic_gatherer.py @@ -194,7 +194,7 @@ def test_init_silence(self): for silence is included in the configuration. """ from pytroll_collectors.geographic_gatherer import GeographicGatherer - sections = ["silence_config"] + sections = ["silence_section"] opts = FakeOpts(sections) GeographicGatherer(self.config, opts) @@ -242,7 +242,7 @@ def test_init_all_sections(self): assert len(gatherer.triggers) == num_sections # See that the trigger classes have been accessed the correct times - assert self.PostTrollTrigger.call_count == 2 + assert self.PostTrollTrigger.call_count == 3 assert self.WatchDogTrigger.call_count == 2 # N regions for each section diff --git a/pytroll_collectors/tests/test_region_collector.py b/pytroll_collectors/tests/test_region_collector.py index 894e1e20..d2b26545 100644 --- a/pytroll_collectors/tests/test_region_collector.py +++ b/pytroll_collectors/tests/test_region_collector.py @@ -140,22 +140,26 @@ def test_silence_timeout(europe, caplog): europe, timeliness=datetime.timedelta(seconds=60), granule_duration=datetime.timedelta(seconds=180), - silence=datetime.timedelta(seconds=300)) - alt_europe_collector.collect( - {**granule_metadata, - "start_time": datetime.datetime(2021, 4, 11, 10, 0)}) - # earliest timeout is due to monitor for silence - assert alt_europe_collector.timeout == datetime.datetime(2021, 4, 11, 10, 5) - alt_europe_collector.collect( - {**granule_metadata, - "start_time": datetime.datetime(2021, 4, 11, 10, 3)}) - # earliest timeout is due to monitor for silence - assert alt_europe_collector.timeout == datetime.datetime(2021, 4, 11, 10, 8) - alt_europe_collector.collect( - {**granule_metadata, - "start_time": datetime.datetime(2021, 4, 11, 10, 15)}) - # earliest timeout is due to duration + timelines - assert alt_europe_collector.timeout == datetime.datetime(2021, 4, 11, 10, 16) + silence=datetime.timedelta(seconds=900)) + caplog.set_level(logging.DEBUG) + with unittest.mock.patch("pytroll_collectors.region_collector.datetime") as prd: + prd.now.return_value = datetime.datetime(2021, 4, 11, 10, 1) + alt_europe_collector.collect( + {**granule_metadata, + "start_time": datetime.datetime(2021, 4, 11, 10, 0)}) + # earliest timeout is due to monitor for silence + assert alt_europe_collector.timeout == datetime.datetime(2021, 4, 11, 10, 16) + prd.now.return_value = datetime.datetime(2021, 4, 11, 10, 4) + alt_europe_collector.collect( + {**granule_metadata, + "start_time": datetime.datetime(2021, 4, 11, 10, 3)}) + assert alt_europe_collector.timeout == datetime.datetime(2021, 4, 11, 10, 19) + prd.now.return_value = datetime.datetime(2021, 4, 11, 10, 16) + alt_europe_collector.collect( + {**granule_metadata, + "start_time": datetime.datetime(2021, 4, 11, 10, 15)}) + # earliest timeout is due to duration + timelines + assert alt_europe_collector.timeout == datetime.datetime(2021, 4, 11, 10, 16) @pytest.mark.skip(reason="test never finishes") From 1bb84b839d805651ecfccb74661408f64395a306 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Fri, 21 May 2021 14:47:18 +0200 Subject: [PATCH 5/6] Document silence monitoring Document the recently introduced silence monitoring feature. --- doc/source/index.rst | 25 ++++++++++++++++--- .../geographic_gatherer_config.ini_template | 3 +++ 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/doc/source/index.rst b/doc/source/index.rst index 18eec183..a0b3f8b5 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -147,9 +147,21 @@ perhaps from `trollstalker`_ or `segment-gatherer`_. Using the configured granule duration and the area of interest, it calculates the starting times of granules it should expect to be covered in this area before and after the granule it was messaged about. Collection is considered finished when either -all expected granules have been collected or when a timeout is reached, -whatever comes first. Timeout is configured with the ``timeliness`` option -(see below). +of three conditions is reached: + +- All expected granules have been collected. +- A timeout is reached due to the ``timeliness`` option. This timeout is + calculated based on expected *remaining* granules. That means the timeout + can change if the last granule is collected. For example, we expect + granules at times 0, 3, 6, 9, and 12. Granule duration is 3 minutes and + timeliness is 5 minutes. Initially the timeout is set at t=12+3+5=20. But + if we collect 0, 6, 9, and 12 (but not 3), then after 12 has been collected, + timeout is adjusted to 3+3+5=11. Since the granule at t=12 is probably + collected when the clock time is later than t=11, the collection of the final granule + at t=12 leads to an immediate trigger of the timeout after the collection of t=12. +- No granules are collected at all for a period of ``silence`` seconds. + Considering the previous example, if we collect 3, 6, 9, but not 12; if + silence is set to 5 minutes, then the timeout will be reached at t=9+5=14. .. _pytroll-schedule: http://pytroll-schedule.readthedocs.org/ .. _pyorbital: https://pyorbital.readthedocs.io/en/latest/ @@ -183,6 +195,11 @@ timeliness ``timeliness`` minutes after the expected end time of the last expected granule. +silence + Monitor for silence for this time (in seconds). If no messages are + received at all for this period, ship what we have regardless of other + timeouts. + And the following optional fields: service @@ -218,7 +235,7 @@ orbit_type nameserver Nameserver to use to publish posttroll messages. -.. literalinclude:: ../../examples/gatherer_config.ini_template +.. literalinclude:: ../../examples/geographic_gatherer_config.ini_template :language: ini scisys_receiver diff --git a/examples/geographic_gatherer_config.ini_template b/examples/geographic_gatherer_config.ini_template index 1c91fa90..5f9a442c 100644 --- a/examples/geographic_gatherer_config.ini_template +++ b/examples/geographic_gatherer_config.ini_template @@ -19,6 +19,9 @@ sensor = viirs timeliness = 10 # duration of a granule in SECONDS duration = 180 +# silence monitoring in seconds. Collection is stopped if no granules/messages +# are received for this amount of time. +# silence = 600 publish_topic = [ears_viirs] From bf5bf83fbcca0537742aab0a9321aca355daa409 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Wed, 26 May 2021 18:43:43 +0200 Subject: [PATCH 6/6] Add failing test It doesn't DWIM. Add failing test to reproduce this. --- pytroll_collectors/tests/test_region_collector.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pytroll_collectors/tests/test_region_collector.py b/pytroll_collectors/tests/test_region_collector.py index 76235500..9b806075 100644 --- a/pytroll_collectors/tests/test_region_collector.py +++ b/pytroll_collectors/tests/test_region_collector.py @@ -177,12 +177,14 @@ def test_silence_timeout(europe, caplog): {**granule_metadata, "start_time": datetime.datetime(2021, 4, 11, 10, 3)}) assert alt_europe_collector.timeout == datetime.datetime(2021, 4, 11, 10, 19) + assert "Planned timeout for euro_ma: 2021-04-11T10:19" in caplog.text prd.now.return_value = datetime.datetime(2021, 4, 11, 10, 16) alt_europe_collector.collect( {**granule_metadata, "start_time": datetime.datetime(2021, 4, 11, 10, 15)}) - # earliest timeout is due to duration + timelines + # earliest timeout is due to duration + timeliness assert alt_europe_collector.timeout == datetime.datetime(2021, 4, 11, 10, 16) + assert "Silence timeout: 2021-04-11 10:16" not in caplog.text @pytest.mark.skip(reason="test never finishes")