diff --git a/nowcast/workers/crop_gribs.py b/nowcast/workers/crop_gribs.py index fcf29c3b..964bb26c 100644 --- a/nowcast/workers/crop_gribs.py +++ b/nowcast/workers/crop_gribs.py @@ -61,15 +61,23 @@ def main(): default=arrow.now().floor("day"), help="Forecast date to crop files in.", ) + worker.cli.add_argument( + "--backfill", + action="store_true", + help=""" + Crop files immediately instead of waiting for file system events to signal their + existence. This is intended for use in recovery from automation failures. + """, + ) worker.cli.add_argument( "--var-hour", type=int, - help="forecast hour to crop file for specific variable in; must be used with --var", + help="Forecast hour to crop file for specific variable in; must be used with --var", ) worker.cli.add_argument( "--var", dest="msc_var_name", - help="forecast variable to crop file for; must be used with --var-hour", + help="Forecast variable to crop file for; must be used with --var-hour", ) worker.run(crop_gribs, success, failure) return worker @@ -96,6 +104,7 @@ def crop_gribs(parsed_args, config, *args): checklist = {} fcst_hr = parsed_args.forecast fcst_date = parsed_args.fcst_date + backfill = parsed_args.backfill var_hour = parsed_args.var_hour msc_var_name = parsed_args.msc_var_name logger.info( @@ -117,6 +126,15 @@ def crop_gribs(parsed_args, config, *args): msc_var_name, ) + if backfill: + for eccc_grib_file in eccc_grib_files: + _write_ssc_grib_file(eccc_grib_file, config) + logger.info( + f"finished cropping ECCC grib file to SalishSeaCast subdomain: {eccc_grib_file}" + ) + checklist[fcst_hr] = "cropped to SalishSeaCast subdomain" + return checklist + if msc_var_name and var_hour: # Crop a single variable-hour file eccc_grib_file = eccc_grib_files.pop() @@ -203,6 +221,15 @@ def _write_ssc_grib_file(eccc_grib_file, config): :param :py:class:`pathlib.Path` eccc_grib_file: :param :py:class:`nemo_nowcast.Config` config: """ + ssc_grib_file = ( + f"{eccc_grib_file.parent / eccc_grib_file.stem}_SSC{eccc_grib_file.suffix}" + ) + if Path(ssc_grib_file).exists(): + logger.debug( + f"cropping skipped because SalishSeaCast subdomain GRIB file exist: {ssc_grib_file}" + ) + return + y_min, y_max = config["weather"]["download"]["2.5 km"]["lat indices"] x_min, x_max = config["weather"]["download"]["2.5 km"]["lon indices"] # We need 1 point more than the final domain size to facilitate calculation of the @@ -231,9 +258,6 @@ def _write_ssc_grib_file(eccc_grib_file, config): "GRIB_Ny": ny, } ) - ssc_grib_file = ( - f"{eccc_grib_file.parent / eccc_grib_file.stem}_SSC{eccc_grib_file.suffix}" - ) _xarray_to_grib(ssc_ds, ssc_grib_file) logger.debug(f"wrote GRIB file cropped to SalishSeaCast subdomain: {ssc_grib_file}") diff --git a/tests/workers/test_crop_gribs.py b/tests/workers/test_crop_gribs.py index 938cac38..4de4d622 100644 --- a/tests/workers/test_crop_gribs.py +++ b/tests/workers/test_crop_gribs.py @@ -105,17 +105,23 @@ def test_add_forecast_date_option(self, mock_worker): assert worker.cli.parser._actions[4].default == arrow.now().floor("day") assert worker.cli.parser._actions[4].help - def test_add_forecast_hour_option(self, mock_worker): + def test_add_backfill_option(self, mock_worker): worker = crop_gribs.main() - assert worker.cli.parser._actions[5].dest == "var_hour" - assert worker.cli.parser._actions[5].type == int + assert worker.cli.parser._actions[5].dest == "backfill" + assert worker.cli.parser._actions[5].default is False assert worker.cli.parser._actions[5].help - def test_add_variable_option(self, mock_worker): + def test_add_forecast_hour_option(self, mock_worker): worker = crop_gribs.main() - assert worker.cli.parser._actions[6].dest == "msc_var_name" + assert worker.cli.parser._actions[6].dest == "var_hour" + assert worker.cli.parser._actions[6].type == int assert worker.cli.parser._actions[6].help + def test_add_variable_option(self, mock_worker): + worker = crop_gribs.main() + assert worker.cli.parser._actions[7].dest == "msc_var_name" + assert worker.cli.parser._actions[7].help + class TestConfig: """Unit tests for production YAML config file elements related to worker.""" @@ -246,11 +252,11 @@ class TestCropGribs: @staticmethod @pytest.fixture def mock_calc_grib_file_paths(monkeypatch): - def mock_calc_grib_file_paths(*args): + def _mock_calc_grib_file_paths(*args): return set() monkeypatch.setattr( - crop_gribs, "_calc_grib_file_paths", mock_calc_grib_file_paths + crop_gribs, "_calc_grib_file_paths", _mock_calc_grib_file_paths ) @staticmethod @@ -271,7 +277,17 @@ def stop(self): monkeypatch.setattr(crop_gribs.watchdog.observers, "Observer", MockObserver) - def test_checklist( + @staticmethod + @pytest.fixture + def mock_write_ssc_grib_file(monkeypatch): + def _mock_write_ssc_grib_file(eccc_grib_file, config): + pass + + monkeypatch.setattr( + crop_gribs, "_write_ssc_grib_file", _mock_write_ssc_grib_file + ) + + def test_checklist_not_backfill( self, forecast, mock_calc_grib_file_paths, @@ -285,6 +301,7 @@ def test_checklist( parsed_args = SimpleNamespace( forecast=forecast, fcst_date=arrow.get("2023-08-11"), + backfill=False, var_hour=None, msc_var_name=None, ) @@ -295,6 +312,48 @@ def test_checklist( expected = {forecast: "cropped to SalishSeaCast subdomain"} assert checklist == expected + def test_checklist_backfill( + self, + forecast, + mock_write_ssc_grib_file, + config, + caplog, + monkeypatch, + ): + def _mock_calc_grib_file_paths(*args): + return { + f"forcing/atmospheric/continental2.5/GRIB/20231115/{forecast}/029/" + f"20231115T{forecast}Z_MSC_HRDPS_APCP_Sfc_RLatLon0.0225_PT029H.grib2", + } + + monkeypatch.setattr( + crop_gribs, "_calc_grib_file_paths", _mock_calc_grib_file_paths + ) + + grp_name = grp.getgrgid(os.getgid()).gr_name + monkeypatch.setitem(config, "file group", grp_name) + parsed_args = SimpleNamespace( + forecast=forecast, + fcst_date=arrow.get("2023-11-15"), + backfill=True, + var_hour=None, + msc_var_name=None, + ) + caplog.set_level(logging.DEBUG) + + checklist = crop_gribs.crop_gribs(parsed_args, config) + + assert caplog.records[1].levelname == "INFO" + expected = ( + f"finished cropping ECCC grib file to SalishSeaCast subdomain: " + f"forcing/atmospheric/continental2.5/GRIB/20231115/{forecast}/029/" + f"20231115T{forecast}Z_MSC_HRDPS_APCP_Sfc_RLatLon0.0225_PT029H.grib2" + ) + assert caplog.messages[1] == expected + + expected = {forecast: "cropped to SalishSeaCast subdomain"} + assert checklist == expected + def test_observer_log_messages( self, forecast, @@ -309,6 +368,7 @@ def test_observer_log_messages( parsed_args = SimpleNamespace( forecast=forecast, fcst_date=arrow.get("2023-08-14"), + backfill=False, var_hour=None, msc_var_name=None, ) @@ -340,21 +400,16 @@ def test_observer_log_messages( def test_crop_one_file_log_messages( self, forecast, + mock_write_ssc_grib_file, mock_observer, config, caplog, monkeypatch, ): - def mock_write_ssc_grib_file(eccc_grib_file, config): - pass - - monkeypatch.setattr( - crop_gribs, "_write_ssc_grib_file", mock_write_ssc_grib_file - ) - parsed_args = SimpleNamespace( forecast=forecast, fcst_date=arrow.get("2023-08-14"), + backfill=False, var_hour=29, msc_var_name="APCP_Sfc", ) @@ -493,6 +548,31 @@ def _mock_xarray_to_grib(ssc_ds, ssc_grib_file): monkeypatch.setattr(crop_gribs, "_xarray_to_grib", _mock_xarray_to_grib) + def test_grib_file_exists_so_no_write(self, config, caplog, tmp_path): + grib_dir = tmp_path / config["weather"]["download"]["2.5 km"]["GRIB dir"] + grib_dir.mkdir(parents=True) + grib_file_dir = grib_dir / "20231115/12/001/" + grib_file_dir.mkdir(parents=True) + eccc_grib_file = Path( + grib_file_dir + / "20231115T12Z_MSC_HRDPS_UGRD_AGL-10m_RLatLon0.0225_PT001H.grib2" + ) + ssc_grib_file = Path( + grib_file_dir + / "20231115T12Z_MSC_HRDPS_UGRD_AGL-10m_RLatLon0.0225_PT001H_SSC.grib2" + ) + ssc_grib_file.write_bytes(b"") + caplog.set_level(logging.DEBUG) + + crop_gribs._write_ssc_grib_file(eccc_grib_file, config) + + assert caplog.records[0].levelname == "DEBUG" + expected = ( + f"cropping skipped because SalishSeaCast subdomain GRIB file exist: " + f"{grib_file_dir / '20231115T12Z_MSC_HRDPS_UGRD_AGL-10m_RLatLon0.0225_PT001H_SSC.grib2'}" + ) + assert caplog.messages[0] == expected + def test_log_message( self, mock_open_dataset,