Skip to content

Commit

Permalink
flake8
Browse files Browse the repository at this point in the history
  • Loading branch information
Nina.Hakansson committed Apr 9, 2024
1 parent 2d2bb56 commit fe4f759
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 45 deletions.
3 changes: 1 addition & 2 deletions bin/run_nwp_preparation.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@

def prepare_and_publish(pub, options, flens):
config_file_name = options.config_file
starttime = datetime.utcnow() - timedelta(days=1)
starttime = datetime.utcnow() - timedelta(days=8)
ok_files, publish_topic = update_nwp(starttime, flens, config_file_name)
print(ok_files)
if "publish_topic" is not None:
for filename in ok_files:
publish_msg = prepare_nwp_message(filename, publish_topic)
Expand Down
5 changes: 2 additions & 3 deletions nwcsafpps_runner/message_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,16 @@


def prepare_nwp_message(result_file, publish_topic):
msg = Message(atype='file', subject=publish_topic)
to_send = {}
to_send["uri"] = result_file
filename = os.path.basename(result_file)
to_send["uid"] = filename
to_send['format'] = 'NWP grib'
to_send['type'] = 'grib'

return Message('/' + publish_topic + '/',
"file", to_send).encode()



def prepare_l1c_message(result_file, mda, **kwargs):
"""Prepare the output message for the level-1c file creation."""

Expand Down
27 changes: 13 additions & 14 deletions nwcsafpps_runner/prepare_nwp.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import logging
LOG = logging.getLogger(__name__)


class NWPFileFamily(object):
"""Container for a nwp file family."""

Expand All @@ -48,7 +49,7 @@ def __init__(self, cfg, filename):
cfg["nhsf_prefix"], cfg["nhsp_prefix"])
self.file_end = os.path.basename(filename).replace(cfg["nhsf_prefix"], "")
self.tmp_filename = make_temp_filename(suffix="_" + self.file_end, dir=cfg["nwp_outdir"])
self.tmp_result_filename= self.tmp_filename + "_tmp_result"
self.tmp_result_filename = self.tmp_filename + "_tmp_result"
self.tmp_result_filename_reduced = self.tmp_filename + "_tmp_result_reduced"
out_name = cfg["nwp_output_prefix"] + self.file_end
self.result_file = os.path.join(cfg["nwp_outdir"], out_name)
Expand All @@ -59,8 +60,7 @@ def __init__(self, cfg, filename):
self.nwp_req_filename = cfg["pps_nwp_requirements"]
self.cfg = cfg
self.set_time_info(filename, cfg)



def set_time_info(self, filename, cfg):
try:
parser = Parser(cfg["nhsf_file_name_sift"])
Expand All @@ -83,7 +83,6 @@ def set_time_info(self, filename, cfg):
'Failed parsing forecast_step in file name. Check config and filename timestamp.')



def prepare_config(config_file_name):
"""Get config for NWP processing."""
LOG.debug("Prepare_nwp config file = %s", str(config_file_name))
Expand All @@ -97,12 +96,11 @@ def prepare_config(config_file_name):
'nhsf_path', 'nhsf_prefix']:
if parameter not in cfg:
LOG.exception('Parameter "{:s}" not set in config file: '.format(parameter))

if not os.path.exists(cfg['nwp_static_surface']):
LOG.error("Config parameter nwp_static_surface: {:s} does not exist."
"Can't prepare NWP data".format(cfg['nwp_static_surface']))
raise IOError('Failed getting static land-sea mask and topography')

return cfg


Expand Down Expand Up @@ -140,7 +138,7 @@ def should_be_skipped(file_obj, starttime, nlengths):
"""Skip some files. Consider only analysis times newer than
*starttime*. And consider only the forecast lead times in hours given by
the list *nlengths* of integers. Never reprocess.
"""
if file_obj.analysis_time < starttime:
return True
Expand All @@ -156,6 +154,7 @@ def should_be_skipped(file_obj, starttime, nlengths):
return True
return False


