Skip to content

Commit

Permalink
Sequential vs Ensemble routing (#32)
Browse files Browse the repository at this point in the history
* specify sequential vs ensemble runoff in configs

* update docs and version number

* correct assertion explanation

* organize config validation
  • Loading branch information
rileyhales authored Jun 25, 2024
1 parent 045da22 commit 57776ec
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 20 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ graph LR
end
subgraph "Compute-Options"
co1["Routing Timestep\nOutflow Timestep"]
co1["Routing Timestep\nOutflow Timestep\nRunoff Type\nRouting Method"]
end
subgraph "Initialization"
Expand Down Expand Up @@ -134,6 +134,7 @@ graph LR
| dt_routing | True | Integer | Compute Options | Time interval in seconds between routing computations. |
| dt_outflows | False | Integer | Compute Options | Time interval in seconds between writing flows to disc. |
| routing | False | String | Compute Options | The routing method to use: "linear" or "nonlinear". |
| runoff_type | False | String | Compute Options | Specify if runoff files are "sequential" time steps or an "ensemble" |
| initial_state_file | False | File Path | Initialization Data | Path to the initial state file. |
| final_state_file | False | File Path | Initialization Data | Path where the final state file should be saved. |
| log | False | Boolean | Logging Options | Whether to enable logging. |
Expand All @@ -144,7 +145,7 @@ graph LR
| var_runoff_volume | False | String | File Management | Name of the variable in files containing runoff volumes |
| var_river_id | False | String | File Management | Name of the variable in all files that contains the river IDs. |
| var_outflow | False | String | File Management | Name of the variable in the outflows file that contains the outflows. |
| solver_atol | False | Float | Solver Options | Absolute tolerance for the solver. |
| solver_atol | False | Float | Solver Options | Absolute tolerance for the solver. |
## Input File Schemas

### Routing Parameters
Expand Down
1 change: 1 addition & 0 deletions config_files/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ var_river_id: 'river_id'
var_outflow: 'Q'
# Compute Options - Optional
routing: 'linear'
runoff_type: 'sequential'
dt_routing: 0
dt_outflows: 0
# Solver Options - Optional
Expand Down
1 change: 1 addition & 0 deletions config_files/descriptions.csv
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ outflow_file,string,Path where the outflows file should be saved.
dt_routing,number,Time interval in seconds between routing computations.
dt_outflows,number,Time interval in seconds between writing flows to disc.
routing,string,Either 'linear' or 'nonlinear' routing- default 'linear'.
runoff_type,string,Type of runoff data: either sequential or ensemble.
solver_atol,number,Absolute tolerance for the solver.
initial_state_file,string,Path to the file with initial state values.
final_state_file,string,Path to the file with final state values.
Expand Down
36 changes: 19 additions & 17 deletions river_route/_MuskingumCunge.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,12 @@ def set_configs(self, config_file, **kwargs) -> None:
self.conf['var_river_id'] = self.conf.get('var_river_id', 'river_id')
self.conf['var_discharge'] = self.conf.get('var_discharge', 'Q')

# routing and solver options - time is validated at route time
# compute, routing, solver options (time is validated separately at compute step)
assert 'routing_params_file' in self.conf, 'Requires routing params file'
self.conf['routing'] = self.conf.get('routing', 'linear')
assert self.conf['routing'] in ['linear', 'nonlinear'], 'Routing method not recognized'

# update solver options if given
self.conf['runoff_type'] = self.conf.get('runoff_type', 'sequential')
assert self.conf['runoff_type'] in ['sequential', 'ensemble'], 'Runoff type not recognized'
self._solver_atol = self.conf.get('solver_atol', self._solver_atol)
if 'solver_atol' in self.conf:
del self.conf['solver_atol']
Expand All @@ -129,10 +129,6 @@ def set_configs(self, config_file, **kwargs) -> None:
log_destination = self.conf.get('log_stream', 'stdout')
log_format = self.conf.get('log_format', '%(asctime)s - %(levelname)s - %(message)s')

# create a new log level 1 point above INFO, between INFO and CRITICAL
logging.addLevelName(logging.INFO + 5, 'CALIBRATE')
setattr(LOG, 'calibrate', lambda msg, *args: LOG._log(logging.INFO + 5, msg, args))

if log_destination == 'stdout':
LOG.addHandler(logging.StreamHandler(sys.stdout))
elif log_destination == 'stderr':
Expand Down Expand Up @@ -171,14 +167,15 @@ def _set_linear_routing_params(self) -> None:
self.x = pd.read_parquet(self.conf['routing_params_file'], columns=['x', ]).values.flatten()
return

def _read_initial_state(self) -> (np.array, np.array):
def _read_initial_state(self) -> None:
if hasattr(self, 'initial_state'):
return self.initial_state
return

state_file = self.conf.get('initial_state_file', '')
if state_file == '':
LOG.debug('Setting initial state to zeros')
return np.zeros(self.A.shape[0]), np.zeros(self.A.shape[0])
self.initial_state = (np.zeros(self.A.shape[0]), np.zeros(self.A.shape[0]))
return
assert os.path.exists(state_file), FileNotFoundError(f'Initial state file not found at: {state_file}')
LOG.debug('Reading Initial State from Parquet')
initial_state = pd.read_parquet(state_file).values
Expand Down Expand Up @@ -335,9 +332,7 @@ def route(self, **kwargs) -> 'MuskingumCunge':
self._set_time_params(dates)
self._set_muskingum_coefficients()
runoffs = runoffs / self.dt_runoff # convert volume -> volume/time
LOG.debug('Getting initial value arrays')
q_t, r_t = self._read_initial_state()
outflow_array = self._router(dates, runoffs, q_t, r_t)
outflow_array = self._router(dates, runoffs)

if self.dt_outflow > self.dt_runoff:
LOG.info('Resampling dates and outflows to specified timestep')
Expand Down Expand Up @@ -456,7 +451,11 @@ def calibrate(self, observed: pd.DataFrame, overwrite_params_file: bool = False)
self._set_linear_routing_params()
return self

def _router(self, dates: np.array, runoffs: np.array, q_init: np.array, r_init: np.array, ) -> np.array:
def _router(self, dates: np.array, runoffs: np.array) -> np.array:
LOG.debug('Getting initial state arrays')
self._read_initial_state()
q_init, r_init = self.initial_state

# set initial values
self._set_lhs_matrix()
outflow_array = np.zeros((runoffs.shape[0], self.A.shape[0]))
Expand Down Expand Up @@ -485,9 +484,13 @@ def _router(self, dates: np.array, runoffs: np.array, q_init: np.array, r_init:
# Enforce positive flows in case of negative solver results
outflow_array[outflow_array < 0] = 0

# if simulation type is ensemble, then do not overwrite the initial state
if self.conf['runoff_type'] == 'sequential':
LOG.debug('Updating Initial State for Next Sequential Computation')
self.initial_state = (q_t, r_prev)

t2 = datetime.datetime.now()
LOG.info(f'Routing completed in {(t2 - t1).total_seconds()} seconds')
self.initial_state = (q_t, r_prev)
return outflow_array

def _solver(self, rhs: np.array, q_t: np.array) -> np.array:
Expand Down Expand Up @@ -530,8 +533,7 @@ def _calibration_objective(self,
self._set_time_params(dates)
self._set_muskingum_coefficients(k=k, x=x)
runoffs = runoffs / self.dt_runoff # convert volume -> volume/time
q_t, r_t = self._read_initial_state()
outflow_array = self._router(dates, runoffs, q_t, r_t)[:, obs_idxs]
outflow_array = self._router(dates, runoffs)[:, obs_idxs]
if iter_df.empty:
iter_df = pd.DataFrame(outflow_array, index=dates, columns=observed.columns)
else:
Expand Down
2 changes: 1 addition & 1 deletion river_route/__metadata__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
__version__ = '0.11.0'
__version__ = '0.12.0'
__author__ = 'Riley Hales PhD'
__url__ = 'https://github.com/rileyhales/river-route'

0 comments on commit 57776ec

Please sign in to comment.