Skip to content

Commit

Permalink
More subfunctions in nwp-prepare code
Browse files Browse the repository at this point in the history
  • Loading branch information
Nina.Hakansson committed Apr 5, 2024
1 parent 130bf2e commit c926143
Showing 1 changed file with 162 additions and 156 deletions.
318 changes: 162 additions & 156 deletions nwcsafpps_runner/prepare_nwp.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,71 @@
import logging
LOG = logging.getLogger(__name__)

class NWPFileFamily(object):
"""Container for a nwp file family."""

def __init__(self, cfg, filename):
self.nhsf_file = filename
self.nhsp_file = filename.replace(cfg["nhsf_path"], cfg["nhsp_path"]).replace(
cfg["nhsf_prefix"], cfg["nhsp_prefix"])
self.file_end = os.path.basename(filename).replace(cfg["nhsf_prefix"], "")
self.tmp_filename = make_temp_filename(suffix="_" + self.file_end, dir=cfg["nwp_outdir"])
self.tmp_result_filename= self.tmp_filename + "_tmp_result"
self.tmp_result_filename_reduced = self.tmp_filename + "_tmp_result_reduced"
out_name = cfg["nwp_output_prefix"] + self.file_end
self.result_file = os.path.join(cfg["nwp_outdir"], out_name)
self.forecast_step = None
self.analysis_time = None
self.timestamp = None
self.nwp_lsmz_filename = cfg["nwp_static_surface"]
self.nwp_req_filename = cfg["pps_nwp_requirements"]
self.cfg = cfg
self.set_time_info(filename, cfg)


def set_time_info(self, filename, cfg):
try:
parser = Parser(cfg["nhsf_file_name_sift"])
except NoOptionError as noe:
LOG.error("NoOptionError {}".format(noe))
if not parser.validate(os.path.basename(self.nhsf_file)):
LOG.error("Parser validate on filename: {} failed.".format(self.nhsf_file))
res = parser.parse("{}".format(os.path.basename(self.nhsf_file)))
if 'analysis_time' in res:
if res['analysis_time'].year == 1900:
res['analysis_time'] = res['analysis_time'].replace(year=datetime.utcnow().year)
self.analysis_time = res['analysis_time']
self.timestamp = self.analysis_time.strftime("%Y%m%d%H%M")
else:
raise NwpPrepareError("Can not parse analysis_time in file name. Check config and filename timestamp")
if 'forecast_step' in res:
self.forecast_step = res['forecast_step']
else:
raise NwpPrepareError(
'Failed parsing forecast_step in file name. Check config and filename timestamp.')



def prepare_config(config_file_name):
"""Get config for NWP processing."""
LOG.debug("Prepare_nwp config file = %s", str(config_file_name))

OPTIONS = load_config_from_file(config_file_name)
cfg = load_config_from_file(config_file_name)

for parameter in ['nhsp_path', 'nhsp_prefix',
'nhsf_file_name_sift',
'pps_nwp_requirements',
'nwp_output_prefix',
'nhsf_path', 'nhsf_prefix']:
if parameter not in OPTIONS:
if parameter not in cfg:
LOG.exception('Parameter "{:s}" not set in config file: '.format(parameter))
return OPTIONS

if not os.path.exists(cfg['nwp_static_surface']):
LOG.error("Config parameter nwp_static_surface: {:s} does not exist."
"Can't prepare NWP data".format(cfg['nwp_static_surface']))
raise IOError('Failed getting static land-sea mask and topography')

return cfg


def logreader(stream, log_func):
Expand All @@ -67,8 +120,6 @@ def remove_file(filename):
if os.path.exists(filename):
LOG.warning("Removing tmp file: %s.", filename)
os.remove(filename)
else:
LOG.warning("tmp file %s gone! Cannot remove it...", filename)


def make_temp_filename(*args, **kwargs):
Expand All @@ -81,162 +132,120 @@ def make_temp_filename(*args, **kwargs):
def update_nwp(starttime, nlengths, config_file_name):
"""Get config options and then prepare nwp."""
LOG.info("Path to prepare_nwp config file = %s", config_file_name)
OPTIONS = prepare_config(config_file_name)
update_nwp_inner(starttime, nlengths, OPTIONS)
cfg = prepare_config(config_file_name)
return update_nwp_inner(starttime, nlengths, cfg)