def get_files_to_process(cfg):
"""Get all nhsf files in nhsf directory."""
filelist = glob(os.path.join(cfg["nhsf_path"], cfg["nhsf_prefix"] + "*"))
Expand All @@ -165,15 +164,16 @@ def get_files_to_process(cfg):
LOG.debug('NHSF NWP files found = {:s}'.format(str(filelist)))
return filelist


def create_nwp_file(file_obj):
"""Create a new nwp file."""

LOG.info("Result and tmp files:\n\t {:s}\n\t {:s}\n\t {:s}\n\t {:s}".format(
file_obj.result_file,
file_obj.tmp_filename,
file_obj.tmp_result_filename,
file_obj.tmp_result_filename_reduced))

# probably to make sure files are not written at the moment!
cmd = "grib_copy -w gridType=regular_ll {:s} {:s}".format(file_obj.nhsp_file,
file_obj.tmp_filename)
Expand All @@ -196,7 +196,7 @@ def create_nwp_file(file_obj):
LOG.debug("os.system call took: %f seconds", _end - _start)
LOG.debug("Returncode = " + str(retv))
if retv != 0:
LOG.warning("Failed generating nwp file {:} ...".format( file_obj.result_file))
LOG.warning("Failed generating nwp file {:} ...".format(file_obj.result_file))
raise IOError("Failed merging grib data")
nwp_file_ok = check_and_reduce_nwp_content(file_obj.tmp_result_filename,
file_obj.tmp_result_filename_reduced,
Expand All @@ -218,15 +218,14 @@ def create_nwp_file(file_obj):
file_obj.result_file))
return None
return file_obj.result_file



def update_nwp_inner(starttime, nlengths, cfg):
"""Prepare NWP grib files for PPS. Consider only analysis times newer than
*starttime*. And consider only the forecast lead times in hours given by
the list *nlengths* of integers
"""

LOG.info("Path to nhsf files: {:s}".format(cfg["nhsf_path"]))
LOG.info("Path to nhsp files: {:s}".format(cfg["nhsp_path"]))
LOG.info("nwp_output_prefix {:s}".format(cfg["nwp_output_prefix"]))
Expand All @@ -241,7 +240,7 @@ def update_nwp_inner(starttime, nlengths, cfg):
str(file_obj.forecast_step)))
out_file = create_nwp_file(file_obj)
remove_file(file_obj.tmp_result_filename_reduced)
remove_file(file_obj.tmp_result_filename)
remove_file(file_obj.tmp_result_filename)
remove_file(file_obj.tmp_filename)
if out_file is not None:
ok_files.append(out_file)
Expand Down
52 changes: 26 additions & 26 deletions nwcsafpps_runner/tests/test_nwp_prepare.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from datetime import timedelta
from nwcsafpps_runner.message_utils import prepare_nwp_message
import nwcsafpps_runner.prepare_nwp as nwc_prep

