Skip to content

Commit

Permalink
Update make_ww3_*_file workers and their place in the automation wo…
Browse files Browse the repository at this point in the history
…rkflow (#213)

* Add mpi hosts file to wave forecasts configuration

Enables wwatch3 to be run on different VMs to those where NEMO is run.
This is a step towards running wwatch3 nowcast/forecast runs concurrently with
nowcast-green.

* Optimize xarray.open_mfdataset in make_ww3* workers

Add `chunks` and `drop_variables` parameters into `xarray.open_mfdataset()`
in both workers and tests to optimize data loading and memory usage. This update
improves efficiency and performance by limiting loading of unnecessary variables
and enables parallel processing for large datasets. The hope is that these
changes will resolve (or at least reduce the frequency of) the issue of one or
both of the `make_ww3*` workers stalling during the `ds.to_netcdf()` step in
which dask is computing the task graph.

* Change to run wwatch3 concurrent with nowcast-green

Always launch make_ww3_*_file workers after NEMO forecast finishes instead of
using a config flag to control whether they launch then or after nowcast-green
runs. Concurrency is possible because we have changed to running wwatch3 on
different VMs to NEMO.

* Drop uses of 'wave_forecast_after' missed in 3e332b2
  • Loading branch information
douglatornell authored Nov 17, 2023
1 parent 1c05cde commit f19fe00
Show file tree
Hide file tree
Showing 9 changed files with 187 additions and 275 deletions.
8 changes: 4 additions & 4 deletions config/nowcast.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -749,10 +749,6 @@ run:
wave forecasts:
# Compute host to run wave forecast on
host: arbutus.cloud-nowcast
# NEMO run to run wwatch3 forecast run after
# User 'after forecast' during storm surge season and 'after nowcast-green`
# the rest of the year
run when: after nowcast-green
# Directory on compute host where files (e.g. ww3_*.inp, mod_def.ww3) and
# directories (e,g. wind/ current/) necessary to prepare the wwatch3 runs
# are stored
Expand All @@ -768,6 +764,10 @@ wave forecasts:
# Template for NEMO hourly results file name
# **Must be quoted to project {} characters**
NEMO file template: 'SalishSea_1h_{s_yyyymmdd}_{e_yyyymmdd}_grid_{grid}.nc'
# Location on the compute host of the file that contains IP addresses
# and MPI slots specifications.
# Only required for runs on cloud hosts.
mpi hosts file: ${HOME}/mpi_hosts.wwatch3
# Path to the wwatch3 executables directory
wwatch3 exe path: /nemoShare/MEOPAR/nowcast-sys/wwatch3-5.16/exe
# Path to the salishsea command processor executable to use in the run script
Expand Down
81 changes: 26 additions & 55 deletions nowcast/next_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,6 @@ def after_watch_NEMO(msg, config, checklist):
race_condition_workers = {}
if msg.type.startswith("success"):
run_type = msg.type.split()[1]
wave_forecast_after = config["wave forecasts"]["run when"].split("after ")[1]
if run_type == "nowcast":
next_workers[msg.type].extend(
[
Expand All @@ -811,33 +810,32 @@ def after_watch_NEMO(msg, config, checklist):
]
)
if run_type == "forecast":
if wave_forecast_after == "forecast":
host_name = config["wave forecasts"]["host"]
next_workers[msg.type].extend(
[
NextWorker(
"nowcast.workers.make_ww3_wind_file",
args=[
host_name,
"forecast",
"--run-date",
msg.payload[run_type]["run date"],
],
host=host_name,
),
NextWorker(
"nowcast.workers.make_ww3_current_file",
args=[
host_name,
"forecast",
"--run-date",
msg.payload[run_type]["run date"],
],
host=host_name,
),
]
)
race_condition_workers = {"make_ww3_wind_file", "make_ww3_current_file"}
host_name = config["wave forecasts"]["host"]
next_workers[msg.type].extend(
[
NextWorker(
"nowcast.workers.make_ww3_wind_file",
args=[
host_name,
"forecast",
"--run-date",
msg.payload[run_type]["run date"],
],
host=host_name,
),
NextWorker(
"nowcast.workers.make_ww3_current_file",
args=[
host_name,
"forecast",
"--run-date",
msg.payload[run_type]["run date"],
],
host=host_name,
),
]
)
race_condition_workers = {"make_ww3_wind_file", "make_ww3_current_file"}
for host in config["run"]["enabled hosts"]:
if not config["run"]["enabled hosts"][host]["shared storage"]:
next_workers[msg.type].append(
Expand All @@ -852,33 +850,6 @@ def after_watch_NEMO(msg, config, checklist):
)
)
if run_type == "nowcast-green":
if wave_forecast_after == "nowcast-green":
host_name = config["wave forecasts"]["host"]
next_workers[msg.type].extend(
[
NextWorker(
"nowcast.workers.make_ww3_wind_file",
args=[
host_name,
"forecast",
"--run-date",
msg.payload[run_type]["run date"],
],
host=host_name,
),
NextWorker(
"nowcast.workers.make_ww3_current_file",
args=[
host_name,
"forecast",
"--run-date",
msg.payload[run_type]["run date"],
],
host=host_name,
),
]
)
race_condition_workers = {"make_ww3_wind_file", "make_ww3_current_file"}
for host in config["run"]["enabled hosts"]:
run_types = config["run"]["enabled hosts"][host]["run types"]
if "nowcast-dev" in run_types:
Expand Down
66 changes: 48 additions & 18 deletions nowcast/workers/make_ww3_current_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,20 +97,13 @@ def make_ww3_current_file(parsed_args, config, *args):
mesh_mask = os.fspath(grid_dir / config["run types"]["nowcast"]["mesh mask"])
nemo_dir = Path(host_config["run types"]["nowcast"]["results"]).parent
nemo_file_tmpl = config["wave forecasts"]["NEMO file template"]
wave_forecast_after = config["wave forecasts"]["run when"].split("after ")[1]
dest_dir = Path(config["wave forecasts"]["run prep dir"], "current")
filepath_tmpl = config["wave forecasts"]["current file template"]
nc_filepath = dest_dir / filepath_tmpl.format(yyyymmdd=run_date.format("YYYYMMDD"))
if run_type in {"nowcast", "forecast"}:
datasets = _calc_nowcast_datasets(
run_date, nemo_dir, nemo_file_tmpl, wave_forecast_after
)
datasets = _calc_nowcast_datasets(run_date, nemo_dir, nemo_file_tmpl)
if run_type == "forecast":
datasets.update(
_calc_forecast_datasets(
run_date, nemo_dir, nemo_file_tmpl, wave_forecast_after
)
)
datasets.update(_calc_forecast_datasets(run_date, nemo_dir, nemo_file_tmpl))
if run_type == "forecast2":
datasets = _calc_forecast2_datasets(
run_date, nemo_dir, nemo_file_tmpl, dest_dir
Expand All @@ -119,12 +112,51 @@ def make_ww3_current_file(parsed_args, config, *args):
lats = grid.nav_lat[1:, 1:]
lons = grid.nav_lon[1:, 1:] + 360
logger.debug(f"lats and lons from: {mesh_mask}")
with xarray.open_mfdataset(datasets["u"]) as u_nemo:
drop_vars = {
"area",
"bounds_lon",
"bounds_lat",
"bounds_nav_lon",
"bounds_nav_lat",
"depthu_bounds",
"depthv_bounds",
"time_centered_bounds",
"time_counter_bounds",
}
chunks = {
"u": {
"time_counter": 3,
"depthu": 40,
"y": 898,
"x": 398,
},
"v": {
"time_counter": 3,
"depthv": 40,
"y": 898,
"x": 398,
},
}
with xarray.open_mfdataset(
datasets["u"],
chunks=chunks["u"],
compat="override",
coords="minimal",
data_vars="minimal",
drop_variables=drop_vars,
) as u_nemo:
logger.debug(f'u velocities from {datasets["u"]}')
with xarray.open_mfdataset(datasets["v"]) as v_nemo:
with xarray.open_mfdataset(
datasets["v"],
chunks=chunks["v"],
compat="override",
coords="minimal",
data_vars="minimal",
drop_variables=drop_vars,
) as v_nemo:
logger.debug(f'v velocities from {datasets["v"]}')
u_unstaggered, v_unstaggered = viz_tools.unstagger(
u_nemo.vozocrtx[:, 0, ...], v_nemo.vomecrty[:, 0, ...]
u_nemo.vozocrtx.isel(depthu=0), v_nemo.vomecrty.isel(depthv=0)
)
del u_unstaggered.coords["time_centered"]
del u_unstaggered.coords["depthu"]
Expand All @@ -146,15 +178,14 @@ def make_ww3_current_file(parsed_args, config, *args):
return checklist


def _calc_nowcast_datasets(run_date, nemo_dir, nemo_file_tmpl, wave_forecast_after):
def _calc_nowcast_datasets(run_date, nemo_dir, nemo_file_tmpl):
datasets = {"u": [], "v": []}
nemo_results = "nowcast" if wave_forecast_after == "forecast" else "nowcast-green"
dmy = run_date.format("DDMMMYY").lower()
s_yyyymmdd = e_yyyymmdd = run_date.format("YYYYMMDD")
for grid in datasets:
nowcast_file = (
nemo_dir
/ Path(nemo_results, dmy)
/ Path("nowcast", dmy)
/ nemo_file_tmpl.format(
s_yyyymmdd=s_yyyymmdd, e_yyyymmdd=e_yyyymmdd, grid=grid.upper()
)
Expand All @@ -164,15 +195,14 @@ def _calc_nowcast_datasets(run_date, nemo_dir, nemo_file_tmpl, wave_forecast_aft
return datasets


def _calc_forecast_datasets(run_date, nemo_dir, nemo_file_tmpl, wave_forecast_after):
def _calc_forecast_datasets(run_date, nemo_dir, nemo_file_tmpl):
datasets = {"u": [], "v": []}
nemo_results = "nowcast" if wave_forecast_after == "forecast" else "nowcast-green"
dmy = run_date.format("DDMMMYY").lower()
s_yyyymmdd = e_yyyymmdd = run_date.format("YYYYMMDD")
for grid in datasets:
nowcast_file = (
nemo_dir
/ Path(nemo_results, dmy)
/ Path("nowcast", dmy)
/ nemo_file_tmpl.format(
s_yyyymmdd=s_yyyymmdd, e_yyyymmdd=e_yyyymmdd, grid=grid.upper()
)
Expand Down
25 changes: 24 additions & 1 deletion nowcast/workers/make_ww3_wind_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,30 @@ def make_ww3_wind_file(parsed_args, config, *args):
lats = lats_lons.nav_lat
lons = lats_lons.nav_lon
logger.debug(f"lats and lons from: {datasets[0]}")
with xarray.open_mfdataset(datasets) as hrdps:
drop_vars = {
"LHTFL_surface",
"PRATE_surface",
"RH_2maboveground",
"atmpres",
"precip",
"qair",
"solar",
"tair",
"therm_rad",
}
chunks = {
"time_counter": 1,
"y": 230,
"x": 190,
}
with xarray.open_mfdataset(
datasets,
chunks=chunks,
compat="override",
coords="minimal",
data_vars="minimal",
drop_variables=drop_vars,
) as hrdps:
ds = _create_dataset(
hrdps.time_counter, lats, lons, hrdps.u_wind, hrdps.v_wind, datasets
)
Expand Down
3 changes: 2 additions & 1 deletion nowcast/workers/run_ww3.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,13 +463,14 @@ def _definitions(run_date, run_type, run_dir_path, results_path, config):
"""
ddmmmyy = run_date.format("DDMMMYY").lower()
wwatch3_exe_path = config["wave forecasts"]["wwatch3 exe path"]
mpi_hosts_file = config["wave forecasts"]["mpi hosts file"]
salishsea_cmd = config["wave forecasts"]["salishsea cmd"]
defns = (
f'RUN_ID="{ddmmmyy}ww3-{run_type}"\n'
f'WORK_DIR="{run_dir_path}"\n'
f'RESULTS_DIR="{results_path/ddmmmyy}"\n'
f'WW3_EXE="{wwatch3_exe_path}"\n'
f'MPIRUN="mpirun --mca btl ^openib --mca orte_tmpdir_base /dev/shm --hostfile ${{HOME}}/mpi_hosts"\n'
f'MPIRUN="mpirun --mca btl ^openib --mca orte_tmpdir_base /dev/shm --hostfile {mpi_hosts_file}"\n'
f'GATHER="{salishsea_cmd} gather"\n'
)
return defns
Expand Down
Loading

0 comments on commit f19fe00

Please sign in to comment.