diff --git a/bin/run_nwp_preparation.py b/bin/run_nwp_preparation.py index b3f0410..9dbf0c1 100644 --- a/bin/run_nwp_preparation.py +++ b/bin/run_nwp_preparation.py @@ -41,9 +41,8 @@ def prepare_and_publish(pub, options, flens): config_file_name = options.config_file - starttime = datetime.utcnow() - timedelta(days=1) + starttime = datetime.utcnow() - timedelta(days=8) ok_files, publish_topic = update_nwp(starttime, flens, config_file_name) - print(ok_files) if "publish_topic" is not None: for filename in ok_files: publish_msg = prepare_nwp_message(filename, publish_topic) diff --git a/nwcsafpps_runner/message_utils.py b/nwcsafpps_runner/message_utils.py index 34a3e02..b85a536 100644 --- a/nwcsafpps_runner/message_utils.py +++ b/nwcsafpps_runner/message_utils.py @@ -33,17 +33,16 @@ def prepare_nwp_message(result_file, publish_topic): - msg = Message(atype='file', subject=publish_topic) to_send = {} to_send["uri"] = result_file filename = os.path.basename(result_file) to_send["uid"] = filename to_send['format'] = 'NWP grib' to_send['type'] = 'grib' - return Message('/' + publish_topic + '/', "file", to_send).encode() - + + def prepare_l1c_message(result_file, mda, **kwargs): """Prepare the output message for the level-1c file creation.""" diff --git a/nwcsafpps_runner/prepare_nwp.py b/nwcsafpps_runner/prepare_nwp.py index 6e4ca28..2edb127 100644 --- a/nwcsafpps_runner/prepare_nwp.py +++ b/nwcsafpps_runner/prepare_nwp.py @@ -39,6 +39,7 @@ import logging LOG = logging.getLogger(__name__) + class NWPFileFamily(object): """Container for a nwp file family.""" @@ -48,7 +49,7 @@ def __init__(self, cfg, filename): cfg["nhsf_prefix"], cfg["nhsp_prefix"]) self.file_end = os.path.basename(filename).replace(cfg["nhsf_prefix"], "") self.tmp_filename = make_temp_filename(suffix="_" + self.file_end, dir=cfg["nwp_outdir"]) - self.tmp_result_filename= self.tmp_filename + "_tmp_result" + self.tmp_result_filename = self.tmp_filename + "_tmp_result" self.tmp_result_filename_reduced = self.tmp_filename + "_tmp_result_reduced" out_name = cfg["nwp_output_prefix"] + self.file_end self.result_file = os.path.join(cfg["nwp_outdir"], out_name) @@ -59,8 +60,7 @@ def __init__(self, cfg, filename): self.nwp_req_filename = cfg["pps_nwp_requirements"] self.cfg = cfg self.set_time_info(filename, cfg) - - + def set_time_info(self, filename, cfg): try: parser = Parser(cfg["nhsf_file_name_sift"]) @@ -83,7 +83,6 @@ def set_time_info(self, filename, cfg): 'Failed parsing forecast_step in file name. Check config and filename timestamp.') - def prepare_config(config_file_name): """Get config for NWP processing.""" LOG.debug("Prepare_nwp config file = %s", str(config_file_name)) @@ -97,12 +96,11 @@ def prepare_config(config_file_name): 'nhsf_path', 'nhsf_prefix']: if parameter not in cfg: LOG.exception('Parameter "{:s}" not set in config file: '.format(parameter)) - + if not os.path.exists(cfg['nwp_static_surface']): LOG.error("Config parameter nwp_static_surface: {:s} does not exist." "Can't prepare NWP data".format(cfg['nwp_static_surface'])) raise IOError('Failed getting static land-sea mask and topography') - return cfg @@ -140,7 +138,7 @@ def should_be_skipped(file_obj, starttime, nlengths): """Skip some files. Consider only analysis times newer than *starttime*. And consider only the forecast lead times in hours given by the list *nlengths* of integers. Never reprocess. - + """ if file_obj.analysis_time < starttime: return True @@ -156,6 +154,7 @@ def should_be_skipped(file_obj, starttime, nlengths): return True return False + def get_files_to_process(cfg): """Get all nhsf files in nhsf directory.""" filelist = glob(os.path.join(cfg["nhsf_path"], cfg["nhsf_prefix"] + "*")) @@ -165,15 +164,16 @@ def get_files_to_process(cfg): LOG.debug('NHSF NWP files found = {:s}'.format(str(filelist))) return filelist + def create_nwp_file(file_obj): """Create a new nwp file.""" - + LOG.info("Result and tmp files:\n\t {:s}\n\t {:s}\n\t {:s}\n\t {:s}".format( file_obj.result_file, file_obj.tmp_filename, file_obj.tmp_result_filename, file_obj.tmp_result_filename_reduced)) - + # probably to make sure files are not written at the moment! cmd = "grib_copy -w gridType=regular_ll {:s} {:s}".format(file_obj.nhsp_file, file_obj.tmp_filename) @@ -196,7 +196,7 @@ def create_nwp_file(file_obj): LOG.debug("os.system call took: %f seconds", _end - _start) LOG.debug("Returncode = " + str(retv)) if retv != 0: - LOG.warning("Failed generating nwp file {:} ...".format( file_obj.result_file)) + LOG.warning("Failed generating nwp file {:} ...".format(file_obj.result_file)) raise IOError("Failed merging grib data") nwp_file_ok = check_and_reduce_nwp_content(file_obj.tmp_result_filename, file_obj.tmp_result_filename_reduced, @@ -218,15 +218,14 @@ def create_nwp_file(file_obj): file_obj.result_file)) return None return file_obj.result_file - - + def update_nwp_inner(starttime, nlengths, cfg): """Prepare NWP grib files for PPS. Consider only analysis times newer than *starttime*. And consider only the forecast lead times in hours given by the list *nlengths* of integers """ - + LOG.info("Path to nhsf files: {:s}".format(cfg["nhsf_path"])) LOG.info("Path to nhsp files: {:s}".format(cfg["nhsp_path"])) LOG.info("nwp_output_prefix {:s}".format(cfg["nwp_output_prefix"])) @@ -241,7 +240,7 @@ def update_nwp_inner(starttime, nlengths, cfg): str(file_obj.forecast_step))) out_file = create_nwp_file(file_obj) remove_file(file_obj.tmp_result_filename_reduced) - remove_file(file_obj.tmp_result_filename) + remove_file(file_obj.tmp_result_filename) remove_file(file_obj.tmp_filename) if out_file is not None: ok_files.append(out_file) diff --git a/nwcsafpps_runner/tests/test_nwp_prepare.py b/nwcsafpps_runner/tests/test_nwp_prepare.py index 3f44d09..91d3781 100644 --- a/nwcsafpps_runner/tests/test_nwp_prepare.py +++ b/nwcsafpps_runner/tests/test_nwp_prepare.py @@ -30,7 +30,7 @@ from datetime import timedelta from nwcsafpps_runner.message_utils import prepare_nwp_message import nwcsafpps_runner.prepare_nwp as nwc_prep - + LOG = logging.getLogger(__name__) logging.basicConfig( format='%(levelname)s |%(asctime)s|: %(message)s', @@ -62,30 +62,32 @@ def fake_file_dir(tmp_path): cfg_file = my_temp_dir / 'pps_config.yaml' req_file = open(cfg_file, 'w') - req_file.write("pps_nwp_requirements: " + str(requirement_name) + "\n" - "nwp_outdir: " + str(my_temp_dir) + "\n" - "nhsp_path: " + "nwcsafpps_runner/tests/files/" + "\n" - "nhsf_path: " + "nwcsafpps_runner/tests/files/" + "\n" - "nhsp_prefix: " + "LL02_NHSP_" + "\n" - "nhsf_prefix: " + "LL02_NHSF_" + "\n" - "nwp_static_surface: " + str(my_temp_dir) + "/static_surface"+ "\n" - "ecmwf_prefix: " + "LL02_NHSF" + "\n" - "nwp_output_prefix: " + "PPS_ECMWF_" + "\n" - "nhsf_file_name_sift: " + "'" + '{ecmwf_prefix:9s}_{analysis_time:%Y%m%d%H%M}+{forecast_step:d}H00M' + "'" + "\n") - + req_file.write( + "pps_nwp_requirements: " + str(requirement_name) + "\n" + "nwp_outdir: " + str(my_temp_dir) + "\n" + "nhsp_path: " + "nwcsafpps_runner/tests/files/" + "\n" + "nhsf_path: " + "nwcsafpps_runner/tests/files/" + "\n" + "nhsp_prefix: " + "LL02_NHSP_" + "\n" + "nhsf_prefix: " + "LL02_NHSF_" + "\n" + "nwp_static_surface: " + str(my_temp_dir) + "/static_surface" + "\n" + "ecmwf_prefix: " + "LL02_NHSF" + "\n" + "nwp_output_prefix: " + "PPS_ECMWF_" + "\n" + "nhsf_file_name_sift: '" + '{ecmwf_prefix:9s}_{analysis_time:%Y%m%d%H%M}+{forecast_step:d}H00M' + "'" + "\n") + cfg_file = my_temp_dir / 'pps_config_missing_fields.yaml' req_file = open(cfg_file, 'w') - req_file.write("pps_nwp_requirements: " + str(requirement_name_m) + "\n" - "nwp_outdir: " + str(my_temp_dir) + "\n" - "nhsp_path: " + "nwcsafpps_runner/tests/files/" + "\n" - "nhsf_path: " + "nwcsafpps_runner/tests/files/" + "\n" - "nhsp_prefix: " + "LL02_NHSP_" + "\n" - "nhsf_prefix: " + "LL02_NHSF_" + "\n" - "nwp_static_surface: " + str(my_temp_dir) + "/static_surface"+ "\n" - "ecmwf_prefix: " + "LL02_NHSF" + "\n" - "nwp_output_prefix: " + "PPS_ECMWF_MANDATORY" + "\n" - "nhsf_file_name_sift: " + "'" + '{ecmwf_prefix:9s}_{analysis_time:%Y%m%d%H%M}+{forecast_step:d}H00M' + "'" + "\n") - + req_file.write( + "pps_nwp_requirements: " + str(requirement_name_m) + "\n" + "nwp_outdir: " + str(my_temp_dir) + "\n" + "nhsp_path: " + "nwcsafpps_runner/tests/files/" + "\n" + "nhsf_path: " + "nwcsafpps_runner/tests/files/" + "\n" + "nhsp_prefix: " + "LL02_NHSP_" + "\n" + "nhsf_prefix: " + "LL02_NHSF_" + "\n" + "nwp_static_surface: " + str(my_temp_dir) + "/static_surface" + "\n" + "ecmwf_prefix: " + "LL02_NHSF" + "\n" + "nwp_output_prefix: " + "PPS_ECMWF_MANDATORY" + "\n" + "nhsf_file_name_sift: '" + '{ecmwf_prefix:9s}_{analysis_time:%Y%m%d%H%M}+{forecast_step:d}H00M' + "'" + "\n") + return str(my_temp_dir) @@ -97,7 +99,7 @@ def test_nwp_message(self): filename = "dummy_dir/PPS_ECMWF_202205100000+009H00M" publish_msg = prepare_nwp_message(filename, "dummy_topic") expected_uri = '"uri": "dummy_dir/PPS_ECMWF_202205100000+009H00M"' - assert expected_uri in publish_msg + assert expected_uri in publish_msg class TestNWPprepareRunner: @@ -145,5 +147,3 @@ def test_remove_filename(self, fake_file_dir): assert not os.path.exists(nwp_surface_file) # Should be able to run on already removed file without raising exception remove_file(nwp_surface_file) - -