Skip to content

Commit

Permalink
Add publisher
Browse files Browse the repository at this point in the history
  • Loading branch information
Nina.Hakansson committed Apr 5, 2024
1 parent c926143 commit 5354d27
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 9 deletions.
32 changes: 23 additions & 9 deletions bin/run_nwp_preparation.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,32 @@
from nwcsafpps_runner.logger import setup_logging
from nwcsafpps_runner.prepare_nwp import update_nwp
from nwcsafpps_runner.utils import NwpPrepareError
from posttroll.publisher import Publish
from nwcsafpps_runner.message_utils import publish_l1c, prepare_nwp_message

NWP_FLENS = [6, 9, 12, 15, 18, 21, 24]

LOG = logging.getLogger('nwp-preparation')

# datetime.datetime.utcnow => datetime.datetime.now(datetime.UTC) ~python 3.12

def prepare_nwp4pps(options, flens):
"""Prepare NWP data for pps."""

def prepare_and_publish(pub, options, flens):
config_file_name = options.config_file
every_hour_minute = options.run_every_hour_at_minute
starttime = datetime.utcnow() - timedelta(days=1)
ok_files, publish_topic = update_nwp(starttime, flens, config_file_name)
print(ok_files)
if "publish_topic" is not None:
for filename in ok_files:
publish_msg = prepare_nwp_message(filename, publish_topic)
LOG.debug("Will publish")
LOG.debug("publish_msg")
publish_l1c(pub, publish_msg, publish_topic)

def _run_subscribe_publisher(pub, options, flens):
"""Prepare NWP data for pps."""
every_hour_minute = options.run_every_hour_at_minute
LOG.info("Preparing nwp for PPS")
update_nwp(starttime, flens, config_file_name)
prepare_and_publish(pub, options, flens)
if every_hour_minute > 60:
return
while True:
Expand All @@ -53,16 +64,19 @@ def prepare_nwp4pps(options, flens):
LOG.info("Not time for nwp preparation for pps yet, waiting 5 minutes")
time.sleep(60 * 5)
else:
starttime = datetime.utcnow() - timedelta(days=1)
LOG.info("Preparing nwp for PPS")
try:
update_nwp(starttime, flens, config_file_name)
prepare_and_publish(pub, options, flens)
except (NwpPrepareError, IOError):
LOG.exception("Something went wrong in update_nwp...")
raise
LOG.info("Ready with nwp preparation for pps, waiting 45 minutes")
time.sleep(45 * 60)


def prepare_nwp4pps_runner(options, flens):
"""Start runner for nwp data preparations."""
with Publish("pps-nwp-preparation-runner", 0) as pub:
_run_subscribe_publisher(pub, options, flens)

def get_arguments():
"""Get command line arguments."""
Expand Down Expand Up @@ -99,4 +113,4 @@ def get_arguments():

options = get_arguments()
setup_logging(options)
prepare_nwp4pps(options, NWP_FLENS)
prepare_nwp4pps_runner(options, NWP_FLENS)
12 changes: 12 additions & 0 deletions nwcsafpps_runner/message_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,18 @@
LOG = logging.getLogger(__name__)


def prepare_nwp_message(result_file, publish_topic):
msg = Message(atype='file', subject=publish_topic)
to_send = {}
to_send["uri"] = result_file
filename = os.path.basename(result_file)
to_send["uid"] = filename
to_send['format'] = 'NWP grib'
to_send['type'] = 'grib'

Check warning on line 42 in nwcsafpps_runner/message_utils.py

View check run for this annotation

Codecov / codecov/patch

nwcsafpps_runner/message_utils.py#L36-L42

Added lines #L36 - L42 were not covered by tests

return Message('/' + publish_topic + '/',

Check warning on line 44 in nwcsafpps_runner/message_utils.py

View check run for this annotation

Codecov / codecov/patch

nwcsafpps_runner/message_utils.py#L44

Added line #L44 was not covered by tests
"file", to_send).encode()

def prepare_l1c_message(result_file, mda, **kwargs):
"""Prepare the output message for the level-1c file creation."""

Expand Down

0 comments on commit 5354d27

Please sign in to comment.