From 4373f04119dc5102a91607fa47d1f587afefce6d Mon Sep 17 00:00:00 2001 From: Doug Latornell Date: Thu, 13 Jun 2024 11:06:50 -0700 Subject: [PATCH] Improve WWatch3 wind and currents forcing file generation (#271) * Add h5netcdf pkg dependency Hoping to improve the reliability of the make_ww3_current_file and make_ww3_wind_file workers by changing them to use h5netcdf for dataset reads. The change results in version updates for multiple dependencies in requirements .txt, and inclusion of the h5netcdf package across multiple environment files and the pyproject.toml file. * Update make_ww3_*_file workers main() function docstrings Removed not informative "Set up and run the worker." line at the beginning. re: issue #121 * Update wind file generation re: unneeded variables Refactored the WWatch3 wind file generation to remove unnecessary variables and improved related tests. The code now drops more unneeded variables to reduce the memory load. The corresponding tests are also enhanced to accurately represent these changes. * Switch to h5netcdf engine for xarray dataset reads The h5netcdf engine has been set as the engine for opening datasets in 'make_ww3_wind_file.py'. The intent is to avoid the netcdf4 package thread-safety issues. The corresponding test cases in 'test_make_ww3_wind_file.py' have also been updated to reflect this change. * Increase wind time_counter chunk size from 1 to 24 The chunk size for the time_counter variable in the make_ww3_wind_file worker and corresponding tests has been increased. This change is anticipated to improve efficiency by processing data in larger batches. * Use dask processes for wwatch3 wind file generation The wind file creation process for the WWatch3 model has been updated to use processes rather than the default threads for the dask scheduler. Processes have been found to be more reliable for dask operations on the types of workloads we use in SalishSeaCast. 
* Use netcdf4 to save dataset in make_ww3_wind_file Explicitly use netcdf4 as the engine for dataset writing. This avoids incompatibilities in the resulting file that arise if it is written using h5netcdf. * Update make_ww3_current_file re: unneeded variables Refactored the WWatch3 current file generation to remove unnecessary variables and improved related tests. The code now drops more unneeded variables to reduce the memory load. The corresponding tests are also enhanced to accurately represent these changes. * Switch to h5netcdf engine for xarray dataset reads The h5netcdf engine has been set as the engine for opening datasets in 'make_ww3_current_file.py'. The intent is to avoid the netcdf4 package thread-safety issues. The corresponding test cases in 'test_make_ww3_current_file.py' have also been updated to reflect this change. * Decrease current time_counter chunk size to 1 The chunk size for the time_counter variable in the make_ww3_current_file worker has been decreased from 3 to 1. Testing showed that the smaller chunk size resulted in slightly faster processing. * Use dask processes for wwatch3 currents file generation The currents file creation process for the WWatch3 model has been updated to use processes rather than the default threads for the dask scheduler. Processes have been found to be more reliable for dask operations on the types of workloads we use in SalishSeaCast. * Use netcdf4 to save dataset in make_ww3_current_file Explicitly use netcdf4 as the engine for dataset writing. This avoids incompatibilities in the resulting file that arise if it is written using h5netcdf. 
--- envs/environment-dev.yaml | 1 + envs/environment-fig-dev.yaml | 1 + envs/environment-linkcheck.yaml | 1 + envs/environment-prod.yaml | 1 + envs/requirements.txt | 27 +++++---- nowcast/workers/make_ww3_current_file.py | 29 ++++++--- nowcast/workers/make_ww3_wind_file.py | 29 ++++++--- pyproject.toml | 1 + tests/workers/test_make_ww3_current_file.py | 49 ++++++++++++++- tests/workers/test_make_ww3_wind_file.py | 66 +++++++++++++++++++-- 10 files changed, 168 insertions(+), 37 deletions(-) diff --git a/envs/environment-dev.yaml b/envs/environment-dev.yaml index 691f2f27..02174175 100644 --- a/envs/environment-dev.yaml +++ b/envs/environment-dev.yaml @@ -41,6 +41,7 @@ dependencies: - geopandas - gitpython - gsw + - h5netcdf - httpx - lxml - mako diff --git a/envs/environment-fig-dev.yaml b/envs/environment-fig-dev.yaml index 337e0f1d..70dca8c3 100644 --- a/envs/environment-fig-dev.yaml +++ b/envs/environment-fig-dev.yaml @@ -34,6 +34,7 @@ dependencies: - geopandas - gitpython - gsw + - h5netcdf - httpx - lxml - mako diff --git a/envs/environment-linkcheck.yaml b/envs/environment-linkcheck.yaml index 2fe794c8..c9a5df8f 100644 --- a/envs/environment-linkcheck.yaml +++ b/envs/environment-linkcheck.yaml @@ -30,6 +30,7 @@ dependencies: - gitpython - gsw - httpx + - h5netcdf - lxml - mako - matplotlib diff --git a/envs/environment-prod.yaml b/envs/environment-prod.yaml index 0f8df8a6..eb878f60 100644 --- a/envs/environment-prod.yaml +++ b/envs/environment-prod.yaml @@ -41,6 +41,7 @@ dependencies: - geopandas - gitpython - gsw + - h5netcdf - httpx - lxml - mako diff --git a/envs/requirements.txt b/envs/requirements.txt index 99cfa798..ce42c66a 100644 --- a/envs/requirements.txt +++ b/envs/requirements.txt @@ -23,10 +23,11 @@ bokeh==3.4.1 Bottleneck==1.3.8 branca==0.7.2 Brotli==1.1.0 +cached-property==1.5.2 Cartopy==0.23.0 certifi==2024.2.2 cffi==1.16.0 -cfgrib==0.9.11.0 +cfgrib==0.9.12.0 cfgv==3.3.1 cftime==1.6.3 charset-normalizer==3.3.2 @@ -45,10 +46,10 @@ 
coverage==7.5.1 cryptography==42.0.7 cycler==0.12.1 cytoolz==0.12.3 -dask==2024.5.0 -dask-expr==1.1.0 +dask==2024.5.1 +dask-expr==1.1.1 distlib==0.3.8 -distributed==2024.5.0 +distributed==2024.5.1 docutils==0.20.1 eccodes==1.7.0 editables==0.5 @@ -60,7 +61,7 @@ feedgen==1.0.0 filelock==3.14.0 findlibs==0.0.5 fiona==1.9.6 -flox==0.9.7 +flox==0.9.8 folium==0.16.0 fonttools==4.51.0 fsspec==2024.5.0 @@ -72,7 +73,9 @@ GitPython==3.1.43 gsw==3.6.18 h11==0.14.0 h2==4.1.0 -hatch==1.11.0 +h5netcdf==1.3.0 +h5py==3.11.0 +hatch==1.12.0 hatchling==1.24.2 hpack==4.0.0 httpcore==1.0.5 @@ -148,7 +151,7 @@ PyQt5==5.15.9 PyQt5-sip==12.12.2 pyshp==2.3.1 PySocks==1.7.1 -pytest==8.2.0 +pytest==8.2.1 pytest-cov==5.0.0 pytest_httpx==0.29.0 pytest-randomly==3.15.0 @@ -159,7 +162,7 @@ pytz==2024.1 PyYAML==6.0.1 pyzmq==26.0.3 rasterio==1.3.10 -requests==2.32.0 +requests==2.32.3 requests-file==2.0.0 requests-toolbelt==1.0.0 retrying==1.3.3 @@ -167,10 +170,10 @@ rich==13.7.1 Rtree==1.2.0 schedule==1.2.1 scikit-learn==1.4.2 -scipy==1.13.0 +scipy==1.13.1 scour==0.38.2 SecretStorage==3.3.3 -sentry-sdk==2.2.0 +sentry-sdk==2.3.1 setuptools==69.5.1 shapely==2.0.4 shellingham==1.5.4 @@ -193,7 +196,7 @@ sphinxcontrib-jsmath==1.0.1 sphinxcontrib-qthelp==1.0.7 sphinxcontrib-serializinghtml==1.1.10 stevedore==5.2.0 -structlog==24.1.0 +structlog==24.2.0 supervisor==4.2.5 sysrsync==1.1.1 tables==3.9.2 @@ -217,7 +220,7 @@ userpath==1.7.0 utm==0.7.0 verboselogs==1.7 virtualenv==20.26.2 -watchdog==4.0.0 +watchdog==4.0.1 wcwidth==0.2.13 wheel==0.43.0 xarray==2024.5.0 diff --git a/nowcast/workers/make_ww3_current_file.py b/nowcast/workers/make_ww3_current_file.py index 05b0f909..60ca1b91 100644 --- a/nowcast/workers/make_ww3_current_file.py +++ b/nowcast/workers/make_ww3_current_file.py @@ -35,9 +35,7 @@ def main(): - """Set up and run the worker. 
- - For command-line usage see: + """For command-line usage see: :command:`python -m nowcast.workers.make_ww3_current_file --help` """ @@ -108,7 +106,14 @@ def make_ww3_current_file(parsed_args, config, *args): datasets = _calc_forecast2_datasets( run_date, nemo_dir, nemo_file_tmpl, dest_dir ) - with xarray.open_dataset(mesh_mask) as grid: + drop_vars = { + 'gphiu', 'vmask', 'gdept_0', 'gdepw_0', 'umask', 'gphif', 'e3v_0', 'time_counter', + 'isfdraft', 'glamu', 'e1f', 'vmaskutil', 'mbathy', 'e2t', 'e2u', 'e3u_0', 'ff', 'gdept_1d', + 'gphit', 'e3w_0', 'e1u', 'e1t', 'e2v', 'fmaskutil', 'tmaskutil', 'gdepv', 'misf', 'gphiv', + 'e3t_1d', 'fmask', 'tmask', 'e3t_0', 'gdepw_1d', 'gdepu', 'glamt', 'glamf', + 'e3w_1d', 'e1v', 'umaskutil', 'glamv', 'e2f', + } + with xarray.open_dataset(mesh_mask, drop_variables=drop_vars, engine="h5netcdf") as grid: lats = grid.nav_lat[1:, 1:] lons = grid.nav_lon[1:, 1:] + 360 logger.debug(f"lats and lons from: {mesh_mask}") @@ -120,18 +125,19 @@ def make_ww3_current_file(parsed_args, config, *args): "bounds_nav_lat", "depthu_bounds", "depthv_bounds", + "time_centered", "time_centered_bounds", "time_counter_bounds", } chunks = { "u": { - "time_counter": 3, + "time_counter": 1, "depthu": 40, "y": 898, "x": 398, }, "v": { - "time_counter": 3, + "time_counter": 1, "depthv": 40, "y": 898, "x": 398, @@ -144,6 +150,7 @@ def make_ww3_current_file(parsed_args, config, *args): coords="minimal", data_vars="minimal", drop_variables=drop_vars, + engine="h5netcdf", ) as u_nemo: logger.debug(f'u velocities from {datasets["u"]}') with xarray.open_mfdataset( @@ -153,14 +160,13 @@ def make_ww3_current_file(parsed_args, config, *args): coords="minimal", data_vars="minimal", drop_variables=drop_vars, + engine="h5netcdf", ) as v_nemo: logger.debug(f'v velocities from {datasets["v"]}') u_unstaggered, v_unstaggered = viz_tools.unstagger( u_nemo.vozocrtx.isel(depthu=0), v_nemo.vomecrty.isel(depthv=0) ) - del u_unstaggered.coords["time_centered"] del 
u_unstaggered.coords["depthu"] - del v_unstaggered.coords["time_centered"] del v_unstaggered.coords["depthv"] logger.debug("unstaggered velocity components on to mesh mask lats/lons") u_current, v_current = viz_tools.rotate_vel(u_unstaggered, v_unstaggered) @@ -169,7 +175,12 @@ def make_ww3_current_file(parsed_args, config, *args): u_current.time_counter, lats, lons, u_current, v_current, datasets ) logger.debug("created currents dataset") - ds.to_netcdf(os.fspath(nc_filepath)) + dask_scheduler = { + "scheduler": "processes", + "max_workers": 8, + } + ds.compute(**dask_scheduler) + ds.to_netcdf(nc_filepath, engine="netcdf4") logger.debug(f"stored currents forcing file: {nc_filepath}") checklist = { run_type: os.fspath(nc_filepath), diff --git a/nowcast/workers/make_ww3_wind_file.py b/nowcast/workers/make_ww3_wind_file.py index 25345f64..c5c4bb2f 100644 --- a/nowcast/workers/make_ww3_wind_file.py +++ b/nowcast/workers/make_ww3_wind_file.py @@ -28,9 +28,7 @@ def main(): - """Set up and run the worker. 
- - For command-line usage see: + """For command-line usage see: :command:`python -m nowcast.workers.make_ww3_wind_file --help` """ @@ -110,10 +108,6 @@ def make_ww3_wind_file(parsed_args, config, *args): dest_dir = Path(config["wave forecasts"]["run prep dir"], "wind") filepath_tmpl = config["wave forecasts"]["wind file template"] nc_filepath = dest_dir / filepath_tmpl.format(yyyymmdd=run_date.format("YYYYMMDD")) - with xarray.open_dataset(datasets[0]) as lats_lons: - lats = lats_lons.nav_lat - lons = lats_lons.nav_lon - logger.debug(f"lats and lons from: {datasets[0]}") drop_vars = { "LHTFL_surface", "PRATE_surface", @@ -124,9 +118,20 @@ def make_ww3_wind_file(parsed_args, config, *args): "solar", "tair", "therm_rad", + "u_wind", + "v_wind", } + with xarray.open_dataset(datasets[0], drop_variables=drop_vars, engine = "h5netcdf") as lats_lons: + lats = lats_lons.nav_lat + lons = lats_lons.nav_lon + logger.debug(f"lats and lons from: {datasets[0]}") + drop_vars = drop_vars.union({ + "nav_lon", + "nav_lat", + }) + drop_vars = drop_vars.difference({"u_wind", "v_wind"}) chunks = { - "time_counter": 1, + "time_counter": 24, "y": 230, "x": 190, } @@ -137,12 +142,18 @@ def make_ww3_wind_file(parsed_args, config, *args): coords="minimal", data_vars="minimal", drop_variables=drop_vars, + engine="h5netcdf", ) as hrdps: ds = _create_dataset( hrdps.time_counter, lats, lons, hrdps.u_wind, hrdps.v_wind, datasets ) logger.debug("created winds dataset") - ds.to_netcdf(os.fspath(nc_filepath)) + dask_scheduler = { + "scheduler": "processes", + "max_workers": 8, + } + ds.compute(**dask_scheduler) + ds.to_netcdf(nc_filepath, engine="netcdf4") logger.debug(f"stored wind forcing file: {nc_filepath}") checklist = {run_type: os.fspath(nc_filepath)} return checklist diff --git a/pyproject.toml b/pyproject.toml index a362344b..df22af17 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -60,6 +60,7 @@ dependencies = [ "geopandas", "gitpython", "gsw", + "h5netcdf", "httpx", "lxml", "mako", 
diff --git a/tests/workers/test_make_ww3_current_file.py b/tests/workers/test_make_ww3_current_file.py index e2d1aa95..5197b35a 100644 --- a/tests/workers/test_make_ww3_current_file.py +++ b/tests/workers/test_make_ww3_current_file.py @@ -340,7 +340,54 @@ def test_mesh_mask_dataset( m_unstagger.return_value = (MagicMock(), MagicMock()) m_rotate_vel.return_value = (MagicMock(), MagicMock()) make_ww3_current_file.make_ww3_current_file(parsed_args, config) - m_open_dataset.assert_called_once_with("grid/mesh_mask201702.nc") + drop_vars = { + "gphiu", + "vmask", + "gdept_0", + "gdepw_0", + "umask", + "gphif", + "e3v_0", + "time_counter", + "isfdraft", + "glamu", + "e1f", + "vmaskutil", + "mbathy", + "e2t", + "e2u", + "e3u_0", + "ff", + "gdept_1d", + "gphit", + "e3w_0", + "e1u", + "e1t", + "e2v", + "fmaskutil", + "tmaskutil", + "gdepv", + "misf", + "gphiv", + "e3t_1d", + "fmask", + "tmask", + "e3t_0", + "gdepw_1d", + "gdepu", + "glamt", + "glamf", + "e3w_1d", + "e1v", + "umaskutil", + "glamv", + "e2f", + } + m_open_dataset.assert_called_once_with( + "grid/mesh_mask201702.nc", + drop_variables=drop_vars, + engine="h5netcdf", + ) class TestCalcNowcastDatasets: diff --git a/tests/workers/test_make_ww3_wind_file.py b/tests/workers/test_make_ww3_wind_file.py index f6674573..e6bd0b66 100644 --- a/tests/workers/test_make_ww3_wind_file.py +++ b/tests/workers/test_make_ww3_wind_file.py @@ -223,11 +223,26 @@ def test_nowcast_dataset( run_date=arrow.get("2019-03-24"), ) make_ww3_wind_file.make_ww3_wind_file(parsed_args, config) + drop_vars = { + "LHTFL_surface", + "PRATE_surface", + "RH_2maboveground", + "atmpres", + "precip", + "qair", + "solar", + "tair", + "therm_rad", + "u_wind", + "v_wind", + } m_open_dataset.assert_called_once_with( - Path("/nemoShare/MEOPAR/GEM2.5/ops/NEMO-atmos/ops_y2019m03d24.nc") + Path("/nemoShare/MEOPAR/GEM2.5/ops/NEMO-atmos/ops_y2019m03d24.nc"), + drop_variables=drop_vars, + engine="h5netcdf", ) chunks = { - "time_counter": 1, + "time_counter": 24, 
"y": 230, "x": 190, } @@ -241,6 +256,8 @@ def test_nowcast_dataset( "solar", "tair", "therm_rad", + "nav_lon", + "nav_lat", } m_open_mfdataset.assert_called_once_with( [Path("/nemoShare/MEOPAR/GEM2.5/ops/NEMO-atmos/ops_y2019m03d24.nc")], @@ -249,6 +266,7 @@ def test_nowcast_dataset( coords="minimal", data_vars="minimal", drop_variables=drop_vars, + engine="h5netcdf", ) def test_forecast_datasets( @@ -260,11 +278,26 @@ def test_forecast_datasets( run_date=arrow.get("2017-04-07"), ) make_ww3_wind_file.make_ww3_wind_file(parsed_args, config) + drop_vars = { + "LHTFL_surface", + "PRATE_surface", + "RH_2maboveground", + "atmpres", + "precip", + "qair", + "solar", + "tair", + "therm_rad", + "u_wind", + "v_wind", + } m_open_dataset.assert_called_once_with( - Path("/nemoShare/MEOPAR/GEM2.5/ops/NEMO-atmos/ops_y2017m04d07.nc") + Path("/nemoShare/MEOPAR/GEM2.5/ops/NEMO-atmos/ops_y2017m04d07.nc"), + drop_variables=drop_vars, + engine="h5netcdf", ) chunks = { - "time_counter": 1, + "time_counter": 24, "y": 230, "x": 190, } @@ -278,6 +311,8 @@ def test_forecast_datasets( "solar", "tair", "therm_rad", + "nav_lon", + "nav_lat", } m_open_mfdataset.assert_called_once_with( [ @@ -290,6 +325,7 @@ def test_forecast_datasets( coords="minimal", data_vars="minimal", drop_variables=drop_vars, + engine="h5netcdf" ) def test_forecast2_datasets( @@ -301,11 +337,26 @@ def test_forecast2_datasets( run_date=arrow.get("2017-04-07"), ) make_ww3_wind_file.make_ww3_wind_file(parsed_args, config) + drop_vars = { + "LHTFL_surface", + "PRATE_surface", + "RH_2maboveground", + "atmpres", + "precip", + "qair", + "solar", + "tair", + "therm_rad", + "u_wind", + "v_wind", + } m_open_dataset.assert_called_once_with( - Path("/nemoShare/MEOPAR/GEM2.5/ops/NEMO-atmos/fcst/ops_y2017m04d07.nc") + Path("/nemoShare/MEOPAR/GEM2.5/ops/NEMO-atmos/fcst/ops_y2017m04d07.nc"), + drop_variables=drop_vars, + engine="h5netcdf", ) chunks = { - "time_counter": 1, + "time_counter": 24, "y": 230, "x": 190, } @@ -319,6 +370,8 @@ 
def test_forecast2_datasets( "solar", "tair", "therm_rad", + "nav_lon", + "nav_lat", } m_open_mfdataset.assert_called_once_with( [ @@ -331,4 +384,5 @@ def test_forecast2_datasets( coords="minimal", data_vars="minimal", drop_variables=drop_vars, + engine="h5netcdf" )