Skip to content

Commit

Permalink
Add --backfill option to crop_gribs worker (#216)
Browse files Browse the repository at this point in the history
`crop_gribs --backfill` option is for use for recovery from automation
failures. Additionally, added a condition to skip cropping the GRIB files when
SalishSeaCast subdomain files exist, which is reported as a debug message. That
will optimize the cropping process by avoiding redundancy. Tests were updated to
reflect these changes.
  • Loading branch information
douglatornell authored Nov 15, 2023
1 parent 9ef8e08 commit 1c05cde
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 20 deletions.
34 changes: 29 additions & 5 deletions nowcast/workers/crop_gribs.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,23 @@ def main():
default=arrow.now().floor("day"),
help="Forecast date to crop files in.",
)
worker.cli.add_argument(
"--backfill",
action="store_true",
help="""
Crop files immediately instead of waiting for file system events to signal their
existence. This is intended for use in recovery from automation failures.
""",
)
worker.cli.add_argument(
"--var-hour",
type=int,
help="forecast hour to crop file for specific variable in; must be used with --var",
help="Forecast hour to crop file for specific variable in; must be used with --var",
)
worker.cli.add_argument(
"--var",
dest="msc_var_name",
help="forecast variable to crop file for; must be used with --var-hour",
help="Forecast variable to crop file for; must be used with --var-hour",
)
worker.run(crop_gribs, success, failure)
return worker
Expand All @@ -96,6 +104,7 @@ def crop_gribs(parsed_args, config, *args):
checklist = {}
fcst_hr = parsed_args.forecast
fcst_date = parsed_args.fcst_date
backfill = parsed_args.backfill
var_hour = parsed_args.var_hour
msc_var_name = parsed_args.msc_var_name
logger.info(
Expand All @@ -117,6 +126,15 @@ def crop_gribs(parsed_args, config, *args):
msc_var_name,
)

if backfill:
for eccc_grib_file in eccc_grib_files:
_write_ssc_grib_file(eccc_grib_file, config)
logger.info(
f"finished cropping ECCC grib file to SalishSeaCast subdomain: {eccc_grib_file}"
)
checklist[fcst_hr] = "cropped to SalishSeaCast subdomain"
return checklist

