This repository has been archived by the owner on Jun 2, 2023. It is now read-only.

Merge pull request #104 from USGS-R/adding_da_to_models
Adding State Updating and # of Tasks to LSTM and RGCN models
jzwart authored Jun 7, 2021
2 parents f3e7c38 + 8362981 commit ec2d9b9
Showing 8 changed files with 262 additions and 298 deletions.
13 changes: 5 additions & 8 deletions Snakefile
@@ -5,21 +5,18 @@ from river_dl.evaluate import combined_metrics
 from river_dl.postproc_utils import plot_obs
 from river_dl.predict import predict_from_io_data
 from river_dl.train import train_model
+from river_dl import loss_functions as lf

 out_dir = config['out_dir']
 code_dir = config['code_dir']
+loss_function = lf.multitask_rmse(config['lambdas'])

 rule all:
     input:
         expand("{outdir}/{metric_type}_metrics.csv",
                outdir=out_dir,
                metric_type=['overall', 'month', 'reach', 'month_reach'],
         ),
         expand("{outdir}/{plt_variable}_{partition}.png",
                outdir=out_dir,
                plt_variable=['temp', 'flow'],
                partition=['trn', 'val'],
         ),

 rule prep_io_data:
     input:
@@ -62,7 +59,7 @@ rule prep_io_data:
# shell:
# """
# module load analytics cuda10.1/toolkit/10.1.105
-# run_training -e /home/jsadler/.conda/envs/rgcn --no-node-list "python {code_dir}/train_model.py -o {params.run_dir} -i {input[0]} -p {params.pt_epochs} -f {params.ft_epochs} --lamb {params.lamb} --model rgcn -s 135"
+# run_training -e /home/jsadler/.conda/envs/rgcn --no-node-list "python {code_dir}/train_model.py -o {params.run_dir} -i {input[0]} -p {params.pt_epochs} -f {params.ft_epochs} --lambdas {params.lamb} --loss_func multitask_rmse --model rgcn -s 135"
# """


@@ -79,7 +76,7 @@ rule train_model_local_or_cpu:
         run_dir=lambda wildcards, output: os.path.split(output[0][:-1])[0],
     run:
         train_model(input[0], config['pt_epochs'], config['ft_epochs'], config['hidden_size'],
-                    params.run_dir, model_type='rgcn', lamb=config['lamb'])
+                    loss_func=loss_function, out_dir=params.run_dir, model_type='rgcn', num_tasks=2)

 rule make_predictions:
     input:
@@ -93,7 +90,7 @@ rule make_predictions:
         predict_from_io_data(model_type='rgcn', model_weights_dir=model_dir,
                              hidden_size=config['hidden_size'], io_data=input[1],
                              partition=wildcards.partition, outfile=output[0],
-                             logged_q=False)
+                             logged_q=False, num_tasks=2)


def get_grp_arg(wildcards):
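The net effect of the Snakefile changes: the loss is now built once from the config via lf.multitask_rmse(config['lambdas']) and passed into train_model, instead of train_model hard-coding a lamb-weighted RMSE. A minimal sketch of that wiring outside Snakemake (the config dict, io-data path, and output directory below are illustrative stand-ins, not values from the repo):

import river_dl.loss_functions as lf
from river_dl.train import train_model

# stand-in for the Snakemake `config` object loaded from config.yml
config = {"lambdas": [100, 100], "pt_epochs": 2, "ft_epochs": 2, "hidden_size": 20}

# multitask_rmse(lambdas) returns a loss closure (see loss_functions.py below)
loss_function = lf.multitask_rmse(config["lambdas"])

train_model(
    "prepped_io_data.npz",     # hypothetical output of the prep_io_data rule
    config["pt_epochs"],       # pretraining epochs
    config["ft_epochs"],       # finetuning epochs
    config["hidden_size"],
    loss_func=loss_function,
    out_dir="output/run_dir",  # hypothetical run directory
    model_type="rgcn",
    num_tasks=2,               # temperature + flow
)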
11 changes: 5 additions & 6 deletions config.yml
@@ -1,16 +1,15 @@
# Input files
obs_flow: "/home/jsadler/drb_data/obs_flow_full_raw"
obs_temp: "/home/jsadler/drb_data/obs_temp_full"
sntemp_file: "/home/jsadler/drb_data/uncal_sntemp_input_output"
catchment_attr: "/home/jsadler/drb_data/seg_attr_drb.feather"
dist_matrix: "/home/jsadler/drb_data/distance_matrix.npz"
obs_flow: "../drb-dl-model/data/in/obs_flow_subset"
obs_temp: "../drb-dl-model/data/in/obs_temp_subset"
sntemp_file: "../drb-dl-model/data/in/uncal_sntemp_input_output_subset"
dist_matrix: "../drb-dl-model/data/in/distance_matrix_subset.npz"

out_dir: "test_val_functionality"
code_dir: "/home/jsadler/river-dl/river_dl"

x_vars: ["seg_rain", "seg_tave_air", "seginc_swrad", "seg_length", "seginc_potet", "seg_slope", "seg_humid", "seg_elev"]
primary_variable: "flow"
-lamb: 1
+lambdas: [100, 100]

train_start_date:
- '1985-10-01'
136 changes: 71 additions & 65 deletions river_dl/RGCN.py
@@ -11,22 +11,27 @@


 class RGCN(layers.Layer):
-    def __init__(self, hidden_size, A, flow_in_temp=False, rand_seed=None):
+    def __init__(
+        self, hidden_size, A, recurrent_dropout=0, dropout=0, rand_seed=None,
+    ):
         """
         :param hidden_size: [int] the number of hidden units
         :param A: [numpy array] adjacency matrix
-        :param flow_in_temp: [bool] whether the flow predictions should feed
-        into the temp predictions
+        :param recurrent_dropout: [float] value between 0 and 1 for the
+        probability of a recurrent element to be zero
+        :param dropout: [float] value between 0 and 1 for the probability of an
+        input element to be zero
         :param rand_seed: [int] the random seed for initialization
         """
         super().__init__()
         self.hidden_size = hidden_size
         self.A = A.astype("float32")
-        self.flow_in_temp = flow_in_temp

         # set up the layer
-        self.lstm = tf.keras.layers.LSTMCell(hidden_size)
+        self.lstm = tf.keras.layers.LSTMCell(
+            hidden_size, recurrent_dropout=recurrent_dropout, dropout=dropout
+        )

         ### set up the weights ###
         w_initializer = tf.random_normal_initializer(
@@ -88,44 +93,15 @@ def __init__(self, hidden_size, A, flow_in_temp=False, rand_seed=None):
             shape=[hidden_size], initializer="zeros", name="b_c"
         )

-        if self.flow_in_temp:
-            # was W2
-            self.W_out_flow = self.add_weight(
-                shape=[hidden_size, 1], initializer=w_initializer, name="W_out"
-            )
-            # was b2
-            self.b_out_flow = self.add_weight(
-                shape=[1], initializer="zeros", name="b_out"
-            )
-
-            self.W_out_temp = self.add_weight(
-                shape=[hidden_size + 1, 1],
-                initializer=w_initializer,
-                name="W_out",
-            )
-
-            self.b_out_temp = self.add_weight(
-                shape=[1], initializer="zeros", name="b_out"
-            )
-        else:
-            # was W2
-            self.W_out = self.add_weight(
-                shape=[hidden_size, 2], initializer=w_initializer, name="W_out"
-            )
-            # was b2
-            self.b_out = self.add_weight(
-                shape=[2], initializer="zeros", name="b_out"
-            )

     @tf.function
     def call(self, inputs, **kwargs):
         graph_size = self.A.shape[0]
-        hidden_state_prev, cell_state_prev = (
-            tf.zeros([graph_size, self.hidden_size]),
-            tf.zeros([graph_size, self.hidden_size]),
-        )
-        out = []
+        h_list = []
+        c_list = []
         n_steps = inputs.shape[1]
+        # set the initial h & c states to the supplied h and c states if using
+        # DA, or 0's otherwise
+        hidden_state_prev = tf.cast(kwargs["h_init"], tf.float32)
+        cell_state_prev = tf.cast(kwargs["c_init"], tf.float32)
         for t in range(n_steps):
             h_graph = tf.nn.tanh(
                 tf.matmul(
@@ -157,42 +133,72 @@ def call(self, inputs, **kwargs):
                 + self.b_c
             )

-            if self.flow_in_temp:
-                out_pred_q = (
-                    tf.matmul(h_update, self.W_out_flow) + self.b_out_flow
-                )
-                out_pred_t = (
-                    tf.matmul(
-                        tf.concat([h_update, out_pred_q], axis=1),
-                        self.W_out_temp,
-                    )
-                    + self.b_out_temp
-                )
-                out_pred = tf.concat([out_pred_t, out_pred_q], axis=1)
-            else:
-                out_pred = tf.matmul(h_update, self.W_out) + self.b_out
-
-            out.append(out_pred)

             hidden_state_prev = h_update
             cell_state_prev = c_update
-        out = tf.stack(out)
-        out = tf.transpose(out, [1, 0, 2])
-        return out

+            h_list.append(h_update)
+            c_list.append(c_update)
+
+        h_list = tf.stack(h_list)
+        c_list = tf.stack(c_list)
+        h_list = tf.transpose(h_list, [1, 0, 2])
+        c_list = tf.transpose(c_list, [1, 0, 2])
+        return h_list, c_list


 class RGCNModel(tf.keras.Model):
-    def __init__(self, hidden_size, A, flow_in_temp=False, rand_seed=None):
+    def __init__(
+        self,
+        hidden_size,
+        A,
+        num_tasks=1,
+        recurrent_dropout=0,
+        dropout=0,
+        rand_seed=None,
+    ):
         """
         :param hidden_size: [int] the number of hidden units
         :param A: [numpy array] adjacency matrix
-        :param flow_in_temp: [bool] whether the flow predictions should feed
-        into the temp predictions
+        :param num_tasks: [int] number of prediction tasks to perform -
+        currently supports either 1 or 2 prediction tasks
+        :param recurrent_dropout: [float] value between 0 and 1 for the
+        probability of a recurrent element to be zero
+        :param dropout: [float] value between 0 and 1 for the probability of an
+        input element to be zero
         :param rand_seed: [int] the random seed for initialization
         """
         super().__init__()
-        self.rgcn_layer = RGCN(hidden_size, A, flow_in_temp, rand_seed)
+        self.hidden_size = hidden_size
+        self.num_tasks = num_tasks
+        self.recurrent_dropout = recurrent_dropout
+        self.dropout = dropout
+
+        self.rgcn_layer = RGCN(
+            hidden_size, A, recurrent_dropout, dropout, rand_seed
+        )
+
+        self.states = None
+
+        self.dense_main = layers.Dense(1, name="dense_main")
+        if self.num_tasks == 2:
+            self.dense_aux = layers.Dense(1, name="dense_aux")

     def call(self, inputs, **kwargs):
-        output = self.rgcn_layer(inputs)
-        return output
+        batch_size = inputs.shape[0]
+        h_init = kwargs.get("h_init", tf.zeros([batch_size, self.hidden_size]))
+        c_init = kwargs.get("c_init", tf.zeros([batch_size, self.hidden_size]))
+        h_gr, c_gr = self.rgcn_layer(inputs, h_init=h_init, c_init=c_init)
+        self.states = h_gr[:, -1, :], c_gr[:, -1, :]
+
+        if self.num_tasks == 1:
+            main_prediction = self.dense_main(h_gr)
+            return main_prediction
+        elif self.num_tasks == 2:
+            main_prediction = self.dense_main(h_gr)
+            aux_prediction = self.dense_aux(h_gr)
+            return tf.concat([main_prediction, aux_prediction], axis=2)
+        else:
+            raise ValueError(
+                f"This model only supports 1 or 2 tasks (not {self.num_tasks})"
+            )
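Taken together, RGCN.call now returns the full hidden- and cell-state sequences, and RGCNModel applies the task heads and stashes the final states on self.states so a data-assimilation step can adjust them and seed the next window. A minimal usage sketch, assuming toy shapes (the adjacency matrix and driver data below are random stand-ins, not repo data):

import numpy as np
import tensorflow as tf
from river_dl.RGCN import RGCNModel

# toy dimensions: 4 stream segments, 10 timesteps, 3 input features
n_segs, n_steps, n_feats, hidden_size = 4, 10, 3, 16
A = np.eye(n_segs, dtype="float32")               # stand-in adjacency matrix
x = tf.random.normal([n_segs, n_steps, n_feats])  # stand-in driver data

model = RGCNModel(hidden_size, A, num_tasks=2)

# first window: h_init/c_init default to zeros inside call()
preds = model(x)  # shape [n_segs, n_steps, 2]

# state updating: grab the final h/c states, optionally adjust them against
# observations (the DA step), and seed the next window with them
h_last, c_last = model.states
preds_next = model(x, h_init=h_last, c_init=c_last)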
68 changes: 25 additions & 43 deletions river_dl/loss_functions.py
@@ -1,4 +1,3 @@
-import numpy as np
import tensorflow as tf


@@ -69,55 +68,43 @@ def samplewise_nnse_loss(y_true, y_pred):
     return 1 - nnse_val


-def nnse_masked_one_var(data, y_pred, var_idx):
-    y_true, y_pred, weights = y_data_components(data, y_pred, var_idx)
-    return nnse_loss(y_true, y_pred)
+def multitask_nse(lambdas):
+    return multitask_loss(lambdas, nnse_loss)


-def nnse_one_var_samplewise(data, y_pred, var_idx):
-    y_true, y_pred, weights = y_data_components(data, y_pred, var_idx)
-    return samplewise_nnse_loss(y_true, y_pred)
+def multitask_samplewise_nse(lambdas):
+    return multitask_loss(lambdas, samplewise_nnse_loss)


-def y_data_components(data, y_pred, var_idx):
-    weights = data[:, :, -2:]
-    y_true = data[:, :, :-2]
-
-    # ensure y_pred, weights, and y_true are all tensors the same data type
-    y_true = tf.convert_to_tensor(y_true)
-    weights = tf.convert_to_tensor(weights)
-    y_true = tf.cast(y_true, y_pred.dtype)
-    weights = tf.cast(weights, y_pred.dtype)
-
-    # make all zero-weighted observations 'nan' so they don't get counted
-    # at all in the loss calculation
-    y_true = tf.where(weights == 0, np.nan, y_true)
-
-    weights = weights[:, :, var_idx]
-    y_true = y_true[:, :, var_idx]
-    y_pred = y_pred[:, :, var_idx]
-    return y_true, y_pred, weights
+def multitask_rmse(lambdas):
+    return multitask_loss(lambdas, rmse)
+
+
+def multitask_kge(lambdas):
+    return multitask_loss(lambdas, kge_loss)


-def rmse_masked_one_var(data, y_pred, var_idx):
-    y_true, y_pred, weights = y_data_components(data, y_pred, var_idx)
-    return rmse(y_true, y_pred)


-def weighted_masked_rmse(lamb=0.5):
+def multitask_loss(lambdas, loss_func):
     """
-    calculate a weighted, masked rmse.
-    :param lamb: [float] (short for lambda). The factor that the auxiliary loss
-    will be multiplied by before added to the main loss.
+    calculate a weighted multi-task loss for a given number of variables with a
+    given loss function
+    :param lambdas: [array-like float] The factor that losses will be
+    multiplied by before being added together.
+    :param loss_func: [function] Loss function that will be used to calculate
+    the loss of each variable. Must take as input parameters [y_true, y_pred]
     """

-    def rmse_masked_combined(data, y_pred):
-        rmse_main = rmse_masked_one_var(data, y_pred, 0)
-        rmse_aux = rmse_masked_one_var(data, y_pred, 1)
-        rmse_loss = rmse_main + lamb * rmse_aux
-        return rmse_loss
+    def combine_loss(y_true, y_pred):
+        losses = []
+        n_vars = y_pred.shape[-1]
+        for var_id in range(n_vars):
+            ind_var_loss = loss_func(y_true[:, :, var_id], y_pred[:, :, var_id])
+            weighted_ind_var_loss = lambdas[var_id] * ind_var_loss
+            losses.append(weighted_ind_var_loss)
+        total_loss = sum(losses)
+        return total_loss

-    return rmse_masked_combined
+    return combine_loss


def mean_masked(y):
@@ -181,10 +168,5 @@ def kge_norm_loss(y_true, y_pred):
     return 1 - norm_kge(y_true, y_pred)


-def kge_loss_one_var(data, y_pred, var_idx):
-    y_true, y_pred, weights = y_data_components(data, y_pred, var_idx)
-    return kge_loss(y_true, y_pred)


def kge_loss(y_true, y_pred):
     return -1 * kge(y_true, y_pred)
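To see what the refactor buys: the old weighted_masked_rmse baked in exactly two variables and a scalar lamb, while the new multitask_loss closes over per-variable lambdas and any two-argument loss function, inferring the number of variables from y_pred's last axis. A small sketch of the factory in use (tensor values are made up; the shape convention is [batch, time, n_vars]):

import tensorflow as tf
import river_dl.loss_functions as lf

# one batch, two timesteps, two variables (e.g. temperature, flow)
y_true = tf.constant([[[10.0, 1.0], [12.0, 2.0]]])
y_pred = tf.constant([[[11.0, 1.5], [11.0, 2.5]]])

loss_fn = lf.multitask_rmse([100, 100])  # one lambda per variable
total = loss_fn(y_true, y_pred)          # 100 * rmse(temp) + 100 * rmse(flow)

Because combine_loss loops over the last axis, the same factory serves 1- or 2-task models without the old per-variable var_idx plumbing.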
(Diff truncated: the remaining 4 changed files are not shown.)