Skip to content

Commit

Permalink
Automate creation of day and month averaged biology, chemistry & phys…
Browse files Browse the repository at this point in the history
…ics datasets (#242)

* Automate creation of day averaged datasets

Add launch of make_averaged_dataset work for var_groups="biology", "chemistry",
"physics" for "nowcast-green" after completion of results download. Also added
test cases for the functionality.

* Remove make_averaged_dataset from hindcast log

make_averaged_dataset is included in nowcast automation now, so it is included
in the nowcast.log files.

* Expand make_averaged_dataset success/failure msgs

The success and failure message types for 'make_averaged_dataset' have been made
more detailed. These now include information on time interval and variable group,
enhancing the granularity and precision of the logs. This revision impacts both
how messages are registered and how they are handled in the
'make_averaged_dataset' function and corresponding tests.

* Improve make_averaged_dataset checklist element

Move run date from checklist dict key to be an item in the dict. This allows
next_worker functions to more easily access the run date that the averaged
dataset is from.

* Automate creation of month-averaged datasets

Add launch of make_averaged_dataset worker for month-averaged datasets for
var_groups="biology", "chemistry", "physics" for "nowcast-green" after
completion of day-averaged dataset creation on the last day of the month. Also
added test cases for the functionality.

* Remove "host_name" arg from make_averaged_dataset worker

The "host_name" argument in the "make_averaged_dataset" worker was removed as
it's unnecessary. Tests and calls to NextWorker were updated accordingly to
reflect this change.
  • Loading branch information
douglatornell authored Mar 7, 2024
1 parent da0b0ea commit ebbb307
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 50 deletions.
21 changes: 12 additions & 9 deletions config/nowcast.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1115,13 +1115,6 @@ logging:
handlers:
- hindcast_info
- hindcast_debug
make_averaged_dataset:
qualname: make_averaged_dataset
level: DEBUG
propagate: False
handlers:
- hindcast_info
- hindcast_debug
reshapr:
qualname: reshapr
level: DEBUG
Expand Down Expand Up @@ -1586,8 +1579,18 @@ message registry:

make_averaged_dataset:
checklist key: averaged dataset
success: dataset averaged
failure: dataset averaging failed
success day biology: biology dataset day-averaged
failure day biology: biology dataset day-averaging failed
success day chemistry: chemistry dataset day-averaged
failure day chemistry: chemistry dataset day-averaging failed
success day physics: physics dataset day-averaged
failure day physics: physics dataset day-averaging failed
success month biology: biology dataset month-averaged
failure month biology: biology dataset month-averaging failed
success month chemistry: chemistry dataset month-averaged
failure month chemistry: chemistry dataset month-averaging failed
success month physics: physics dataset month-averaged
failure month physics: physics dataset month-averaging failed
crash: make_averaged_dataset worker crashed

archive_tarball:
Expand Down
38 changes: 36 additions & 2 deletions nowcast/next_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1503,8 +1503,15 @@ def after_download_results(msg, config, checklist):
)
if run_type == "nowcast-green":
next_workers[msg.type].append(
NextWorker("nowcast.workers.ping_erddap", args=["nowcast-green"])
NextWorker("nowcast.workers.ping_erddap", args=["nowcast-green"]),
)
for var_group in {"biology", "chemistry", "physics"}:
next_workers[msg.type].append(
NextWorker(
"nowcast.workers.make_averaged_dataset",
args=["day", var_group, "--run-date", run_date],
)
)
if arrow.get(run_date).shift(days=+1).day == 1:
yyyymmm = arrow.get(run_date).format("YYYY-MMM").lower()
next_workers[msg.type].append(
Expand Down Expand Up @@ -1541,7 +1548,34 @@ def after_make_averaged_dataset(msg, config, checklist):
:returns: Worker(s) to launch next
:rtype: list
"""
return []
next_workers = {
"crash": [],
"failure day biology": [],
"failure day chemistry": [],
"failure day physics": [],
"failure month biology": [],
"failure month chemistry": [],
"failure month physics": [],
"success day biology": [],
"success day chemistry": [],
"success day physics": [],
"success month biology": [],
"success month chemistry": [],
"success month physics": [],
}
if msg.type.startswith("success day"):
*_, reshapr_var_group = msg.type.split()
run_date = arrow.get(msg.payload[f"day {reshapr_var_group}"]["run date"])
if run_date.shift(days=+1).day == 1:
first_of_month = run_date.format("YYYY-MM-01")
next_workers[msg.type].append(
NextWorker(
"nowcast.workers.make_averaged_dataset",
args=["month", reshapr_var_group, "--run-date", first_of_month],
host="localhost",
)
)
return next_workers[msg.type]


def after_archive_tarball(msg, config, checklist):
Expand Down
14 changes: 6 additions & 8 deletions nowcast/workers/make_averaged_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,6 @@ def main():
_configure_structlog()
worker = NowcastWorker(NAME, description=__doc__)
worker.init_cli()
worker.cli.add_argument(
"host_name", help="Name of the host to run the downsampling extraction on"
)
worker.cli.add_argument(
"avg_time_interval",
choices={"day", "month"},
Expand Down Expand Up @@ -125,7 +122,7 @@ def success(parsed_args):
f"{avg_time_interval}-averaged dataset for {run_date.format('MMM-YYYY')} "
f"{reshapr_var_group} created on {host_name}"
)
msg_type = "success"
msg_type = f"success {avg_time_interval} {reshapr_var_group}"
return msg_type


Expand All @@ -145,7 +142,7 @@ def failure(parsed_args):
f"{avg_time_interval}-averaged dataset for {run_date.format('MMM-YYYY')} "
f"{reshapr_var_group} creation on {host_name} failed"
)
msg_type = "failure"
msg_type = f"failure {avg_time_interval} {reshapr_var_group}"
return msg_type


Expand Down Expand Up @@ -195,9 +192,10 @@ def make_averaged_dataset(parsed_args, config, *args):
dest_nc_filename = file_pattern.format(yyyymmdd=run_date.format("YYYYMMDD"))
nc_path = nc_path.rename(nc_path.with_name(dest_nc_filename))
return {
f"{run_date.format('YYYY-MM-DD')} {avg_time_interval} {reshapr_var_group}": os.fspath(
nc_path
)
f"{avg_time_interval} {reshapr_var_group}": {
"run date": run_date.format("YYYY-MM-DD"),
"file path": os.fspath(nc_path),
}
}


Expand Down
1 change: 0 additions & 1 deletion tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ def test_loggers(self, prod_config, tmpdir):
"run_NEMO_hindcast",
"watch_NEMO_hindcast",
"split_results",
"make_averaged_dataset",
"reshapr",
"checklist",
"cfgrib",
Expand Down
66 changes: 65 additions & 1 deletion tests/test_next_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2089,6 +2089,26 @@ def test_success_nowcast_green_launch_ping_erddap_nowcast_green(
)
assert expected in workers

@pytest.mark.parametrize("var_group", ("biology", "chemistry", "physics"))
def test_success_nowcast_green_launch_make_averaged_dataset_day(
self, var_group, config, checklist
):
workers = next_workers.after_download_results(
Message(
"download_results",
"success nowcast-green",
payload={"nowcast-green": {"run date": "2024-02-07"}},
),
config,
checklist,
)
expected = NextWorker(
"nowcast.workers.make_averaged_dataset",
args=["day", var_group, "--run-date", "2024-02-07"],
host="localhost",
)
assert expected in workers

def test_success_nowcast_green_not_monthend_no_launch_archive_tarball(
self, config, checklist
):
Expand Down Expand Up @@ -2180,13 +2200,57 @@ def test_success_hindcast_launch_split_results(
class TestAfterMakeAveragedDataset:
"""Unit tests for the after_make_averaged_dataset function."""

@pytest.mark.parametrize("msg_type", ["crash", "failure", "success"])
@pytest.mark.parametrize(
"msg_type",
[
"crash",
"failure day biology",
"failure day chemistry",
"failure day physics",
"success month biology",
"success month chemistry",
"success month physics",
"failure month biology",
"failure month chemistry",
"failure month physics",
],
)
def test_no_next_worker_msg_types(self, msg_type, config, checklist):
workers = next_workers.after_make_averaged_dataset(
Message("make_averaged_dataset", msg_type), config, checklist
)
assert workers == []

@pytest.mark.parametrize(
"msg_type",
[
"success day biology",
"success day chemistry",
"success day physics",
],
)
def test_month_end_day_success_launch_month_average(
self, msg_type, config, checklist
):
*_, reshapr_var_group = msg_type.split()
msg = Message(
"make_averaged_dataset",
msg_type,
payload={
f"day {reshapr_var_group}": {
"run date": "2024-02-29",
"file path": "SalishSea_1d_20240301_20240301_biol_T.nc",
}
},
)
workers = next_workers.after_make_averaged_dataset(msg, config, checklist)
expected = NextWorker(
"nowcast.workers.make_averaged_dataset",
args=["month", reshapr_var_group, "--run-date", "2024-02-01"],
host="localhost",
)
assert expected in workers


class TestAfterArchiveTarball:
"""Unit tests for the after_archive_tarball function."""
Expand Down
64 changes: 35 additions & 29 deletions tests/workers/test_make_averaged_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,37 +86,31 @@ def test_instantiate_worker(self, mock_worker):
"SalishSeaCast worker that creates a down-sampled time-series dataset netCDF4 file"
)

def test_add_host_name_arg(self, mock_worker):
worker = make_averaged_dataset.main()

assert worker.cli.parser._actions[3].dest == "host_name"
assert worker.cli.parser._actions[3].help

def test_add_avg_time_interval_arg(self, mock_worker):
worker = make_averaged_dataset.main()

assert worker.cli.parser._actions[4].dest == "avg_time_interval"
assert worker.cli.parser._actions[4].choices == {"day", "month"}
assert worker.cli.parser._actions[4].help
assert worker.cli.parser._actions[3].dest == "avg_time_interval"
assert worker.cli.parser._actions[3].choices == {"day", "month"}
assert worker.cli.parser._actions[3].help

def test_add_reshapr_var_group_arg(self, mock_worker):
worker = make_averaged_dataset.main()

assert worker.cli.parser._actions[5].dest == "reshapr_var_group"
assert worker.cli.parser._actions[5].choices == {
assert worker.cli.parser._actions[4].dest == "reshapr_var_group"
assert worker.cli.parser._actions[4].choices == {
"biology",
"chemistry",
"physics",
}
assert worker.cli.parser._actions[5].help
assert worker.cli.parser._actions[4].help

def test_add_run_date_option(self, mock_worker):
worker = make_averaged_dataset.main()
assert worker.cli.parser._actions[6].dest == "run_date"
assert worker.cli.parser._actions[5].dest == "run_date"
expected = nemo_nowcast.cli.CommandLineInterface.arrow_date
assert worker.cli.parser._actions[6].type == expected
assert worker.cli.parser._actions[6].default == arrow.now().floor("day")
assert worker.cli.parser._actions[6].help
assert worker.cli.parser._actions[5].type == expected
assert worker.cli.parser._actions[5].default == arrow.now().floor("day")
assert worker.cli.parser._actions[5].help


class TestConfig:
Expand All @@ -138,8 +132,18 @@ def test_message_registry_keys(self, prod_config):

assert list(msg_registry.keys()) == [
"checklist key",
"success",
"failure",
"success day biology",
"failure day biology",
"success day chemistry",
"failure day chemistry",
"success day physics",
"failure day physics",
"success month biology",
"failure month biology",
"success month chemistry",
"failure month chemistry",
"success month physics",
"failure month physics",
"crash",
]

Expand Down Expand Up @@ -238,7 +242,7 @@ def test_day_average_success(self, avg_time_interval, reshapr_var_group, caplog)
host_name = parsed_args.host_name
expected = f"{avg_time_interval}-averaged dataset for 10-Nov-2022 {reshapr_var_group} created on {host_name}"
assert caplog.messages[0] == expected
assert msg_type == "success"
assert msg_type == f"success {avg_time_interval} {reshapr_var_group}"

@pytest.mark.parametrize(
"avg_time_interval, reshapr_var_group",
Expand All @@ -265,7 +269,7 @@ def test_month_average_success(self, avg_time_interval, reshapr_var_group, caplo
host_name = parsed_args.host_name
expected = f"{avg_time_interval}-averaged dataset for Nov-2022 {reshapr_var_group} created on {host_name}"
assert caplog.messages[0] == expected
assert msg_type == "success"
assert msg_type == f"success {avg_time_interval} {reshapr_var_group}"


class TestFailure:
Expand Down Expand Up @@ -296,7 +300,7 @@ def test_day_average_failure(self, avg_time_interval, reshapr_var_group, caplog)
host_name = parsed_args.host_name
expected = f"{avg_time_interval}-averaged dataset for 10-Nov-2022 {reshapr_var_group} creation on {host_name} failed"
assert caplog.messages[0] == expected
assert msg_type == "failure"
assert msg_type == f"failure {avg_time_interval} {reshapr_var_group}"

@pytest.mark.parametrize(
"avg_time_interval, reshapr_var_group",
Expand All @@ -323,7 +327,7 @@ def test_month_average_failure(self, avg_time_interval, reshapr_var_group, caplo
host_name = parsed_args.host_name
expected = f"{avg_time_interval}-averaged dataset for Nov-2022 {reshapr_var_group} creation on {host_name} failed"
assert caplog.messages[0] == expected
assert msg_type == "failure"
assert msg_type == f"failure {avg_time_interval} {reshapr_var_group}"


class TestMakeAveragedDataset:
Expand Down Expand Up @@ -378,9 +382,10 @@ def mock_extract_netcdf(reshapr_config, reshapr_config_yaml):
)
assert caplog.messages[0] == expected
expected = {
f"2022-11-16 {avg_time_interval} {reshapr_var_group}": os.fspath(
tmp_path / "16nov22" / nc_filename
)
f"{avg_time_interval} {reshapr_var_group}": {
"run date": "2022-11-16",
"file path": os.fspath(tmp_path / "16nov22" / nc_filename),
}
}
assert checklist == expected

Expand Down Expand Up @@ -430,13 +435,14 @@ def mock_extract_netcdf(reshapr_config, reshapr_config_yaml):
)
assert caplog.messages[0] == expected
expected = {
f"2022-11-01 {avg_time_interval} {reshapr_var_group}": os.fspath(
tmp_path / "test_results.nc"
)
f"{avg_time_interval} {reshapr_var_group}": {
"run date": "2022-11-01",
"file path": os.fspath(tmp_path / "test_results.nc"),
}
}
assert checklist == expected

def test_bad_month_avg_run_date(self, caplog):
def test_bad_month_avg_run_date(self, caplog, config):
parsed_args = SimpleNamespace(
avg_time_interval="month",
run_date=arrow.get("2022-11-10"),
Expand Down

0 comments on commit ebbb307

Please sign in to comment.