if msc_var_name and var_hour:
# Crop a single variable-hour file
eccc_grib_file = eccc_grib_files.pop()
Expand Down Expand Up @@ -203,6 +221,15 @@ def _write_ssc_grib_file(eccc_grib_file, config):
:param :py:class:`pathlib.Path` eccc_grib_file:
:param :py:class:`nemo_nowcast.Config` config:
"""
ssc_grib_file = (
f"{eccc_grib_file.parent / eccc_grib_file.stem}_SSC{eccc_grib_file.suffix}"
)
if Path(ssc_grib_file).exists():
logger.debug(
f"cropping skipped because SalishSeaCast subdomain GRIB file exist: {ssc_grib_file}"
)
return

y_min, y_max = config["weather"]["download"]["2.5 km"]["lat indices"]
x_min, x_max = config["weather"]["download"]["2.5 km"]["lon indices"]
# We need 1 point more than the final domain size to facilitate calculation of the
Expand Down Expand Up @@ -231,9 +258,6 @@ def _write_ssc_grib_file(eccc_grib_file, config):
"GRIB_Ny": ny,
}
)
ssc_grib_file = (
f"{eccc_grib_file.parent / eccc_grib_file.stem}_SSC{eccc_grib_file.suffix}"
)
_xarray_to_grib(ssc_ds, ssc_grib_file)

logger.debug(f"wrote GRIB file cropped to SalishSeaCast subdomain: {ssc_grib_file}")
Expand Down
110 changes: 95 additions & 15 deletions tests/workers/test_crop_gribs.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,17 +105,23 @@ def test_add_forecast_date_option(self, mock_worker):
assert worker.cli.parser._actions[4].default == arrow.now().floor("day")
assert worker.cli.parser._actions[4].help

def test_add_forecast_hour_option(self, mock_worker):
def test_add_backfill_option(self, mock_worker):
worker = crop_gribs.main()
assert worker.cli.parser._actions[5].dest == "var_hour"
assert worker.cli.parser._actions[5].type == int
assert worker.cli.parser._actions[5].dest == "backfill"
assert worker.cli.parser._actions[5].default is False
assert worker.cli.parser._actions[5].help

def test_add_variable_option(self, mock_worker):
def test_add_forecast_hour_option(self, mock_worker):
worker = crop_gribs.main()
assert worker.cli.parser._actions[6].dest == "msc_var_name"
assert worker.cli.parser._actions[6].dest == "var_hour"
assert worker.cli.parser._actions[6].type == int
assert worker.cli.parser._actions[6].help

def test_add_variable_option(self, mock_worker):
worker = crop_gribs.main()
assert worker.cli.parser._actions[7].dest == "msc_var_name"
assert worker.cli.parser._actions[7].help


class TestConfig:
"""Unit tests for production YAML config file elements related to worker."""
Expand Down Expand Up @@ -246,11 +252,11 @@ class TestCropGribs:
@staticmethod
@pytest.fixture
def mock_calc_grib_file_paths(monkeypatch):
def mock_calc_grib_file_paths(*args):
def _mock_calc_grib_file_paths(*args):
return set()

monkeypatch.setattr(
crop_gribs, "_calc_grib_file_paths", mock_calc_grib_file_paths
crop_gribs, "_calc_grib_file_paths", _mock_calc_grib_file_paths
)

@staticmethod
Expand All @@ -271,7 +277,17 @@ def stop(self):

monkeypatch.setattr(crop_gribs.watchdog.observers, "Observer", MockObserver)

def test_checklist(
@staticmethod
@pytest.fixture
def mock_write_ssc_grib_file(monkeypatch):
def _mock_write_ssc_grib_file(eccc_grib_file, config):
pass

monkeypatch.setattr(
crop_gribs, "_write_ssc_grib_file", _mock_write_ssc_grib_file
)

def test_checklist_not_backfill(
self,
forecast,
mock_calc_grib_file_paths,
Expand All @@ -285,6 +301,7 @@ def test_checklist(
parsed_args = SimpleNamespace(
forecast=forecast,
fcst_date=arrow.get("2023-08-11"),
backfill=False,
var_hour=None,
msc_var_name=None,
)
Expand All @@ -295,6 +312,48 @@ def test_checklist(
expected = {forecast: "cropped to SalishSeaCast subdomain"}
assert checklist == expected

def test_checklist_backfill(
self,
forecast,
mock_write_ssc_grib_file,
config,
caplog,
monkeypatch,
):
def _mock_calc_grib_file_paths(*args):
return {
f"forcing/atmospheric/continental2.5/GRIB/20231115/{forecast}/029/"
f"20231115T{forecast}Z_MSC_HRDPS_APCP_Sfc_RLatLon0.0225_PT029H.grib2",
}

monkeypatch.setattr(
crop_gribs, "_calc_grib_file_paths", _mock_calc_grib_file_paths
)

grp_name = grp.getgrgid(os.getgid()).gr_name
monkeypatch.setitem(config, "file group", grp_name)
parsed_args = SimpleNamespace(
forecast=forecast,
fcst_date=arrow.get("2023-11-15"),
backfill=True,
var_hour=None,
msc_var_name=None,
)
caplog.set_level(logging.DEBUG)

checklist = crop_gribs.crop_gribs(parsed_args, config)

assert caplog.records[1].levelname == "INFO"
expected = (
f"finished cropping ECCC grib file to SalishSeaCast subdomain: "
f"forcing/atmospheric/continental2.5/GRIB/20231115/{forecast}/029/"
f"20231115T{forecast}Z_MSC_HRDPS_APCP_Sfc_RLatLon0.0225_PT029H.grib2"
)
assert caplog.messages[1] == expected

expected = {forecast: "cropped to SalishSeaCast subdomain"}
assert checklist == expected

def test_observer_log_messages(
self,
forecast,
Expand All @@ -309,6 +368,7 @@ def test_observer_log_messages(
parsed_args = SimpleNamespace(
forecast=forecast,
fcst_date=arrow.get("2023-08-14"),
backfill=False,
var_hour=None,
msc_var_name=None,
)
Expand Down Expand Up @@ -340,21 +400,16 @@ def test_observer_log_messages(
def test_crop_one_file_log_messages(
self,
forecast,
mock_write_ssc_grib_file,
mock_observer,
config,
caplog,
monkeypatch,
):
def mock_write_ssc_grib_file(eccc_grib_file, config):
pass

monkeypatch.setattr(
crop_gribs, "_write_ssc_grib_file", mock_write_ssc_grib_file
)

parsed_args = SimpleNamespace(
forecast=forecast,
fcst_date=arrow.get("2023-08-14"),
backfill=False,
var_hour=29,
msc_var_name="APCP_Sfc",
)
Expand Down Expand Up @@ -493,6 +548,31 @@ def _mock_xarray_to_grib(ssc_ds, ssc_grib_file):

monkeypatch.setattr(crop_gribs, "_xarray_to_grib", _mock_xarray_to_grib)

def test_grib_file_exists_so_no_write(self, config, caplog, tmp_path):
grib_dir = tmp_path / config["weather"]["download"]["2.5 km"]["GRIB dir"]
grib_dir.mkdir(parents=True)
grib_file_dir = grib_dir / "20231115/12/001/"
grib_file_dir.mkdir(parents=True)
eccc_grib_file = Path(
grib_file_dir
/ "20231115T12Z_MSC_HRDPS_UGRD_AGL-10m_RLatLon0.0225_PT001H.grib2"
)
ssc_grib_file = Path(
grib_file_dir
/ "20231115T12Z_MSC_HRDPS_UGRD_AGL-10m_RLatLon0.0225_PT001H_SSC.grib2"
)
ssc_grib_file.write_bytes(b"")
caplog.set_level(logging.DEBUG)

crop_gribs._write_ssc_grib_file(eccc_grib_file, config)

assert caplog.records[0].levelname == "DEBUG"
expected = (
f"cropping skipped because SalishSeaCast subdomain GRIB file exist: "
f"{grib_file_dir / '20231115T12Z_MSC_HRDPS_UGRD_AGL-10m_RLatLon0.0225_PT001H_SSC.grib2'}"
)
assert caplog.messages[0] == expected

def test_log_message(
self,
mock_open_dataset,
Expand Down

0 comments on commit 1c05cde

Please sign in to comment.