diff --git a/doc/source/index.rst b/doc/source/index.rst index 18eec183..a0b3f8b5 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -147,9 +147,21 @@ perhaps from `trollstalker`_ or `segment-gatherer`_. Using the configured granule duration and the area of interest, it calculates the starting times of granules it should expect to be covered in this area before and after the granule it was messaged about. Collection is considered finished when either -all expected granules have been collected or when a timeout is reached, -whatever comes first. Timeout is configured with the ``timeliness`` option -(see below). +of three conditions is reached: + +- All expected granules have been collected. +- A timeout is reached due to the ``timeliness`` option. This timeout is + calculated based on expected *remaining* granules. That means the timeout + can change if the last granule is collected. For example, we expect + granules at times 0, 3, 6, 9, and 12. Granule duration is 3 minutes and + timeliness is 5 minutes. Initially the timeout is set at t=12+3+5=20. But + if we collect 0, 6, 9, and 12 (but not 3), then after 12 has been collected, + the timeout is adjusted to t=3+3+5=11. Since the granule at t=12 is probably + collected when the clock time is already later than t=11, collecting that final + granule triggers the timeout immediately. +- No granules are collected at all for a period of ``silence`` seconds. + Considering the previous example, if we collect 3, 6, and 9, but not 12, and + silence is set to 5 minutes (300 seconds), then the timeout will be reached at t=9+5=14. .. _pytroll-schedule: http://pytroll-schedule.readthedocs.org/ .. _pyorbital: https://pyorbital.readthedocs.io/en/latest/ @@ -183,6 +195,11 @@ timeliness ``timeliness`` minutes after the expected end time of the last expected granule. +silence + Monitor for silence for this time (in seconds). 
If no messages are + received at all for this period, ship what we have regardless of other + timeouts. + And the following optional fields: service @@ -218,7 +235,7 @@ orbit_type nameserver Nameserver to use to publish posttroll messages. -.. literalinclude:: ../../examples/gatherer_config.ini_template +.. literalinclude:: ../../examples/geographic_gatherer_config.ini_template :language: ini scisys_receiver diff --git a/examples/geographic_gatherer_config.ini_template b/examples/geographic_gatherer_config.ini_template index 1c91fa90..5f9a442c 100644 --- a/examples/geographic_gatherer_config.ini_template +++ b/examples/geographic_gatherer_config.ini_template @@ -19,6 +19,9 @@ sensor = viirs timeliness = 10 # duration of a granule in SECONDS duration = 180 +# silence monitoring in seconds. Collection is stopped if no granules/messages +# are received for this amount of time. +# silence = 600 publish_topic = [ears_viirs] diff --git a/pytroll_collectors/region_collector.py b/pytroll_collectors/region_collector.py index e1382fa8..a5617c64 100644 --- a/pytroll_collectors/region_collector.py +++ b/pytroll_collectors/region_collector.py @@ -48,8 +48,22 @@ class RegionCollector(object): def __init__(self, region, timeliness=None, - granule_duration=None): - """Initialize the region collector.""" + granule_duration=None, + silence=timedelta(days=9999)): + """Initialize the region collector. + + Args: + region (AreaDefinition): Area for which to collect granules. + timeliness (datetime.timedelta): Timeout after latest expected + granule. This timeout will be reached if the clock time + exceeds the time for the latest expected granule + + granule_duration + timeliness. + granule_duration (datetime.timedelta): Duration for a granule. + This will be used to calculate expected granules and to + estimate the timeliness-timeout (see above). + silence (datetime.timedelta): Regardless of what we expect, abort + collection if nothing has been received for this duration. 
+ """ self.region = region # area def self.granule_times = set() self.granules = [] @@ -58,6 +72,7 @@ def __init__(self, region, self.timeout = None self.granule_duration = granule_duration self.last_file_added = False + self.silence = silence def __call__(self, granule_metadata): """Perform the collection on the granule.""" @@ -146,6 +161,7 @@ def _adjust_timeout(self): self.granule_times) + self.granule_duration + self.timeliness) + silence_timeout = datetime.now() + self.silence except ValueError: logger.error("Calculation of new timeout failed, " "keeping previous timeout.") @@ -155,6 +171,10 @@ def _adjust_timeout(self): if new_timeout < self.timeout: self.timeout = new_timeout logger.info("Adjusted timeout: %s", self.timeout.isoformat()) + elif silence_timeout > self.timeout: + self.timeout = min(new_timeout, silence_timeout) + if silence_timeout > new_timeout: + logger.debug(f"Silence timeout: {self.timeout:%Y-%m-%d %H:%M:%S}") def cleanup(self): """Clear members.""" @@ -200,9 +220,11 @@ def _predict_pass_granules(self, granule_metadata): _get_platform_name(granule_metadata), self.region.description, str(sorted(self.planned_granule_times))) - self.timeout = (max(self.planned_granule_times) + - self.granule_duration + - self.timeliness) + self.timeout = min( + datetime.now() + self.silence, + (max(self.planned_granule_times) + + self.granule_duration + + self.timeliness)) logger.info("Planned timeout for %s: %s", self.region.description, self.timeout.isoformat()) else: diff --git a/pytroll_collectors/tests/test_geographic_gatherer.py b/pytroll_collectors/tests/test_geographic_gatherer.py index 905bd749..0619ae38 100644 --- a/pytroll_collectors/tests/test_geographic_gatherer.py +++ b/pytroll_collectors/tests/test_geographic_gatherer.py @@ -79,6 +79,8 @@ def setUp(self): 'publish_topic': '/topic', 'watcher': 'Observer', } + self.config["silence_section"] = {**self.config["minimal_config"], + "silence": 300} self.RegionCollector = self._patch_and_add_cleanup( 
'pytroll_collectors.geographic_gatherer.RegionCollector') @@ -185,6 +187,17 @@ def test_init_observer(self): self._watchdog_test( sections, gatherer, self.publisher, self.PostTrollTrigger, self.WatchDogTrigger, self.RegionCollector) + def test_init_silence(self): + """Test initialisation of GeographicGatherer with silence. + + Test that the GeographicGatherer is correctly initiated when monitoring + for silence is included in the configuration. + """ + from pytroll_collectors.geographic_gatherer import GeographicGatherer + sections = ["silence_section"] + opts = FakeOpts(sections) + GeographicGatherer(self.config, opts) + def _watchdog_test(self, sections, gatherer, publisher, PostTrollTrigger, WatchDogTrigger, RegionCollector): # There's one trigger assert len(gatherer.triggers) == 1 @@ -229,7 +242,7 @@ def test_init_all_sections(self): assert len(gatherer.triggers) == num_sections # See that the trigger classes have been accessed the correct times - assert self.PostTrollTrigger.call_count == 2 + assert self.PostTrollTrigger.call_count == 3 assert self.WatchDogTrigger.call_count == 2 # N regions for each section diff --git a/pytroll_collectors/tests/test_region_collector.py b/pytroll_collectors/tests/test_region_collector.py index ded2a007..9b806075 100644 --- a/pytroll_collectors/tests/test_region_collector.py +++ b/pytroll_collectors/tests/test_region_collector.py @@ -133,12 +133,14 @@ def test_adjust_timeout(europe, caplog): "uri": "file://alt/0"} alt_europe_collector = RegionCollector( europe, + timeliness=datetime.timedelta(seconds=600), granule_duration=datetime.timedelta(seconds=180)) with caplog.at_level(logging.DEBUG): alt_europe_collector.collect( {**granule_metadata, "start_time": datetime.datetime(2021, 4, 11, 10, 0)}) + assert alt_europe_collector.timeout == datetime.datetime(2021, 4, 11, 10, 28) alt_europe_collector.collect( {**granule_metadata, "start_time": datetime.datetime(2021, 4, 11, 10, 15)}) @@ -146,6 +148,43 @@ def 
test_adjust_timeout(europe, caplog): {**granule_metadata, "start_time": datetime.datetime(2021, 4, 11, 10, 12)}) assert "Adjusted timeout" in caplog.text + assert alt_europe_collector.timeout == datetime.datetime(2021, 4, 11, 10, 22) + + +@unittest.mock.patch("pyorbital.tlefile.urlopen", new=_fakeopen) +def test_silence_timeout(europe, caplog): + """Test that the monitor for silence timeout is working.""" + from pytroll_collectors.region_collector import RegionCollector + granule_metadata = { + "sensor": "avhrr", + "tle_platform_name": "Metop-C", + "uri": "file://alt/0"} + alt_europe_collector = RegionCollector( + europe, + timeliness=datetime.timedelta(seconds=60), + granule_duration=datetime.timedelta(seconds=180), + silence=datetime.timedelta(seconds=900)) + caplog.set_level(logging.DEBUG) + with unittest.mock.patch("pytroll_collectors.region_collector.datetime") as prd: + prd.now.return_value = datetime.datetime(2021, 4, 11, 10, 1) + alt_europe_collector.collect( + {**granule_metadata, + "start_time": datetime.datetime(2021, 4, 11, 10, 0)}) + # earliest timeout is due to monitor for silence + assert alt_europe_collector.timeout == datetime.datetime(2021, 4, 11, 10, 16) + prd.now.return_value = datetime.datetime(2021, 4, 11, 10, 4) + alt_europe_collector.collect( + {**granule_metadata, + "start_time": datetime.datetime(2021, 4, 11, 10, 3)}) + assert alt_europe_collector.timeout == datetime.datetime(2021, 4, 11, 10, 19) + assert "Planned timeout for euro_ma: 2021-04-11T10:19" in caplog.text + prd.now.return_value = datetime.datetime(2021, 4, 11, 10, 16) + alt_europe_collector.collect( + {**granule_metadata, + "start_time": datetime.datetime(2021, 4, 11, 10, 15)}) + # earliest timeout is due to duration + timeliness + assert alt_europe_collector.timeout == datetime.datetime(2021, 4, 11, 10, 16) + assert "Silence timeout: 2021-04-11 10:16" not in caplog.text @pytest.mark.skip(reason="test never finishes")