Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update make_ww3_*_file workers and their place in the automation workflow #213

Merged
merged 4 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions config/nowcast.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -749,10 +749,6 @@ run:
wave forecasts:
# Compute host to run wave forecast on
host: arbutus.cloud-nowcast
# NEMO run to run wwatch3 forecast run after
# User 'after forecast' during storm surge season and 'after nowcast-green`
# the rest of the year
run when: after nowcast-green
# Directory on compute host where files (e.g. ww3_*.inp, mod_def.ww3) and
# directories (e,g. wind/ current/) necessary to prepare the wwatch3 runs
# are stored
Expand All @@ -768,6 +764,10 @@ wave forecasts:
# Template for NEMO hourly results file name
# **Must be quoted to project {} characters**
NEMO file template: 'SalishSea_1h_{s_yyyymmdd}_{e_yyyymmdd}_grid_{grid}.nc'
# Location on the compute host of the file that contains IP addresses
# and MPI slots specifications.
# Only required for runs on cloud hosts.
mpi hosts file: ${HOME}/mpi_hosts.wwatch3
# Path to the wwatch3 executables directory
wwatch3 exe path: /nemoShare/MEOPAR/nowcast-sys/wwatch3-5.16/exe
# Path to the salishsea command processor executable to use in the run script
Expand Down
81 changes: 26 additions & 55 deletions nowcast/next_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,6 @@ def after_watch_NEMO(msg, config, checklist):
race_condition_workers = {}
if msg.type.startswith("success"):
run_type = msg.type.split()[1]
wave_forecast_after = config["wave forecasts"]["run when"].split("after ")[1]
if run_type == "nowcast":
next_workers[msg.type].extend(
[
Expand All @@ -811,33 +810,32 @@ def after_watch_NEMO(msg, config, checklist):
]
)
if run_type == "forecast":
if wave_forecast_after == "forecast":
host_name = config["wave forecasts"]["host"]
next_workers[msg.type].extend(
[
NextWorker(
"nowcast.workers.make_ww3_wind_file",
args=[
host_name,
"forecast",
"--run-date",
msg.payload[run_type]["run date"],
],
host=host_name,
),
NextWorker(
"nowcast.workers.make_ww3_current_file",
args=[
host_name,
"forecast",
"--run-date",
msg.payload[run_type]["run date"],
],
host=host_name,
),
]
)
race_condition_workers = {"make_ww3_wind_file", "make_ww3_current_file"}
host_name = config["wave forecasts"]["host"]
next_workers[msg.type].extend(
[
NextWorker(
"nowcast.workers.make_ww3_wind_file",
args=[
host_name,
"forecast",
"--run-date",
msg.payload[run_type]["run date"],
],
host=host_name,
),
NextWorker(
"nowcast.workers.make_ww3_current_file",
args=[
host_name,
"forecast",
"--run-date",
msg.payload[run_type]["run date"],
],
host=host_name,
),
]
)
race_condition_workers = {"make_ww3_wind_file", "make_ww3_current_file"}
for host in config["run"]["enabled hosts"]:
if not config["run"]["enabled hosts"][host]["shared storage"]:
next_workers[msg.type].append(
Expand All @@ -852,33 +850,6 @@ def after_watch_NEMO(msg, config, checklist):
)
)
if run_type == "nowcast-green":
if wave_forecast_after == "nowcast-green":
host_name = config["wave forecasts"]["host"]
next_workers[msg.type].extend(
[
NextWorker(
"nowcast.workers.make_ww3_wind_file",
args=[
host_name,
"forecast",
"--run-date",
msg.payload[run_type]["run date"],
],
host=host_name,
),
NextWorker(
"nowcast.workers.make_ww3_current_file",
args=[
host_name,
"forecast",
"--run-date",
msg.payload[run_type]["run date"],
],
host=host_name,
),
]
)
race_condition_workers = {"make_ww3_wind_file", "make_ww3_current_file"}
for host in config["run"]["enabled hosts"]:
run_types = config["run"]["enabled hosts"][host]["run types"]
if "nowcast-dev" in run_types:
Expand Down
66 changes: 48 additions & 18 deletions nowcast/workers/make_ww3_current_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,20 +97,13 @@ def make_ww3_current_file(parsed_args, config, *args):
mesh_mask = os.fspath(grid_dir / config["run types"]["nowcast"]["mesh mask"])
nemo_dir = Path(host_config["run types"]["nowcast"]["results"]).parent
nemo_file_tmpl = config["wave forecasts"]["NEMO file template"]
wave_forecast_after = config["wave forecasts"]["run when"].split("after ")[1]
dest_dir = Path(config["wave forecasts"]["run prep dir"], "current")
filepath_tmpl = config["wave forecasts"]["current file template"]
nc_filepath = dest_dir / filepath_tmpl.format(yyyymmdd=run_date.format("YYYYMMDD"))
if run_type in {"nowcast", "forecast"}:
datasets = _calc_nowcast_datasets(
run_date, nemo_dir, nemo_file_tmpl, wave_forecast_after
)
datasets = _calc_nowcast_datasets(run_date, nemo_dir, nemo_file_tmpl)
if run_type == "forecast":
datasets.update(
_calc_forecast_datasets(
run_date, nemo_dir, nemo_file_tmpl, wave_forecast_after
)
)
datasets.update(_calc_forecast_datasets(run_date, nemo_dir, nemo_file_tmpl))
if run_type == "forecast2":
datasets = _calc_forecast2_datasets(
run_date, nemo_dir, nemo_file_tmpl, dest_dir
Expand All @@ -119,12 +112,51 @@ def make_ww3_current_file(parsed_args, config, *args):
lats = grid.nav_lat[1:, 1:]
lons = grid.nav_lon[1:, 1:] + 360
logger.debug(f"lats and lons from: {mesh_mask}")
with xarray.open_mfdataset(datasets["u"]) as u_nemo:
drop_vars = {
"area",
"bounds_lon",
"bounds_lat",
"bounds_nav_lon",
"bounds_nav_lat",
"depthu_bounds",
"depthv_bounds",
"time_centered_bounds",
"time_counter_bounds",
}
chunks = {
"u": {
"time_counter": 3,
"depthu": 40,
"y": 898,
"x": 398,
},
"v": {
"time_counter": 3,
"depthv": 40,
"y": 898,
"x": 398,
},
}
with xarray.open_mfdataset(
datasets["u"],
chunks=chunks["u"],
compat="override",
coords="minimal",
data_vars="minimal",
drop_variables=drop_vars,
) as u_nemo:
logger.debug(f'u velocities from {datasets["u"]}')
with xarray.open_mfdataset(datasets["v"]) as v_nemo:
with xarray.open_mfdataset(
datasets["v"],
chunks=chunks["v"],
compat="override",
coords="minimal",
data_vars="minimal",
drop_variables=drop_vars,
) as v_nemo:
logger.debug(f'v velocities from {datasets["v"]}')
u_unstaggered, v_unstaggered = viz_tools.unstagger(
u_nemo.vozocrtx[:, 0, ...], v_nemo.vomecrty[:, 0, ...]
u_nemo.vozocrtx.isel(depthu=0), v_nemo.vomecrty.isel(depthv=0)
)
del u_unstaggered.coords["time_centered"]
del u_unstaggered.coords["depthu"]
Expand All @@ -146,15 +178,14 @@ def make_ww3_current_file(parsed_args, config, *args):
return checklist


def _calc_nowcast_datasets(run_date, nemo_dir, nemo_file_tmpl, wave_forecast_after):
def _calc_nowcast_datasets(run_date, nemo_dir, nemo_file_tmpl):
datasets = {"u": [], "v": []}
nemo_results = "nowcast" if wave_forecast_after == "forecast" else "nowcast-green"
dmy = run_date.format("DDMMMYY").lower()
s_yyyymmdd = e_yyyymmdd = run_date.format("YYYYMMDD")
for grid in datasets:
nowcast_file = (
nemo_dir
/ Path(nemo_results, dmy)
/ Path("nowcast", dmy)
/ nemo_file_tmpl.format(
s_yyyymmdd=s_yyyymmdd, e_yyyymmdd=e_yyyymmdd, grid=grid.upper()
)
Expand All @@ -164,15 +195,14 @@ def _calc_nowcast_datasets(run_date, nemo_dir, nemo_file_tmpl, wave_forecast_aft
return datasets


def _calc_forecast_datasets(run_date, nemo_dir, nemo_file_tmpl, wave_forecast_after):
def _calc_forecast_datasets(run_date, nemo_dir, nemo_file_tmpl):
datasets = {"u": [], "v": []}
nemo_results = "nowcast" if wave_forecast_after == "forecast" else "nowcast-green"
dmy = run_date.format("DDMMMYY").lower()
s_yyyymmdd = e_yyyymmdd = run_date.format("YYYYMMDD")
for grid in datasets:
nowcast_file = (
nemo_dir
/ Path(nemo_results, dmy)
/ Path("nowcast", dmy)
/ nemo_file_tmpl.format(
s_yyyymmdd=s_yyyymmdd, e_yyyymmdd=e_yyyymmdd, grid=grid.upper()
)
Expand Down
25 changes: 24 additions & 1 deletion nowcast/workers/make_ww3_wind_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,30 @@ def make_ww3_wind_file(parsed_args, config, *args):
lats = lats_lons.nav_lat
lons = lats_lons.nav_lon
logger.debug(f"lats and lons from: {datasets[0]}")
with xarray.open_mfdataset(datasets) as hrdps:
drop_vars = {
"LHTFL_surface",
"PRATE_surface",
"RH_2maboveground",
"atmpres",
"precip",
"qair",
"solar",
"tair",
"therm_rad",
}
chunks = {
"time_counter": 1,
"y": 230,
"x": 190,
}
with xarray.open_mfdataset(
datasets,
chunks=chunks,
compat="override",
coords="minimal",
data_vars="minimal",
drop_variables=drop_vars,
) as hrdps:
ds = _create_dataset(
hrdps.time_counter, lats, lons, hrdps.u_wind, hrdps.v_wind, datasets
)
Expand Down
3 changes: 2 additions & 1 deletion nowcast/workers/run_ww3.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,13 +463,14 @@ def _definitions(run_date, run_type, run_dir_path, results_path, config):
"""
ddmmmyy = run_date.format("DDMMMYY").lower()
wwatch3_exe_path = config["wave forecasts"]["wwatch3 exe path"]
mpi_hosts_file = config["wave forecasts"]["mpi hosts file"]
salishsea_cmd = config["wave forecasts"]["salishsea cmd"]
defns = (
f'RUN_ID="{ddmmmyy}ww3-{run_type}"\n'
f'WORK_DIR="{run_dir_path}"\n'
f'RESULTS_DIR="{results_path/ddmmmyy}"\n'
f'WW3_EXE="{wwatch3_exe_path}"\n'
f'MPIRUN="mpirun --mca btl ^openib --mca orte_tmpdir_base /dev/shm --hostfile ${{HOME}}/mpi_hosts"\n'
f'MPIRUN="mpirun --mca btl ^openib --mca orte_tmpdir_base /dev/shm --hostfile {mpi_hosts_file}"\n'
f'GATHER="{salishsea_cmd} gather"\n'
)
return defns
Expand Down
Loading
Loading