diff --git a/setup.py b/setup.py index 81c6a6ea..688c3aa3 100644 --- a/setup.py +++ b/setup.py @@ -36,10 +36,10 @@ pass -install_requires = ['pyyaml', 'dpath', 'trollsift'] +install_requires = ['pyyaml', 'dpath', 'trollsift', 'posttroll>=1.10.0'] if "test" not in sys.argv: - install_requires += ['posttroll', 'satpy>=0.32.0', 'pyorbital'] + install_requires += ['satpy>=0.32.0', 'pyorbital'] NAME = 'trollflow2' README = open('README.md', 'r').read() diff --git a/trollflow2/plugins/__init__.py b/trollflow2/plugins/__init__.py index 65cc287b..9bd3b35a 100644 --- a/trollflow2/plugins/__init__.py +++ b/trollflow2/plugins/__init__.py @@ -23,7 +23,7 @@ import datetime as dt import os import pathlib -from contextlib import contextmanager, suppress, nullcontext +from contextlib import contextmanager, nullcontext, suppress from logging import getLogger from tempfile import NamedTemporaryFile from urllib.parse import urlsplit, urlunsplit @@ -471,14 +471,14 @@ def __setstate__(self, kwargs): logger.debug('Starting publisher') self.port = kwargs.get('port', 0) self.nameservers = kwargs.get('nameservers', "") - self._pub_starter = create_publisher_from_dict_config( + self.pub = create_publisher_from_dict_config( { 'port': self.port, 'nameservers': self.nameservers, 'name': 'l2processor', } ) - self.pub = self._pub_starter.start() + self.pub.start() @staticmethod def create_message(fmat, mda): diff --git a/trollflow2/tests/test_trollflow2.py b/trollflow2/tests/test_trollflow2.py index 5036b4be..2e27338e 100644 --- a/trollflow2/tests/test_trollflow2.py +++ b/trollflow2/tests/test_trollflow2.py @@ -26,19 +26,20 @@ import datetime as dt import logging import os -import unittest import pathlib import queue +import unittest from functools import partial from unittest import mock +import dask.array as da import numpy as np import pytest -from pyresample.geometry import DynamicAreaDefinition, create_area_def import rasterio - -import dask.array as da import xarray as xr +from posttroll.message import Message +from posttroll.testing import patched_publisher +from pyresample.geometry import DynamicAreaDefinition, create_area_def from trollflow2.launcher import read_config from trollflow2.tests.utils import TestCase, create_filenames_and_topics @@ -380,8 +381,8 @@ def test_save_datasets(self): the_queue = mock.MagicMock() job = _create_job_for_save_datasets() job['produced_files'] = the_queue - with mock.patch('trollflow2.plugins.compute_writer_results'),\ - mock.patch('trollflow2.plugins.DataQuery') as dsid,\ + with mock.patch('trollflow2.plugins.compute_writer_results'), \ + mock.patch('trollflow2.plugins.DataQuery') as dsid, \ mock.patch('os.rename') as rename: save_datasets(job) expected_sd = [mock.call(dsid.return_value, compute=False, @@ -560,8 +561,8 @@ def test_save_datasets_eager(self): job = _create_job_for_save_datasets() job['product_list']['product_list']['eager_writing'] = True - with mock.patch('trollflow2.plugins.compute_writer_results') as compute_writer_results,\ - mock.patch('trollflow2.plugins.DataQuery'),\ + with mock.patch('trollflow2.plugins.compute_writer_results') as compute_writer_results, \ + mock.patch('trollflow2.plugins.DataQuery'), \ mock.patch('os.rename'): save_datasets(job) sd_calls = (job['resampled_scenes']['euron1'].save_dataset.mock_calls @@ -696,12 +697,12 @@ def fake_scene(): def test_save_datasets_callback(tmp_path, caplog, fake_scene): - """Test callback functionality for save_datasets + """Test callback functionality for save_datasets. Test that the functionality for a callback after each produced file is working correctly in the save_datasets plugin. """ - from trollflow2.plugins import save_datasets, callback_close + from trollflow2.plugins import callback_close, save_datasets job = {} job['input_mda'] = input_mda @@ -709,7 +710,7 @@ def test_save_datasets_callback(tmp_path, caplog, fake_scene): logger = logging.getLogger("testlogger") def testlog(obj, targs, job, fmat_config): - """Toy function doing some logging""" + """Toy function doing some logging.""" filename = fmat_config["filename"] # ensure computation has indeed completed and file was flushed p = pathlib.Path(filename) @@ -762,10 +763,9 @@ def testlog(obj, targs, job, fmat_config): def test_save_datasets_callback_move_check_products(tmp_path, caplog, fake_scene): - """Test that check_products and the callback move can cooperate. - """ - from trollflow2.plugins import save_datasets, callback_close, callback_move + """Test that check_products and the callback move can cooperate.""" from trollflow2.launcher import check_results + from trollflow2.plugins import callback_close, callback_move, save_datasets job = {} job['input_mda'] = input_mda @@ -1205,9 +1205,9 @@ def test_metadata_is_read_from_scene(self): """Test that the scene and message metadata are merged correctly.""" from trollflow2.plugins import check_sunlight_coverage - with mock.patch('trollflow2.plugins.Pass') as ts_pass,\ - mock.patch('trollflow2.plugins.get_twilight_poly'),\ - mock.patch('trollflow2.plugins.get_area_def'),\ + with mock.patch('trollflow2.plugins.Pass') as ts_pass, \ + mock.patch('trollflow2.plugins.get_twilight_poly'), \ + mock.patch('trollflow2.plugins.get_area_def'), \ mock.patch("trollflow2.plugins._get_sunlight_coverage") as _get_sunlight_coverage: _get_sunlight_coverage.return_value = .3 scene = _get_mocked_scene_with_properties() @@ -1223,7 +1223,7 @@ def test_fully_sunlit_scene_returns_full_coverage(self): from pyresample.spherical import SphPolygon from trollflow2.plugins import check_sunlight_coverage - with mock.patch('trollflow2.plugins.Pass') as tst_pass,\ + with mock.patch('trollflow2.plugins.Pass') as tst_pass, \ mock.patch('trollflow2.plugins.get_twilight_poly') as twilight: tst_pass.return_value.boundary.contour_poly = SphPolygon(np.deg2rad(np.array([(0, 0), (0, 90), (45, 0)]))) twilight.return_value = SphPolygon(np.deg2rad(np.array([(0, 0), (0, 90), (90, 0)]))) @@ -1237,9 +1237,9 @@ def test_fully_sunlit_scene_returns_full_coverage(self): def test_product_not_loaded(self): """Test that product isn't loaded when sunlight coverage is too low.""" from trollflow2.plugins import check_sunlight_coverage, metadata_alias - with mock.patch('trollflow2.plugins.Pass') as ts_pass,\ - mock.patch('trollflow2.plugins.get_twilight_poly'),\ - mock.patch('trollflow2.plugins.get_area_def'),\ + with mock.patch('trollflow2.plugins.Pass') as ts_pass, \ + mock.patch('trollflow2.plugins.get_twilight_poly'), \ + mock.patch('trollflow2.plugins.get_area_def'), \ mock.patch("trollflow2.plugins._get_sunlight_coverage") as _get_sunlight_coverage: job = {} scene = _get_mocked_scene_with_properties() @@ -1260,9 +1260,9 @@ def test_product_not_loaded(self): def test_sunlight_filter(self): """Test that product isn't loaded when sunlight coverage is to low.""" from trollflow2.plugins import check_sunlight_coverage, metadata_alias - with mock.patch('trollflow2.plugins.Pass'),\ - mock.patch('trollflow2.plugins.get_twilight_poly'),\ - mock.patch('trollflow2.plugins.get_area_def'),\ + with mock.patch('trollflow2.plugins.Pass'), \ + mock.patch('trollflow2.plugins.get_twilight_poly'), \ + mock.patch('trollflow2.plugins.get_area_def'), \ mock.patch("trollflow2.plugins._get_sunlight_coverage") as _get_sunlight_coverage: job = {} scene = _get_mocked_scene_with_properties() @@ -1425,7 +1425,7 @@ def test_covers_uses_only_one_sensor(self): "scene": scn} job2 = copy.deepcopy(job) - with mock.patch('trollflow2.plugins.get_scene_coverage') as get_scene_coverage,\ + with mock.patch('trollflow2.plugins.get_scene_coverage') as get_scene_coverage, \ mock.patch('trollflow2.plugins.Pass'): get_scene_coverage.return_value = 10.0 covers(job) @@ -1443,7 +1443,7 @@ def test_covers_uses_only_one_sensor(self): def test_scene_coverage(self): """Test scene coverage.""" from trollflow2.plugins import get_scene_coverage - with mock.patch('trollflow2.plugins.get_area_def') as get_area_def,\ + with mock.patch('trollflow2.plugins.get_area_def') as get_area_def, \ mock.patch('trollflow2.plugins.Pass') as ts_pass: area_coverage = mock.MagicMock() area_coverage.return_value = 0.2 @@ -1740,7 +1740,7 @@ def setUp(self): def test_add_overviews(self): """Test adding overviews.""" from trollflow2.plugins import add_overviews - with mock.patch('trollflow2.plugins.Resampling') as resampling,\ + with mock.patch('trollflow2.plugins.Resampling') as resampling, \ mock.patch('trollflow2.plugins.rasterio') as rasterio: # Mock the rasterio.open context manager dst = mock.MagicMock() @@ -1788,6 +1788,19 @@ def test_filepublisher_is_stopped_on_exit(self): publisher.stop.assert_called() assert pub.pub is None + def test_publisher_is_deleted_from_instance(self): + """Real test.""" + import gc + + from posttroll.testing import patched_publisher + + from trollflow2.plugins import FilePublisher + with patched_publisher(): + file_pub = FilePublisher(nameservers=False, port=2023) + publisher = file_pub.pub + file_pub.__del__() + assert file_pub not in gc.get_referrers(publisher) + def test_filepublisher_with_compose(self): """Test filepublisher with compose.""" from satpy import Scene @@ -1802,35 +1815,35 @@ def test_filepublisher_with_compose(self): 'input_mda': self.input_mda, 'resampled_scenes': dict(euron1=scn_euron1)} - with mock.patch('trollflow2.plugins.Message') as Message, \ - mock.patch.multiple( - 'posttroll.publisher', NoisyPublisher=mock.DEFAULT, Publisher=mock.DEFAULT): - message = Message - - pub = FilePublisher() + with patched_publisher() as published_messages: + pub = FilePublisher(nameservers=False, port=2009) product_list = self.product_list.copy() product_list['product_list']['publish_topic'] = '/{areaname}/{productname}' - topics = create_filenames_and_topics(job) + _ = create_filenames_and_topics(job) pub(job) - message.assert_called() - pub.pub.send.assert_called() - call_count = 0 - for area in job['product_list']['product_list']['areas']: - for _prod in job['product_list']['product_list']['areas'][area]: - # Skip calls to __str__ - if 'call().__str__()' != str(message.mock_calls[call_count]): - self.assertTrue(topics[call_count] in str(message.mock_calls[call_count])) - call_count += 1 - self.assertEqual(call_count, 1) - self.assertEqual(message.call_args[0][2]['processing_center'], 'SMHI') + assert len(published_messages) == 3 + + formats = [] + directory = "/tmp/satdmz/pps/www/latest_2018/" + filename_base = "NOAA-15_20190217_0600_euron1_in_fname_ctth_static" + for rawmsg in published_messages: + msg = Message(rawstr=rawmsg) + if msg.type == "file": + assert msg.data["area"] == "euron1" + assert msg.data["product"] == "cloud_top_height" + assert msg.data["uri"].startswith(os.path.join(directory, filename_base)) + assert msg.data["processing_center"] == "SMHI" + assert msg.subject == "/euron1_in_fname/cloud_top_height_in_fname" + formats.append(msg.data["format"]) + assert formats == ["png", "jpg"] + del pub def test_filepublisher_without_compose(self): """Test filepublisher without compose.""" from satpy import Scene from satpy.tests.utils import make_dataid - scn_euron1 = Scene() dataid = make_dataid(name='cloud_top_height', resolution=1000) scn_euron1[dataid] = mock.MagicMock() @@ -1838,23 +1851,23 @@ def test_filepublisher_without_compose(self): 'input_mda': self.input_mda, 'resampled_scenes': dict(euron1=scn_euron1)} - with mock.patch('trollflow2.plugins.Message') as Message, \ - mock.patch.multiple( - 'posttroll.publisher', NoisyPublisher=mock.DEFAULT, Publisher=mock.DEFAULT): - message = Message - + with patched_publisher() as published_messages: pub, topics = self._run_publisher_on_job(job) - message.assert_called() - pub.pub.send.assert_called() - - call_count = 0 - for area in job['product_list']['product_list']['areas']: - for _prod in job['product_list']['product_list']['areas'][area]: - # Skip calls to __str__ - if 'call().__str__()' != str(message.mock_calls[call_count]): - self.assertTrue(topics[call_count] in str(message.mock_calls[call_count])) - call_count += 1 - self.assertEqual(call_count, 1) + assert len(published_messages) == 3 + + formats = [] + directory = "/tmp/satdmz/pps/www/latest_2018/" + filename_base = "NOAA-15_20190217_0600_euron1_in_fname_ctth_static" + for rawmsg in published_messages: + msg = Message(rawstr=rawmsg) + if msg.type == "file": + assert msg.data["area"] == "euron1" + assert msg.data["product"] == "cloud_top_height" + assert msg.data["uri"].startswith(os.path.join(directory, filename_base)) + assert msg.subject in topics + formats.append(msg.data["format"]) + assert formats == ["png", "jpg"] + del pub def test_filepublisher_s3_files(self): """Test filepublisher with files saved to S3.""" @@ -1868,20 +1881,15 @@ def test_filepublisher_s3_files(self): 'input_mda': self.input_mda, 'resampled_scenes': dict(euron1=scn_euron1)} - with mock.patch('trollflow2.plugins.Message') as message: - with mock.patch.multiple('posttroll.publisher', - NoisyPublisher=mock.DEFAULT, - Publisher=mock.DEFAULT): - _, _ = self._run_publisher_on_job(job, s3_paths=True) - for call_ in message.mock_calls: - if 'call().__str__()' != str(call_): - type_ = call_.args[1] - mda = call_.args[2] - if type_ == 'dispatch': - uri = mda['file_mda']['uri'] - else: - uri = mda['uri'] - assert uri.startswith('s3://bucket-name/') + with patched_publisher() as published_messages: + _, _ = self._run_publisher_on_job(job, s3_paths=True) + for rawmsg in published_messages: + msg = Message(rawstr=rawmsg) + if msg.type == "dispatch": + uri = msg.data["file_mda"]["uri"] + else: + uri = msg.data["uri"] + assert uri.startswith('s3://bucket-name/') def test_non_existing_products_are_not_published(self): """Test that non existing products are not published.""" @@ -1891,9 +1899,9 @@ def test_non_existing_products_are_not_published(self): job = {"scene": scn, "product_list": self.product_list, 'input_mda': self.input_mda, 'resampled_scenes': dict(euron1=Scene(), germ=Scene())} - with mock.patch('trollflow2.plugins.Message') as message, mock.patch('posttroll.publisher.NoisyPublisher'): + with patched_publisher() as published_messages: self._run_publisher_on_job(job) - message.assert_not_called() + assert len(published_messages) == 0 def test_multiple_dataset_files_can_be_published(self): """Test that netcdf files with multiple datasets can be published normally.""" @@ -1907,17 +1915,19 @@ def test_multiple_dataset_files_can_be_published(self): job = {"scene": scn, "product_list": self.product_list, 'input_mda': self.input_mda, 'resampled_scenes': {'None': resampled_scene}} - with mock.patch('trollflow2.plugins.Message') as message, mock.patch('posttroll.publisher.NoisyPublisher'): + with patched_publisher() as published_messages: self._run_publisher_on_job(job) - assert message.call_args_list[-1][0][2]['product'] == ( - 'chl_nn', 'chl_oc4me', 'trsp', 'tsm_nn', 'iop_nn', 'mask', 'latitude', 'longitude') + + assert len(published_messages) == 1 + product = ['chl_nn', 'chl_oc4me', 'trsp', 'tsm_nn', 'iop_nn', 'mask', 'latitude', 'longitude'] + assert Message(rawstr=published_messages[0]).data["product"] == product def _run_publisher_on_job(self, job, s3_paths=False): """Run a publisher on *job*.""" from trollflow2.dict_tools import plist_iter from trollflow2.plugins import FilePublisher - pub = FilePublisher() + pub = FilePublisher(nameservers=False, port=2023) product_list = self.product_list.copy() product_list['product_list']['publish_topic'] = '/static_topic' topics = create_filenames_and_topics(job) @@ -1968,7 +1978,7 @@ def test_filepublisher_kwargs_direct_instance_defaults(self): pub = FilePublisher() NoisyPublisher.assert_called_once() - assert pub.pub is NoisyPublisher.return_value.start.return_value + assert pub.pub is NoisyPublisher.return_value assert mock.call('l2processor', port=0, aliases=None, broadcast_interval=2, nameservers="", min_port=None, max_port=None) in NoisyPublisher.mock_calls Publisher.assert_not_called() @@ -2027,7 +2037,7 @@ def test_filepublisher_kwargs(self): assert mock.call('l2processor', port=40002, aliases=None, broadcast_interval=2, nameservers=['localhost'], min_port=None, max_port=None) in NoisyPublisher.mock_calls Publisher.assert_not_called() - assert fpub.pub is NoisyPublisher.return_value.start.return_value + assert fpub.pub is NoisyPublisher.return_value NoisyPublisher.assert_called_once() assert fpub.port == 40002 assert fpub.nameservers == ['localhost'] @@ -2086,9 +2096,9 @@ def test_deleting(self): 'resampled_scenes': {}} pub(job) - nb_.start.return_value.stop.assert_not_called() + nb_.stop.assert_not_called() del pub - nb_.start.return_value.stop.assert_called_once() + nb_.stop.assert_called_once() def test_stopping(self): """Test stopping the publisher.""" @@ -2106,9 +2116,9 @@ def test_stopping(self): 'resampled_scenes': {}} pub(job) - nb_.start.return_value.stop.assert_not_called() + nb_.stop.assert_not_called() pub.stop() - nb_.start.return_value.stop.assert_called_once() + nb_.stop.assert_called_once() class FakeScene(dict): @@ -2257,7 +2267,9 @@ def test_callback_move(caplog, tmp_path): def test_format_decoration(): """Test that decoration text in fmt_config is formated based on fmat.""" import datetime + from trollflow2.plugins import format_decoration + # set input data fmat = {'orig_platform_name': 'npp', 'start_time': datetime.datetime(2022, 5, 3, 12, 7, 52)} @@ -2274,7 +2286,9 @@ def test_format_decoration(): def test_format_decoration_plain_text(): """Test that decoration text is plain text if text in fmt_config does not include name of any key in fmat.""" import datetime + from trollflow2.plugins import format_decoration + # set input data. Text does not include name of any key in fmat. fmat = {'orig_platform_name': 'npp', 'start_time': datetime.datetime(2022, 5, 3, 12, 7, 52)}