
Commit

Allow grid-based basin and water user definitions and facilitate subsetting of subgrid-df (#336)

* Allow xr.Dataset and xr.DataArray as basin definition 

* Use meta_label in subgrid-df to deal with stacked subgrid elements

---------

Co-authored-by: Hendrik Kok <[email protected]>
Huite and HendrikKok authored Oct 25, 2024
1 parent a0c23f7 commit c0c5e33
Showing 5 changed files with 320 additions and 104 deletions.
49 changes: 33 additions & 16 deletions pre-processing/primod/driver_coupling/ribameta.py
@@ -6,6 +6,7 @@
import imod
import numpy as np
import ribasim
import xarray as xr
from imod.msw import GridData, MetaSwapModel, Sprinkling

from primod.driver_coupling.driver_coupling_base import DriverCoupling
@@ -22,14 +23,14 @@ class RibaMetaDriverCoupling(DriverCoupling):
Attributes
----------
basin_definition: gpd.GeoDataFrame
GeoDataFrame of basin polygons
user_demand_definition: gpd.GeoDataFrame
GeoDataFrame of user demand polygons
ribasim_basin_definition: gpd.GeoDataFrame | xr.DataArray
GeoDataFrame of basin polygons or xr.DataArray with gridded basin nodes
ribasim_user_demand_definition: gpd.GeoDataFrame | xr.DataArray
GeoDataFrame of user demand polygons or xr.DataArray with gridded demand nodes
"""

ribasim_basin_definition: gpd.GeoDataFrame
ribasim_user_demand_definition: gpd.GeoDataFrame | None = None
ribasim_basin_definition: gpd.GeoDataFrame | xr.DataArray
ribasim_user_demand_definition: gpd.GeoDataFrame | xr.DataArray | None = None

def _check_sprinkling(self, msw_model: MetaSwapModel) -> bool:
sprinkling_key = msw_model._get_pkg_key(Sprinkling, optional_package=True)
@@ -60,11 +61,19 @@ def derive_mapping(
basin_ids = _validate_node_ids(
ribasim_model.basin.node.df, self.ribasim_basin_definition
)
gridded_basin = imod.prepare.rasterize(
self.ribasim_basin_definition,
like=svat.isel(subunit=0, drop=True),
column="node_id",
)
if isinstance(self.ribasim_basin_definition, gpd.GeoDataFrame):
gridded_basin = imod.prepare.rasterize(
self.ribasim_basin_definition,
like=svat.isel(subunit=0, drop=True),
column="node_id",
)
elif isinstance(self.ribasim_basin_definition, xr.DataArray):
gridded_basin = self.ribasim_basin_definition
else:
raise TypeError(
"Expected geopandas.GeoDataFrame or xr.DataArray: "
f"received {type(self.ribasim_basin_definition).__name__}"
)
svat_basin_mapping = SvatBasinMapping(
name="msw_ponding",
gridded_basin=gridded_basin,
@@ -77,11 +86,19 @@
user_demand_ids = _validate_node_ids(
ribasim_model.user_demand.node.df, self.ribasim_user_demand_definition
)
gridded_user_demand = imod.prepare.rasterize(
self.ribasim_user_demand_definition,
like=svat.isel(subunit=0, drop=True),
column="node_id",
)
if isinstance(self.ribasim_user_demand_definition, gpd.GeoDataFrame):
gridded_user_demand = imod.prepare.rasterize(
self.ribasim_user_demand_definition,
like=svat.isel(subunit=0, drop=True),
column="node_id",
)
elif isinstance(self.ribasim_user_demand_definition, xr.DataArray):
gridded_user_demand = self.ribasim_user_demand_definition
else:
raise TypeError(
"Expected geopandas.GeoDataFrame or xr.DataArray: "
f"received {type(self.ribasim_user_demand_definition).__name__}"  # type: ignore
)
# sprinkling surface water for subsection of svats determined in 'sprinkling'
swspr_grid_data = copy.deepcopy(msw_model[grid_data_key])
nsu = swspr_grid_data.dataset["area"].sizes["subunit"]
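The branches above show that a basin or user-demand definition may now be passed as an already-gridded xr.DataArray instead of a GeoDataFrame of polygons. A minimal, hypothetical sketch of the new call pattern — the import path, grid coordinates, and constructor keywords are assumptions based on this file, and the grid would need to line up with the MetaSWAP svat grid:

    import numpy as np
    import xarray as xr

    from primod.driver_coupling.ribameta import RibaMetaDriverCoupling

    # Hypothetical 2x2 grid holding the Ribasim basin node ID of each cell;
    # NaN marks cells that are not coupled to any basin.
    basin_grid = xr.DataArray(
        np.array([[1.0, 1.0], [2.0, np.nan]]),
        coords={"y": [1.5, 0.5], "x": [0.5, 1.5]},
        dims=("y", "x"),
    )

    # A gridded definition is taken as-is; a GeoDataFrame is still rasterized
    # against the svat grid via imod.prepare.rasterize, as before.
    coupling = RibaMetaDriverCoupling(ribasim_basin_definition=basin_grid)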
182 changes: 111 additions & 71 deletions pre-processing/primod/driver_coupling/ribamod.py
@@ -24,15 +24,17 @@ class RibaModDriverCoupling(DriverCoupling, abc.ABC):
----------
mf6_model: str
The name of the GWF model
ribasim_basin_definition : gpd.GeoDataFrame
GeoDataFrame of basin polygons
ribasim_basin_definition : gpd.GeoDataFrame | xr.Dataset
* GeoDataFrame: basin polygons
* Dataset: mapping of mf6 package name to grid containing basin IDs.
mf6_packages : list of str
A list of river or drainage packages.
"""

mf6_model: str
ribasim_basin_definition: gpd.GeoDataFrame
ribasim_basin_definition: gpd.GeoDataFrame | xr.Dataset
mf6_packages: list[str]
subgrid_id_range: pd.DataFrame | None = None

@abc.abstractmethod
def derive_mapping(
@@ -69,93 +71,131 @@ def write_exchanges(

dis = gwf_model[gwf_model._get_pkgkey("dis")]
basin_definition = self.ribasim_basin_definition
# Validate and fetch
basin_ids = _validate_node_ids(ribasim_model.basin.node.df, basin_definition)

# Spatial overlays of MF6 boundaries with basin polygons.
gridded_basin = imod.prepare.rasterize(
basin_definition,
like=dis["idomain"].isel(layer=0, drop=True),
column="node_id",
)
if isinstance(basin_definition, gpd.GeoDataFrame):
# Validate and fetch
basin_ids = _validate_node_ids(
ribasim_model.basin.node.df, basin_definition
)
gridded_basin = imod.prepare.rasterize(
basin_definition,
like=dis["idomain"].isel(layer=0, drop=True),
column="node_id",
)
basin_dataset = xr.Dataset(
{key: gridded_basin for key in self.mf6_packages}
)
elif isinstance(basin_definition, xr.Dataset):
basin_ids = _validate_node_ids(
ribasim_model.basin.node.df, basin_definition[self.mf6_packages]
)
basin_dataset = basin_definition
else:
raise TypeError(
"Expected geopandas.GeoDataFrame or xr.Dataset: "
f"received {type(basin_definition).__name__}"
)

# Collect which Ribasim basins are coupled, so that we can set their
# drainage and infiltration to NoData later on.
coupling_dict = self._empty_coupling_dict()
coupling_dict["mf6_model"] = self.mf6_model
coupled_basin_indices = []
for key in self.mf6_packages:
package = gwf_model[key]
mapping = self.derive_mapping(
name=key,
gridded_basin=gridded_basin,
basin_ids=basin_ids,
conductance=package["conductance"],
ribasim_model=ribasim_model,
for gwf_package_key in self.mf6_packages:
gridded_basin = basin_dataset[gwf_package_key]
filename, destination, basin_index = self._write_exchange(
directory,
gwf_model,
gwf_package_key,
ribasim_model,
basin_ids,
gridded_basin,
)
if mapping.dataframe.empty:
raise ValueError(
f"No coupling can be derived for MODFLOW 6 package: {key}. "
"No spatial overlap exists between the basin_definition and this package."
)
coupling_dict[f"mf6_{self._prefix}_{destination}_packages"][
gwf_package_key
] = filename
coupled_basin_indices.append(basin_index)

if isinstance(package, imod.mf6.River):
# Add a MF6 API package for correction flows
gwf_model["api_" + key] = imod.mf6.ApiPackage(
maxbound=package._max_active_n(),
save_flows=True,
)
destination = "river"
coupled_basin_indices = np.unique(np.concatenate(coupled_basin_indices))
coupled_basin_node_ids = basin_ids[coupled_basin_indices]

# check if not coupling passive when adding a river package
if isinstance(self, RibaModPassiveDriverCoupling):
raise TypeError(
f"Expected Drainage packages for passive coupling, received: {type(package).__name__}"
)
_nullify_ribasim_exchange_input(
ribasim_component=ribasim_model.basin,
coupled_node_ids=coupled_basin_node_ids,
columns=["infiltration", "drainage"],
)
return coupling_dict

def _write_exchange(
self,
directory: Path,
gwf_model: imod.mf6.GroundwaterFlowModel,
gwf_package_key: str,
ribasim_model: ribasim.Model,
basin_ids: pd.Series,
gridded_basin: xr.DataArray,
) -> tuple[str, str, pd.DataFrame]:
package = gwf_model[gwf_package_key]
mapping = self.derive_mapping(
name=gwf_package_key,
gridded_basin=gridded_basin,
basin_ids=basin_ids,
conductance=package["conductance"],
ribasim_model=ribasim_model,
)
if mapping.dataframe.empty:
raise ValueError(
f"No coupling can be derived for MODFLOW 6 package: {gwf_package_key}. "
"No spatial overlap exists between the basin_definition and this package."
)

if isinstance(package, imod.mf6.River):
# Add a MF6 API package for correction flows
gwf_model["api_" + gwf_package_key] = imod.mf6.ApiPackage(
maxbound=package._max_active_n(),
save_flows=True,
)
destination = "river"
# check if not coupling passive when adding a river package
if isinstance(self, RibaModPassiveDriverCoupling):
raise TypeError(
f"Expected Drainage packages for passive coupling, received: {type(package).__name__}"
)
# in active coupling, check subgrid levels versus modflow bottom elevation
if isinstance(self, RibaModActiveDriverCoupling):
# check on the bottom elevation and ribasim minimal subgrid level
minimum_subgrid_level = (
ribasim_model.basin.subgrid.df.groupby("subgrid_id")
ribasim_model.basin.subgrid.df.groupby("subgrid_id") # type: ignore
.min()["subgrid_level"]
.to_numpy()
)
# in active coupling, check subgrid levels versus modflow bottom elevation
if isinstance(self, RibaModActiveDriverCoupling):
subgrid_index = mapping.dataframe["subgrid_index"]
bound_index = mapping.dataframe["bound_index"]
bottom_elevation = package["bottom_elevation"].to_numpy()
if np.any(
subgrid_index = mapping.dataframe["subgrid_index"]
bound_index = mapping.dataframe["bound_index"]
bottom_elevation = package["bottom_elevation"].to_numpy()
if np.any(
bottom_elevation[np.isfinite(bottom_elevation)][bound_index]
< minimum_subgrid_level[subgrid_index]
):
index = np.flatnonzero(
bottom_elevation[np.isfinite(bottom_elevation)][bound_index]
< minimum_subgrid_level[subgrid_index]
):
raise ValueError(
f"Found bottom elevation below minimum subgrid level of Ribasim, for MODFLOW 6 package: {key}."
)
elif isinstance(package, imod.mf6.Drainage):
destination = "drainage"
else:
if isinstance(self, RibaModPassiveDriverCoupling):
raise TypeError(
f"Expected Drainage, received: {type(package).__name__}"
)
else:
raise TypeError(
f"Expected River or Drainage, received: {type(package).__name__}"
values = bound_index[index].to_numpy()
raise ValueError(
f"Found bottom elevation below minimum subgrid level of Ribasim, for MODFLOW 6 package: {gwf_package_key}, for the following elements: {values}."
)
elif isinstance(package, imod.mf6.Drainage):
destination = "drainage"
else:
if isinstance(self, RibaModPassiveDriverCoupling):
raise TypeError(
f"Expected Drainage, received: {type(package).__name__}"
)
else:
raise TypeError(
f"Expected River or Drainage, received: {type(package).__name__}"
)

filename = mapping.write(directory=directory)
coupling_dict[f"mf6_{self._prefix}_{destination}_packages"][key] = filename
coupled_basin_indices.append(mapping.dataframe["basin_index"])

coupled_basin_indices = np.unique(np.concatenate(coupled_basin_indices))
coupled_basin_node_ids = basin_ids[coupled_basin_indices]

_nullify_ribasim_exchange_input(
ribasim_component=ribasim_model.basin,
coupled_node_ids=coupled_basin_node_ids,
columns=["infiltration", "drainage"],
)
return coupling_dict
filename = mapping.write(directory=directory)
return filename, destination, mapping.dataframe["basin_index"]


class RibaModActiveDriverCoupling(RibaModDriverCoupling):
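For the RibaMod couplings, the updated docstring states that the xr.Dataset form maps each entry of mf6_packages to its own grid of basin node IDs, which write_exchanges then consumes per package. A rough sketch under assumed names — the "GWF_1" model name, the "riv-1"/"drn-1" package keys, and the grid coordinates are illustrative, not taken from this commit:

    import numpy as np
    import xarray as xr

    from primod.driver_coupling.ribamod import RibaModActiveDriverCoupling

    # One basin-ID grid per coupled MODFLOW 6 boundary package; the data
    # variable names must match the entries in mf6_packages.
    basin_ids = xr.DataArray(
        np.array([[1.0, np.nan], [2.0, 2.0]]),
        coords={"y": [1.5, 0.5], "x": [0.5, 1.5]},
        dims=("y", "x"),
    )
    basin_definition = xr.Dataset({"riv-1": basin_ids, "drn-1": basin_ids})

    coupling = RibaModActiveDriverCoupling(
        mf6_model="GWF_1",
        mf6_packages=["riv-1", "drn-1"],
        ribasim_basin_definition=basin_definition,
    )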
26 changes: 18 additions & 8 deletions pre-processing/primod/driver_coupling/util.py
@@ -22,19 +22,29 @@ def _get_gwf_modelnames(mf6_simulation: Modflow6Simulation) -> list[str]:


def _validate_node_ids(
dataframe: pd.DataFrame, definition: gpd.GeoDataFrame
dataframe: pd.DataFrame, definition: gpd.GeoDataFrame | xr.Dataset | xr.DataArray
) -> pd.Series:
# Validate
if "node_id" not in definition.columns:
raise ValueError(
'Definition must contain "node_id" column.'
f"Columns in dataframe: {definition.columns}"
)
if isinstance(definition, xr.Dataset):
data_vars = list(definition.data_vars.keys())
nodes = np.unique(definition.to_dataframe()[data_vars])
node_id = nodes[np.isfinite(nodes)]
elif isinstance(definition, xr.DataArray):
definition.name = "basin_definition"
nodes = definition.to_dataset().to_dataframe()["basin_definition"]
node_id = nodes[np.isfinite(nodes)]
else:
if "node_id" not in definition.columns:
raise ValueError(
'Definition must contain "node_id" column. '
f"Columns in dataframe: {definition.columns}"
)
node_id = definition["node_id"].to_numpy()

basin_ids: NDArray[Int] = np.unique(dataframe.index)
missing = ~np.isin(definition["node_id"], basin_ids)
missing = ~np.isin(node_id, basin_ids)
if missing.any():
missing_nodes = definition["node_id"].to_numpy()[missing]
missing_nodes = node_id[missing]
raise ValueError(
"The node IDs of these nodes in definition do not "
f"occur in the Ribasim model: {missing_nodes}"
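A small sketch of what the extended _validate_node_ids now checks for a gridded definition: finite cell values are treated as node IDs and must all occur in the Ribasim node table. The import path and the stand-in basin table are assumptions for illustration only; the function is private and is normally called through the driver couplings:

    import numpy as np
    import pandas as pd
    import xarray as xr

    from primod.driver_coupling.util import _validate_node_ids

    # Stand-in for ribasim_model.basin.node.df: node IDs live in the index.
    basin_df = pd.DataFrame(index=pd.Index([1, 2], name="node_id"))

    # Gridded definition containing basin ID 3, which the model does not have.
    definition = xr.DataArray(
        np.array([[1.0, 3.0]]),
        coords={"y": [0.5], "x": [0.5, 1.5]},
        dims=("y", "x"),
    )

    # Raises ValueError: node ID 3.0 does not occur in the Ribasim model.
    _validate_node_ids(basin_df, definition)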
