diff --git a/bin/run_nwp_preparation.py b/bin/run_nwp_preparation.py index 72bfaec..b3f0410 100644 --- a/bin/run_nwp_preparation.py +++ b/bin/run_nwp_preparation.py @@ -30,6 +30,8 @@ from nwcsafpps_runner.logger import setup_logging from nwcsafpps_runner.prepare_nwp import update_nwp from nwcsafpps_runner.utils import NwpPrepareError +from posttroll.publisher import Publish +from nwcsafpps_runner.message_utils import publish_l1c, prepare_nwp_message NWP_FLENS = [6, 9, 12, 15, 18, 21, 24] @@ -37,14 +39,23 @@ # datetime.datetime.utcnow => datetime.datetime.now(datetime.UTC) ~python 3.12 -def prepare_nwp4pps(options, flens): - """Prepare NWP data for pps.""" - +def prepare_and_publish(pub, options, flens): config_file_name = options.config_file - every_hour_minute = options.run_every_hour_at_minute starttime = datetime.utcnow() - timedelta(days=1) + ok_files, publish_topic = update_nwp(starttime, flens, config_file_name) + print(ok_files) + if "publish_topic" is not None: + for filename in ok_files: + publish_msg = prepare_nwp_message(filename, publish_topic) + LOG.debug("Will publish") + LOG.debug("publish_msg") + publish_l1c(pub, publish_msg, publish_topic) + +def _run_subscribe_publisher(pub, options, flens): + """Prepare NWP data for pps.""" + every_hour_minute = options.run_every_hour_at_minute LOG.info("Preparing nwp for PPS") - update_nwp(starttime, flens, config_file_name) + prepare_and_publish(pub, options, flens) if every_hour_minute > 60: return while True: @@ -53,16 +64,19 @@ def prepare_nwp4pps(options, flens): LOG.info("Not time for nwp preparation for pps yet, waiting 5 minutes") time.sleep(60 * 5) else: - starttime = datetime.utcnow() - timedelta(days=1) LOG.info("Preparing nwp for PPS") try: - update_nwp(starttime, flens, config_file_name) + prepare_and_publish(pub, options, flens) except (NwpPrepareError, IOError): LOG.exception("Something went wrong in update_nwp...") raise LOG.info("Ready with nwp preparation for pps, waiting 45 minutes") time.sleep(45 * 60) - + +def prepare_nwp4pps_runner(options, flens): + """Start runner for nwp data preparations.""" + with Publish("pps-nwp-preparation-runner", 0) as pub: + _run_subscribe_publisher(pub, options, flens) def get_arguments(): """Get command line arguments.""" @@ -99,4 +113,4 @@ def get_arguments(): options = get_arguments() setup_logging(options) - prepare_nwp4pps(options, NWP_FLENS) + prepare_nwp4pps_runner(options, NWP_FLENS) diff --git a/nwcsafpps_runner/message_utils.py b/nwcsafpps_runner/message_utils.py index cdf63cb..34a3e02 100644 --- a/nwcsafpps_runner/message_utils.py +++ b/nwcsafpps_runner/message_utils.py @@ -32,6 +32,18 @@ LOG = logging.getLogger(__name__) +def prepare_nwp_message(result_file, publish_topic): + msg = Message(atype='file', subject=publish_topic) + to_send = {} + to_send["uri"] = result_file + filename = os.path.basename(result_file) + to_send["uid"] = filename + to_send['format'] = 'NWP grib' + to_send['type'] = 'grib' + + return Message('/' + publish_topic + '/', + "file", to_send).encode() + def prepare_l1c_message(result_file, mda, **kwargs): """Prepare the output message for the level-1c file creation."""