def update_nwp_inner(starttime, nlengths, OPTIONS):
"""Prepare NWP grib files for PPS. Consider only analysis times newer than
def should_be_skipped(file_obj, starttime, nlengths):
"""Skip some files. Consider only analysis times newer than
*starttime*. And consider only the forecast lead times in hours given by
the list *nlengths* of integers
the list *nlengths* of integers. Never reprocess.
"""
nhsf_file_name_sift = OPTIONS.get('nhsf_file_name_sift')
nhsf_path = OPTIONS.get('nhsf_path', None)
nhsf_prefix = OPTIONS.get('nhsf_prefix', None)
nhsp_path = OPTIONS.get('nhsp_path', None)
nhsp_prefix = OPTIONS.get('nhsp_prefix', None)
nwp_outdir = OPTIONS.get('nwp_outdir', None)
nwp_lsmz_filename = OPTIONS.get('nwp_static_surface', None)
nwp_output_prefix = OPTIONS.get('nwp_output_prefix', None)
nwp_req_filename = OPTIONS.get('pps_nwp_requirements', None)

LOG.info("Path to nhsf files: %s", nhsf_path)
LOG.info("Path to nhsp files: %s", nhsp_path)
LOG.info("nwp_output_prefix %s", OPTIONS["nwp_output_prefix"])

filelist = glob(os.path.join(nhsf_path, nhsf_prefix + "*"))
if file_obj.analysis_time < starttime:
return True
if file_obj.forecast_step not in nlengths:
LOG.debug("Skip step. Forecast step and nlengths: {:s} {:s}".format(
str(file_obj.forecast_step), str(nlengths)))
return True
if not os.path.exists(file_obj.nhsp_file):
LOG.warning("Corresponding nhsp-file not there: {:s}".format(file_obj.nhsp_file))
return True
if os.path.exists(file_obj.result_file):
LOG.info("File: {:s} already there...".format(file_obj.result_file))
return True
return False

def get_files_to_process(cfg):
"""Get all nhsf files in nhsf directory."""
filelist = glob(os.path.join(cfg["nhsf_path"], cfg["nhsf_prefix"] + "*"))
if len(filelist) == 0:
LOG.info("No input files! dir = %s", str(nhsf_path))
return

LOG.debug('NHSF NWP files found = %s', str(filelist))
nfiles_error = 0
for filename in filelist:
if nhsf_file_name_sift is None:
raise NwpPrepareError()

try:
parser = Parser(nhsf_file_name_sift)
except NoOptionError as noe:
LOG.error("NoOptionError {}".format(noe))
continue
if not parser.validate(os.path.basename(filename)):
LOG.error("Parser validate on filename: {} failed.".format(filename))
continue

res = parser.parse("{}".format(os.path.basename(filename)))
if 'analysis_time' in res:
if res['analysis_time'].year == 1900:
res['analysis_time'] = res['analysis_time'].replace(year=datetime.utcnow().year)

analysis_time = res['analysis_time']
timestamp = analysis_time.strftime("%Y%m%d%H%M")
else:
raise NwpPrepareError("Can not parse analysis_time in file name. Check config and filename timestamp")

if 'forecast_time' in res:
if res['forecast_time'].year == 1900:
res['forecast_time'] = res['forecast_time'].replace(year=datetime.utcnow().year)
forecast_time = res['forecast_time']
forecast_step = forecast_time - analysis_time
forecast_step = "{:03d}H{:02d}M".format(forecast_step.days * 24 + forecast_step.seconds / 3600, 0)
timeinfo = "{:s}{:s}{:s}".format(analysis_time.strftime(
"%m%d%H%M"), forecast_time.strftime("%m%d%H%M"), res['end'])
else:
# This needs to be done more solid using the sift pattern! FIXME!
timeinfo = filename.rsplit("_", 1)[-1]
# Forecast step in hours:
if 'forecast_step' in res:
forecast_step = res['forecast_step']
else:
raise NwpPrepareError(
'Failed parsing forecast_step in file name. Check config and filename timestamp.')

if analysis_time < starttime:
continue
if forecast_step not in nlengths:
LOG.debug("Skip step. Forecast step and nlengths: %s %s", str(forecast_step), str(nlengths))
continue

LOG.debug("Analysis time and start time: %s %s", str(analysis_time), str(starttime))
LOG.info("timestamp, step: %s %s", str(timestamp), str(forecast_step))
result_file = os.path.join(
nwp_outdir, nwp_output_prefix + timestamp + "+" + '%.3dH00M' % forecast_step)
if os.path.exists(result_file):
LOG.info("File: " + str(result_file) + " already there...")
continue
LOG.info("No input files! dir = {:s}".format(cfg["nhsf_path"]))
return []
LOG.debug('NHSF NWP files found = {:s}'.format(str(filelist)))
return filelist

def create_nwp_file(file_obj):
"""Create a new nwp file."""

LOG.info("Result and tmp files:\n\t {:s}\n\t {:s}\n\t {:s}\n\t {:s}".format(
file_obj.result_file,
file_obj.tmp_filename,
file_obj.tmp_result_filename,
file_obj.tmp_result_filename_reduced))

# probably to make sure files are not written at the moment!
cmd = "grib_copy -w gridType=regular_ll {:s} {:s}".format(file_obj.nhsp_file,
file_obj.tmp_filename)
retv = run_command(cmd)
LOG.debug("Returncode = " + str(retv))
if retv != 0:
LOG.error(
"Failed doing the grib-copy! Will continue with the next file")
return None

tmp_filename = make_temp_filename(suffix="_" + timestamp + "+" +
'%.3dH00M' % forecast_step, dir=nwp_outdir)
cmd = "cat {:s} {:s} {:s} > {:s}".format(file_obj.tmp_filename,
file_obj.nhsf_file,
file_obj.nwp_lsmz_filename,
file_obj.tmp_result_filename)
LOG.debug("Merge data and add topography and land-sea mask:")
LOG.debug("Command = " + str(cmd))
_start = time.time()
retv = os.system(cmd)
_end = time.time()
LOG.debug("os.system call took: %f seconds", _end - _start)
LOG.debug("Returncode = " + str(retv))
if retv != 0:
LOG.warning("Failed generating nwp file {:} ...".format( file_obj.result_file))
raise IOError("Failed merging grib data")
nwp_file_ok = check_and_reduce_nwp_content(file_obj.tmp_result_filename,
file_obj.tmp_result_filename_reduced,
file_obj.nwp_req_filename)

if nwp_file_ok is None:
LOG.info('NWP file content could not be checked, use anyway.')
os.rename(file_obj.tmp_result_filename, file_obj.result_file)
LOG.debug("Renamed file {:s} to {:s}".format(file_obj.tmp_result_filename,
file_obj.result_file))
elif nwp_file_ok:
os.rename(file_obj.tmp_result_filename_reduced, file_obj.result_file)
LOG.debug("Renamed file {:s} to {:s}".format(file_obj.tmp_result_filename_reduced,
file_obj.result_file))
LOG.info('NWP file with reduced content has been created: {:s}'.format(
file_obj.result_file))
else:
LOG.warning("Missing important fields. No nwp file ({:s}) created".format(
result_file.result_file))
return None
return file_obj.result_file


LOG.info("result and tmp files: " + str(result_file) + " " + str(tmp_filename))
nhsp_file = os.path.join(nhsp_path, nhsp_prefix + timeinfo)
if not os.path.exists(nhsp_file):
LOG.warning("Corresponding nhsp-file not there: " + str(nhsp_file))

def update_nwp_inner(starttime, nlengths, cfg):
"""Prepare NWP grib files for PPS. Consider only analysis times newer than
*starttime*. And consider only the forecast lead times in hours given by
the list *nlengths* of integers
"""

LOG.info("Path to nhsf files: {:s}".format(cfg["nhsf_path"]))
LOG.info("Path to nhsp files: {:s}".format(cfg["nhsp_path"]))
LOG.info("nwp_output_prefix {:s}".format(cfg["nwp_output_prefix"]))
ok_files = []
for fname in get_files_to_process(cfg):
file_obj = NWPFileFamily(cfg, fname)
if should_be_skipped(file_obj, starttime, nlengths):
continue

cmd = ("grib_copy -w gridType=regular_ll " + nhsp_file + " " + tmp_filename)
retv = run_command(cmd)
LOG.debug("Returncode = " + str(retv))
if retv != 0:
LOG.error(
"Failed doing the grib-copy! Will continue with the next file")
nfiles_error = nfiles_error + 1
if nfiles_error > len(filelist) / 2:
LOG.error(
"More than half of the Grib files failed upon grib_copy!")
raise IOError('Failed running grib_copy on many Grib files')

if not os.path.exists(nwp_lsmz_filename):
LOG.error("No static grib file with land-sea mask and " +
"topography available. Can't prepare NWP data")
raise IOError('Failed getting static land-sea mask and topography')

tmp_result_filename = tmp_filename + "_tmp_result"
tmp_result_filename_reduced = tmp_result_filename + '_reduced'
cmd = ('cat ' + tmp_filename + " " +
os.path.join(nhsf_path, nhsf_prefix + timeinfo) +
" " + nwp_lsmz_filename + " > " + tmp_result_filename)
LOG.debug("Add topography and land-sea mask to data:")
LOG.debug("Command = " + str(cmd))
_start = time.time()
retv = os.system(cmd)
_end = time.time()
LOG.debug("os.system call took: %f seconds", _end - _start)
LOG.debug("Returncode = " + str(retv))
if retv != 0:
LOG.warning("Failed generating nwp file %s ...", result_file)
remove_file(tmp_result_filename)
raise IOError("Failed adding topography and land-sea " +
"mask data to grib file")
remove_file(tmp_filename)

nwp_file_ok = check_and_reduce_nwp_content(tmp_result_filename,
tmp_result_filename_reduced,
nwp_req_filename)

if nwp_file_ok is None:
LOG.info('NWP file content could not be checked, use anyway.')
_start = time.time()
os.rename(tmp_result_filename, result_file)
_end = time.time()
LOG.debug("Rename file %s to %s: This took %f seconds",
tmp_result_filename, result_file, _end - _start)
elif nwp_file_ok:
remove_file(tmp_result_filename)
_start = time.time()
os.rename(tmp_result_filename_reduced, result_file)
_end = time.time()
LOG.debug("Rename file %s to %s: This took %f seconds",
tmp_result_filename_reduced, result_file, _end - _start)
LOG.info('NWP file with reduced content has been created: %s',
result_file)
else:
LOG.warning("Missing important fields. No nwp file %s written to disk",
result_file)
remove_file(tmp_result_filename)
return
LOG.debug("Analysis time and start time: {:s} {:s}".format(str(file_obj.analysis_time),
str(starttime)))
LOG.info("timestamp, step: {:s} {:s}".format(file_obj.timestamp,
str(file_obj.forecast_step)))
out_file = create_nwp_file(file_obj)
remove_file(file_obj.tmp_result_filename_reduced)
remove_file(file_obj.tmp_result_filename)
remove_file(file_obj.tmp_filename)
if out_file is not None:
ok_files.append(out_file)
return ok_files, cfg.get("publish_topic", None)


def get_mandatory_and_all_fields(lines):
Expand Down Expand Up @@ -278,8 +287,6 @@ def check_nwp_requirement(grb_entries, mandatory_fields, result_file):
for item in mandatory_fields:
if item not in grb_entries:
LOG.warning("Mandatory field missing in NWP file: %s", str(item))
if os.path.exists(result_file):
os.remove(result_file)
return False
LOG.info("NWP file has all required fields for PPS: %s", result_file)
return True
Expand Down Expand Up @@ -308,7 +315,6 @@ def check_and_reduce_nwp_content(gribfile, result_file, nwp_req_filename):
msg = grb.tostring()
grbout.write(msg)
grbout.close()

LOG.info("Check fields in file: %s", result_file)
return check_nwp_requirement(grb_entries, mandatory_fields, result_file)

Expand Down

0 comments on commit c926143

Please sign in to comment.