LOG = logging.getLogger(__name__)
logging.basicConfig(
format='%(levelname)s |%(asctime)s|: %(message)s',
Expand Down Expand Up @@ -62,30 +62,32 @@ def fake_file_dir(tmp_path):

cfg_file = my_temp_dir / 'pps_config.yaml'
req_file = open(cfg_file, 'w')
req_file.write("pps_nwp_requirements: " + str(requirement_name) + "\n"
"nwp_outdir: " + str(my_temp_dir) + "\n"
"nhsp_path: " + "nwcsafpps_runner/tests/files/" + "\n"
"nhsf_path: " + "nwcsafpps_runner/tests/files/" + "\n"
"nhsp_prefix: " + "LL02_NHSP_" + "\n"
"nhsf_prefix: " + "LL02_NHSF_" + "\n"
"nwp_static_surface: " + str(my_temp_dir) + "/static_surface"+ "\n"
"ecmwf_prefix: " + "LL02_NHSF" + "\n"
"nwp_output_prefix: " + "PPS_ECMWF_" + "\n"
"nhsf_file_name_sift: " + "'" + '{ecmwf_prefix:9s}_{analysis_time:%Y%m%d%H%M}+{forecast_step:d}H00M' + "'" + "\n")

req_file.write(
"pps_nwp_requirements: " + str(requirement_name) + "\n"
"nwp_outdir: " + str(my_temp_dir) + "\n"
"nhsp_path: " + "nwcsafpps_runner/tests/files/" + "\n"
"nhsf_path: " + "nwcsafpps_runner/tests/files/" + "\n"
"nhsp_prefix: " + "LL02_NHSP_" + "\n"
"nhsf_prefix: " + "LL02_NHSF_" + "\n"
"nwp_static_surface: " + str(my_temp_dir) + "/static_surface" + "\n"
"ecmwf_prefix: " + "LL02_NHSF" + "\n"
"nwp_output_prefix: " + "PPS_ECMWF_" + "\n"
"nhsf_file_name_sift: '" + '{ecmwf_prefix:9s}_{analysis_time:%Y%m%d%H%M}+{forecast_step:d}H00M' + "'" + "\n")

cfg_file = my_temp_dir / 'pps_config_missing_fields.yaml'
req_file = open(cfg_file, 'w')
req_file.write("pps_nwp_requirements: " + str(requirement_name_m) + "\n"
"nwp_outdir: " + str(my_temp_dir) + "\n"
"nhsp_path: " + "nwcsafpps_runner/tests/files/" + "\n"
"nhsf_path: " + "nwcsafpps_runner/tests/files/" + "\n"
"nhsp_prefix: " + "LL02_NHSP_" + "\n"
"nhsf_prefix: " + "LL02_NHSF_" + "\n"
"nwp_static_surface: " + str(my_temp_dir) + "/static_surface"+ "\n"
"ecmwf_prefix: " + "LL02_NHSF" + "\n"
"nwp_output_prefix: " + "PPS_ECMWF_MANDATORY" + "\n"
"nhsf_file_name_sift: " + "'" + '{ecmwf_prefix:9s}_{analysis_time:%Y%m%d%H%M}+{forecast_step:d}H00M' + "'" + "\n")

req_file.write(
"pps_nwp_requirements: " + str(requirement_name_m) + "\n"
"nwp_outdir: " + str(my_temp_dir) + "\n"
"nhsp_path: " + "nwcsafpps_runner/tests/files/" + "\n"
"nhsf_path: " + "nwcsafpps_runner/tests/files/" + "\n"
"nhsp_prefix: " + "LL02_NHSP_" + "\n"
"nhsf_prefix: " + "LL02_NHSF_" + "\n"
"nwp_static_surface: " + str(my_temp_dir) + "/static_surface" + "\n"
"ecmwf_prefix: " + "LL02_NHSF" + "\n"
"nwp_output_prefix: " + "PPS_ECMWF_MANDATORY" + "\n"
"nhsf_file_name_sift: '" + '{ecmwf_prefix:9s}_{analysis_time:%Y%m%d%H%M}+{forecast_step:d}H00M' + "'" + "\n")

return str(my_temp_dir)


Expand All @@ -97,7 +99,7 @@ def test_nwp_message(self):
filename = "dummy_dir/PPS_ECMWF_202205100000+009H00M"
publish_msg = prepare_nwp_message(filename, "dummy_topic")
expected_uri = '"uri": "dummy_dir/PPS_ECMWF_202205100000+009H00M"'
assert expected_uri in publish_msg
assert expected_uri in publish_msg


class TestNWPprepareRunner:
Expand Down Expand Up @@ -145,5 +147,3 @@ def test_remove_filename(self, fake_file_dir):
assert not os.path.exists(nwp_surface_file)
# Should be able to run on already removed file without raising exception
remove_file(nwp_surface_file)


0 comments on commit fe4f759

Please sign in to comment.