Skip to content

Commit

Permalink
Move wwatch3 prep to after forecast2 result download
Browse files Browse the repository at this point in the history
Testing the hypothesis that the make_ww3_* worker stalls are due to file system
overload, to which the downloading of results contributes.
  • Loading branch information
douglatornell committed Nov 7, 2023
1 parent 96f8cc4 commit 5839684
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 80 deletions.
59 changes: 31 additions & 28 deletions nowcast/next_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -888,34 +888,6 @@ def after_watch_NEMO(msg, config, checklist):
args=[host, "nowcast+", "--shared-storage"],
)
)
if run_type == "forecast2":
host_name = config["wave forecasts"]["host"]
run_date = arrow.get(msg.payload[run_type]["run date"]).shift(days=+1)
next_workers[msg.type].extend(
[
NextWorker(
"nowcast.workers.make_ww3_wind_file",
args=[
host_name,
"forecast2",
"--run-date",
run_date.format("YYYY-MM-DD"),
],
host=host_name,
),
NextWorker(
"nowcast.workers.make_ww3_current_file",
args=[
host_name,
"forecast2",
"--run-date",
run_date.format("YYYY-MM-DD"),
],
host=host_name,
),
]
)
race_condition_workers = {"make_ww3_wind_file", "make_ww3_current_file"}
enabled_host_config = config["run"]["enabled hosts"][
msg.payload[run_type]["host"]
]
Expand Down Expand Up @@ -1511,6 +1483,7 @@ def after_download_results(msg, config, checklist):
"success hindcast": [],
"success nowcast-agrif": [],
}
race_condition_workers = {}
if msg.type.startswith("success"):
run_type = msg.type.split()[1]
run_date = msg.payload[run_type]["run date"]
Expand Down Expand Up @@ -1566,6 +1539,36 @@ def after_download_results(msg, config, checklist):
args=[run_type, "--run-date", run_date],
)
)
if run_type == "forecast2":
host_name = config["wave forecasts"]["host"]
run_date = arrow.get(msg.payload[run_type]["run date"]).shift(days=+1)
next_workers[msg.type].extend(
[
NextWorker(
"nowcast.workers.make_ww3_wind_file",
args=[
host_name,
"forecast2",
"--run-date",
run_date.format("YYYY-MM-DD"),
],
host=host_name,
),
NextWorker(
"nowcast.workers.make_ww3_current_file",
args=[
host_name,
"forecast2",
"--run-date",
run_date.format("YYYY-MM-DD"),
],
host=host_name,
),
]
)
race_condition_workers = {"make_ww3_wind_file", "make_ww3_current_file"}
if race_condition_workers:
return next_workers[msg.type], race_condition_workers
return next_workers[msg.type]


Expand Down
96 changes: 44 additions & 52 deletions tests/test_next_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1273,58 +1273,6 @@ def test_success_forecast_no_launch_make_fvcom_boundary(self, config, checklist)
)
assert expected not in workers

def test_success_forecast2_launch_make_ww3_wind_file_forecast2(
    self, config, checklist
):
    """A ``success forecast2`` message from watch_NEMO yields the
    make_ww3_wind_file worker first, dated the day after the run date,
    and flags both make_ww3_* workers as race condition workers.
    """
    run_info = {
        "host": "arbutus.cloud",
        "run date": "2023-04-07",
        "completed": True,
    }
    msg = Message("watch_NEMO", "success forecast2", {"forecast2": run_info})
    launched, racing = next_workers.after_watch_NEMO(msg, config, checklist)
    # Run date in the worker args is 2023-04-07 shifted forward by one day.
    wind_file_worker = NextWorker(
        "nowcast.workers.make_ww3_wind_file",
        args=["arbutus.cloud", "forecast2", "--run-date", "2023-04-08"],
        host="arbutus.cloud",
    )
    assert launched[0] == wind_file_worker
    assert racing == {"make_ww3_wind_file", "make_ww3_current_file"}

def test_success_forecast2_launch_make_ww3_current_file_forecast2(
    self, config, checklist
):
    """A ``success forecast2`` message from watch_NEMO yields the
    make_ww3_current_file worker second, dated the day after the run date,
    and flags both make_ww3_* workers as race condition workers.
    """
    run_info = {
        "host": "arbutus.cloud",
        "run date": "2023-04-07",
        "completed": True,
    }
    msg = Message("watch_NEMO", "success forecast2", {"forecast2": run_info})
    launched, racing = next_workers.after_watch_NEMO(msg, config, checklist)
    # Run date in the worker args is 2023-04-07 shifted forward by one day.
    current_file_worker = NextWorker(
        "nowcast.workers.make_ww3_current_file",
        args=["arbutus.cloud", "forecast2", "--run-date", "2023-04-08"],
        host="arbutus.cloud",
    )
    assert launched[1] == current_file_worker
    assert racing == {"make_ww3_wind_file", "make_ww3_current_file"}

def test_success_nowcast_green_launch_make_ww3_wind_file_forecast(
self, config, checklist, monkeypatch
):
Expand Down Expand Up @@ -2247,6 +2195,46 @@ def test_success_nowcast_launch_make_plots_specials(
)
assert expected in workers

def test_success_forecast2_launch_make_ww3_wind_file_forecast2(
    self, config, checklist
):
    """A ``success forecast2`` message from download_results yields the
    make_ww3_wind_file worker at index 1, dated the day after the run date,
    and flags both make_ww3_* workers as race condition workers.
    """
    msg = Message(
        "download_results",
        "success forecast2",
        payload={"forecast2": {"run date": "2023-11-07"}},
    )
    launched, racing = next_workers.after_download_results(msg, config, checklist)
    # Run date in the worker args is 2023-11-07 shifted forward by one day.
    wind_file_worker = NextWorker(
        "nowcast.workers.make_ww3_wind_file",
        args=["arbutus.cloud", "forecast2", "--run-date", "2023-11-08"],
        host="arbutus.cloud",
    )
    assert launched[1] == wind_file_worker
    assert racing == {"make_ww3_wind_file", "make_ww3_current_file"}

def test_success_forecast2_launch_make_ww3_current_file_forecast2(
    self, config, checklist
):
    """A ``success forecast2`` message from download_results yields the
    make_ww3_current_file worker at index 2, dated the day after the run
    date, and flags both make_ww3_* workers as race condition workers.
    """
    workers, race_condition_workers = next_workers.after_download_results(
        Message(
            # The message source must be the worker whose completion is being
            # handled; this test exercises after_download_results, so the
            # source is download_results (matching the sibling wind file test).
            "download_results",
            "success forecast2",
            payload={"forecast2": {"run date": "2023-11-07"}},
        ),
        config,
        checklist,
    )
    # Run date in the worker args is 2023-11-07 shifted forward by one day.
    expected = NextWorker(
        "nowcast.workers.make_ww3_current_file",
        args=["arbutus.cloud", "forecast2", "--run-date", "2023-11-08"],
        host="arbutus.cloud",
    )
    assert workers[2] == expected
    assert race_condition_workers == {"make_ww3_wind_file", "make_ww3_current_file"}

def test_success_nowcast_green_launch_ping_erddap_nowcast_green(
self, config, checklist
):
Expand Down Expand Up @@ -2327,6 +2315,10 @@ def test_success_launch_make_CHS_currents_file(
args=[run_type, "--run-date", run_date],
host="localhost",
)
try:
workers, race_condition_workers = workers
except ValueError:
pass
assert expected in workers

def test_success_hindcast_launch_split_results(
Expand Down

0 comments on commit 5839684

Please sign in to comment.