diff --git a/bin/run_nwp_preparation.py b/bin/run_nwp_preparation.py index 4c60d1b..466fd81 100644 --- a/bin/run_nwp_preparation.py +++ b/bin/run_nwp_preparation.py @@ -52,7 +52,7 @@ def prepare_and_publish(pub, options, flens): publish_msg = prepare_nwp_message(filename, publish_topic) LOG.debug("Will publish") LOG.debug("publish_msg") - publish_l1c(pub, publish_msg, publish_topic) + publish_l1c(pub, publish_msg, [publish_topic]) def _run_subscribe_publisher(pub, options, flens): diff --git a/nwcsafpps_runner/message_utils.py b/nwcsafpps_runner/message_utils.py index 157c0d7..6eeaec5 100644 --- a/nwcsafpps_runner/message_utils.py +++ b/nwcsafpps_runner/message_utils.py @@ -38,8 +38,7 @@ def prepare_nwp_message(result_file, publish_topic): to_send["uid"] = filename to_send['format'] = 'NWP grib' to_send['type'] = 'grib' - return Message('/' + publish_topic + '/', - "file", to_send).encode() + return to_send def prepare_l1c_message(result_file, mda, **kwargs): diff --git a/nwcsafpps_runner/prepare_nwp.py b/nwcsafpps_runner/prepare_nwp.py index 608b20d..1f63295 100644 --- a/nwcsafpps_runner/prepare_nwp.py +++ b/nwcsafpps_runner/prepare_nwp.py @@ -108,7 +108,7 @@ def prepare_config(config_file_name): def remove_file(filename): """Remove a temporary file.""" if os.path.exists(filename): - LOG.warning("Removing tmp file: %s.", filename) + LOG.info("Removing tmp file: %s.", filename) os.remove(filename) @@ -227,6 +227,7 @@ def update_nwp_inner(starttime, nlengths, cfg): for fname in get_files_to_process(cfg): file_obj = NWPFileFamily(cfg, fname) if should_be_skipped(file_obj, starttime, nlengths): + remove_file(file_obj.tmp_filename) continue LOG.debug("Analysis time and start time: {:s} {:s}".format(str(file_obj.analysis_time), str(starttime))) diff --git a/nwcsafpps_runner/tests/test_nwp_prepare.py b/nwcsafpps_runner/tests/test_nwp_prepare.py index e4441fa..9267bde 100644 --- a/nwcsafpps_runner/tests/test_nwp_prepare.py +++ b/nwcsafpps_runner/tests/test_nwp_prepare.py @@ -22,6 +22,7 @@ # along with this program. If not, see . """Test the nwp_prepare runner code.""" +import glob import logging import os import unittest @@ -99,8 +100,8 @@ def test_nwp_message(self): """Test the nwp message.""" filename = "dummy_dir/PPS_ECMWF_202205100000+009H00M" publish_msg = prepare_nwp_message(filename, "dummy_topic") - expected_uri = '"uri": "dummy_dir/PPS_ECMWF_202205100000+009H00M"' - assert expected_uri in publish_msg + expected_uri = "dummy_dir/PPS_ECMWF_202205100000+009H00M" + assert publish_msg["uri"] == expected_uri class TestNWPprepareRunner: @@ -116,6 +117,8 @@ def test_update_nwp(self, fake_file_dir): # Run again when file is already created nwc_prep.update_nwp(date - timedelta(days=2), [9], cfg_file) assert os.path.exists(outfile) + out_files = glob.glob(os.path.join(str(my_temp_dir), "*_202205100000+009H00M*")) + assert len(out_files) == 1 def test_update_nwp_no_requirement_file(self, fake_file_dir): """Create file no requirement file.""" @@ -127,6 +130,8 @@ def test_update_nwp_no_requirement_file(self, fake_file_dir): date = datetime(year=2022, month=5, day=10, hour=0, tzinfo=timezone.utc) nwc_prep.update_nwp(date - timedelta(days=2), [9], cfg_file) assert os.path.exists(outfile) + out_files = glob.glob(os.path.join(str(my_temp_dir), "*_202205100000+009H00M*")) + assert len(out_files) == 1 def test_update_nwp_missing_fields(self, fake_file_dir): """Test that no file without mandatory data is created.""" @@ -136,6 +141,8 @@ def test_update_nwp_missing_fields(self, fake_file_dir): date = datetime(year=2022, month=5, day=10, hour=0, tzinfo=timezone.utc) nwc_prep.update_nwp(date - timedelta(days=2), [9], cfg_file) assert not (os.path.exists(outfile)) + out_files = glob.glob(os.path.join(str(my_temp_dir), "*_202205100000+009H00M*")) + assert len(out_files) == 0 def test_remove_filename(self, fake_file_dir): """Test the function for removing files."""