From 94d045427a615a5d77389b1f895da26bd443c621 Mon Sep 17 00:00:00 2001 From: Jacob Zwart Date: Tue, 25 May 2021 11:57:06 -0500 Subject: [PATCH 01/32] adding DA capabilities to LSTM --- river_dl/rnns.py | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/river_dl/rnns.py b/river_dl/rnns.py index afff737..5fab687 100644 --- a/river_dl/rnns.py +++ b/river_dl/rnns.py @@ -4,26 +4,34 @@ from tensorflow.keras import layers from river_dl.loss_functions import nnse_masked_one_var, nnse_one_var_samplewise - class LSTMModel(tf.keras.Model): def __init__( self, hidden_size, gradient_correction=False, lamb=1, - dropout=0, + dropout=0, # I propose changing this to 'recurrent_dropout' and adding another option for 'dropout' since these will map to the options for the tf LSTM layers https://www.tensorflow.org/api_docs/python/tf/keras/layers/LSTMCell ; and also https://arxiv.org/pdf/1512.05287.pdf grad_log_file=None, + return_state=False, ): """ :param hidden_size: [int] the number of hidden units + :param gradient_correction: [bool] + :param lamb: [float] + :param dropout: [float] value between 0 and 1 for the probability of an element to be zero + :param grad_log_file: [str] location of gradient log file + :param return_state: [bool] return the hidden (h) and cell (c) states of LSTM """ super().__init__() self.gradient_correction = gradient_correction self.grad_log_file = grad_log_file self.lamb = lamb + self.return_state = return_state self.rnn_layer = layers.LSTM( hidden_size, return_sequences=True, + stateful=True, + return_state=return_state, name="rnn_shared", recurrent_dropout=dropout, ) @@ -32,7 +40,10 @@ def __init__( @tf.function def call(self, inputs, **kwargs): - x = self.rnn_layer(inputs) + if self.return_state: + x, h, c = self.rnn_layer(inputs) + else: + x = self.rnn_layer(inputs) main_prediction = self.dense_main(x) aux_prediction = self.dense_aux(x) return tf.concat([main_prediction, aux_prediction], axis=2) @@ -79,7 +90,11 @@ def train_step(self, data): class GRUModel(LSTMModel): - def __init__(self, hidden_size, lamb=1): + def __init__( + self, + hidden_size, + lamb=1 + ): """ :param hidden_size: [int] the number of hidden units """ From b9f5393e88f80047501868ee4f6ba3d0165cad0f Mon Sep 17 00:00:00 2001 From: Jacob Zwart Date: Wed, 26 May 2021 09:55:11 -0500 Subject: [PATCH 02/32] updating loss functions and adding tasks to RGCN --- river_dl/RGCN.py | 112 ++++++++++++++++++++++++++++++++----- river_dl/loss_functions.py | 26 +++++---- river_dl/rnns.py | 67 ++++++++++++++-------- 3 files changed, 157 insertions(+), 48 deletions(-) diff --git a/river_dl/RGCN.py b/river_dl/RGCN.py index b112e7b..6f9fb24 100644 --- a/river_dl/RGCN.py +++ b/river_dl/RGCN.py @@ -11,7 +11,16 @@ class RGCN(layers.Layer): - def __init__(self, hidden_size, A, flow_in_temp=False, rand_seed=None): + def __init__( + self, + hidden_size, + A, + tasks=1, + dropout=0, + flow_in_temp=False, + rand_seed=None, + return_state=False + ): """ :param hidden_size: [int] the number of hidden units @@ -23,10 +32,12 @@ def __init__(self, hidden_size, A, flow_in_temp=False, rand_seed=None): super().__init__() self.hidden_size = hidden_size self.A = A.astype("float32") + self.tasks = tasks self.flow_in_temp = flow_in_temp + self.return_state = return_state # set up the layer - self.lstm = tf.keras.layers.LSTMCell(hidden_size) + self.lstm = tf.keras.layers.LSTMCell(hidden_size, recurrent_dropout=dropout) ### set up the weights ### w_initializer = tf.random_normal_initializer( @@ -88,6 +99,7 @@ 
def __init__(self, hidden_size, A, flow_in_temp=False, rand_seed=None): shape=[hidden_size], initializer="zeros", name="b_c" ) + # will be doing two task predictions if flow_in_temp == True if self.flow_in_temp: # was W2 self.W_out_flow = self.add_weight( @@ -108,17 +120,29 @@ def __init__(self, hidden_size, A, flow_in_temp=False, rand_seed=None): shape=[1], initializer="zeros", name="b_out" ) else: - # was W2 - self.W_out = self.add_weight( - shape=[hidden_size, 2], initializer=w_initializer, name="W_out" - ) - # was b2 - self.b_out = self.add_weight( - shape=[2], initializer="zeros", name="b_out" - ) + if self.tasks == 2: + # was W2 + self.W_out = self.add_weight( + shape=[hidden_size, 2], initializer=w_initializer, name="W_out" + ) + # was b2 + self.b_out = self.add_weight( + shape=[2], initializer="zeros", name="b_out" + ) + else: + # was W2 + self.W_out = self.add_weight( + shape=[hidden_size, 1], initializer=w_initializer, name="W_out" + ) + # was b2 + self.b_out = self.add_weight( + shape=[1], initializer="zeros", name="b_out" + ) @tf.function def call(self, inputs, **kwargs): + h_list = [] + c_list = [] graph_size = self.A.shape[0] hidden_state_prev, cell_state_prev = ( tf.zeros([graph_size, self.hidden_size]), @@ -126,7 +150,15 @@ def call(self, inputs, **kwargs): ) out = [] n_steps = inputs.shape[1] + h_update = tf.cast(kwargs['h_init'], tf.float32) + c_update = tf.cast(kwargs['c_init'], tf.float32) + if self.return_state: + # set the initial h & c states to the supplied h and c states if using DA + hidden_state_prev = h_update + cell_state_prev = c_update for t in range(n_steps): + seq, state = self.lstm(inputs[:, t, :], states=[h_update, c_update]) + h, c = state # are these used anywhere? h_graph = tf.nn.tanh( tf.matmul( self.A, @@ -176,23 +208,75 @@ def call(self, inputs, **kwargs): hidden_state_prev = h_update cell_state_prev = c_update + + h_list.append(h_update) + c_list.append(c_update) + + h_list = tf.stack(h_list) + c_list = tf.stack(c_list) + h_list = tf.transpose(h_list, [1, 0, 2]) + c_list = tf.transpose(c_list, [1, 0, 2]) out = tf.stack(out) out = tf.transpose(out, [1, 0, 2]) - return out + + if self.return_state: + return out, h_list, c_list + else: + return out class RGCNModel(tf.keras.Model): - def __init__(self, hidden_size, A, flow_in_temp=False, rand_seed=None): + def __init__( + self, + hidden_size, + A, + tasks=1, + dropout=0, # I propose changing this to 'recurrent_dropout' and adding another option for 'dropout' since these will map to the options for the tf LSTM layers https://www.tensorflow.org/api_docs/python/tf/keras/layers/LSTMCell ; and also https://arxiv.org/pdf/1512.05287.pdf + flow_in_temp=False, + rand_seed=None, + return_state=False + ): """ :param hidden_size: [int] the number of hidden units :param A: [numpy array] adjacency matrix :param flow_in_temp: [bool] whether the flow predictions should feed into the temp predictions :param rand_seed: [int] the random seed for initialization + :param return_state: [bool] whether the h and c states should be returned (for DA) """ super().__init__() - self.rgcn_layer = RGCN(hidden_size, A, flow_in_temp, rand_seed) + self.return_state = return_state + self.hidden_size = hidden_size + self.tasks = tasks + self.dropout = dropout + self.rnn_layer = tf.keras.layers.LSTM( + hidden_size, + return_sequences=True, + stateful=True, + return_state=return_state, + recurrent_dropout=dropout) + + self.rgcn_layer = RGCN( + hidden_size, + A, + tasks, + dropout, + flow_in_temp, + rand_seed, + return_state) + + self.h_gr 
= None + self.c_gr = None def call(self, inputs, **kwargs): - output = self.rgcn_layer(inputs) + batch_size = inputs.shape[0] + h_init = kwargs.get('h_init', tf.zeros([batch_size, self.hidden_size])) + c_init = kwargs.get('c_init', tf.zeros([batch_size, self.hidden_size])) + if self.return_state: + output, h_gr, c_gr = self.rgcn_layer(inputs, h_init=h_init, c_init=c_init) + self.h_gr = h_gr + self.c_gr = c_gr + else: + output = self.rgcn_layer(inputs, h_init=h_init, c_init=c_init) + return output diff --git a/river_dl/loss_functions.py b/river_dl/loss_functions.py index 5ed5a18..416bb1e 100644 --- a/river_dl/loss_functions.py +++ b/river_dl/loss_functions.py @@ -69,19 +69,23 @@ def samplewise_nnse_loss(y_true, y_pred): return 1 - nnse_val -def nnse_masked_one_var(data, y_pred, var_idx): - y_true, y_pred, weights = y_data_components(data, y_pred, var_idx) +def nnse_masked_one_var(data, y_pred, var_idx, tasks): + y_true, y_pred, weights = y_data_components(data, y_pred, var_idx, tasks) return nnse_loss(y_true, y_pred) -def nnse_one_var_samplewise(data, y_pred, var_idx): - y_true, y_pred, weights = y_data_components(data, y_pred, var_idx) +def nnse_one_var_samplewise(data, y_pred, var_idx, tasks): + y_true, y_pred, weights = y_data_components(data, y_pred, var_idx, tasks) return samplewise_nnse_loss(y_true, y_pred) -def y_data_components(data, y_pred, var_idx): - weights = data[:, :, -2:] - y_true = data[:, :, :-2] +def y_data_components(data, y_pred, var_idx, tasks): + if tasks == 2: + weights = data[:, :, -2:] + y_true = data[:, :, :-2] + else: + weights = data[:, :, -1:] + y_true = data[:, :, :-1] # ensure y_pred, weights, and y_true are all tensors the same data type y_true = tf.convert_to_tensor(y_true) @@ -99,8 +103,8 @@ def y_data_components(data, y_pred, var_idx): return y_true, y_pred, weights -def rmse_masked_one_var(data, y_pred, var_idx): - y_true, y_pred, weights = y_data_components(data, y_pred, var_idx) +def rmse_masked_one_var(data, y_pred, var_idx, tasks): + y_true, y_pred, weights = y_data_components(data, y_pred, var_idx, tasks) return rmse(y_true, y_pred) @@ -181,8 +185,8 @@ def kge_norm_loss(y_true, y_pred): return 1 - norm_kge(y_true, y_pred) -def kge_loss_one_var(data, y_pred, var_idx): - y_true, y_pred, weights = y_data_components(data, y_pred, var_idx) +def kge_loss_one_var(data, y_pred, var_idx, tasks): + y_true, y_pred, weights = y_data_components(data, y_pred, var_idx, tasks) return kge_loss(y_true, y_pred) diff --git a/river_dl/rnns.py b/river_dl/rnns.py index 5fab687..027cf16 100644 --- a/river_dl/rnns.py +++ b/river_dl/rnns.py @@ -9,6 +9,7 @@ def __init__( self, hidden_size, gradient_correction=False, + tasks=1, lamb=1, dropout=0, # I propose changing this to 'recurrent_dropout' and adding another option for 'dropout' since these will map to the options for the tf LSTM layers https://www.tensorflow.org/api_docs/python/tf/keras/layers/LSTMCell ; and also https://arxiv.org/pdf/1512.05287.pdf grad_log_file=None, @@ -17,6 +18,7 @@ def __init__( """ :param hidden_size: [int] the number of hidden units :param gradient_correction: [bool] + :param tasks: [int] number of prediction tasks to perform - currently supports either 1 or 2 prediction tasks :param lamb: [float] :param dropout: [float] value between 0 and 1 for the probability of an element to be zero :param grad_log_file: [str] location of gradient log file @@ -25,6 +27,7 @@ def __init__( super().__init__() self.gradient_correction = gradient_correction self.grad_log_file = grad_log_file + self.tasks = tasks 
self.lamb = lamb self.return_state = return_state self.rnn_layer = layers.LSTM( @@ -35,8 +38,11 @@ def __init__( name="rnn_shared", recurrent_dropout=dropout, ) - self.dense_main = layers.Dense(1, name="dense_main") - self.dense_aux = layers.Dense(1, name="dense_aux") + if self.tasks == 1: + self.dense_main = layers.Dense(1, name="dense_main") + else: + self.dense_main = layers.Dense(1, name="dense_main") + self.dense_aux = layers.Dense(1, name="dense_aux") @tf.function def call(self, inputs, **kwargs): @@ -44,9 +50,14 @@ def call(self, inputs, **kwargs): x, h, c = self.rnn_layer(inputs) else: x = self.rnn_layer(inputs) - main_prediction = self.dense_main(x) - aux_prediction = self.dense_aux(x) - return tf.concat([main_prediction, aux_prediction], axis=2) + + if self.tasks == 1: + main_prediction = self.dense_main(x) + return main_prediction + else: + main_prediction = self.dense_main(x) + aux_prediction = self.dense_aux(x) + return tf.concat([main_prediction, aux_prediction], axis=2) @tf.function def train_step(self, data): @@ -58,35 +69,44 @@ def train_step(self, data): with tf.GradientTape(persistent=True) as tape: y_pred = self(x, training=True) # forward pass - loss_main = nnse_one_var_samplewise(y, y_pred, 0) - loss_aux = nnse_one_var_samplewise(y, y_pred, 1) + loss_main = nnse_one_var_samplewise(y, y_pred, 0, self.tasks) + if self.tasks == 2: + loss_aux = nnse_one_var_samplewise(y, y_pred, 1, 2) trainable_vars = self.trainable_variables main_out_vars = get_variables(trainable_vars, "dense_main") - aux_out_vars = get_variables(trainable_vars, "dense_aux") - shared_vars = get_variables(trainable_vars, "rnn_shared") + if self.tasks == 2: + aux_out_vars = get_variables(trainable_vars, "dense_aux") + shared_vars = get_variables(trainable_vars, "rnn_shared") # get gradients gradient_main_out = tape.gradient(loss_main, main_out_vars) - gradient_aux_out = tape.gradient(loss_aux, aux_out_vars) - gradient_shared_main = tape.gradient(loss_main, shared_vars) - gradient_shared_aux = tape.gradient(loss_aux, shared_vars) - - if self.gradient_correction: - # adjust auxiliary gradient - gradient_shared_aux = adjust_gradient_list( - gradient_shared_main, gradient_shared_aux, self.grad_log_file + if self.tasks == 2: + gradient_aux_out = tape.gradient(loss_aux, aux_out_vars) + gradient_shared_main = tape.gradient(loss_main, shared_vars) + gradient_shared_aux = tape.gradient(loss_aux, shared_vars) + + if self.tasks == 2: + if self.gradient_correction: + # adjust auxiliary gradient + gradient_shared_aux = adjust_gradient_list( + gradient_shared_main, gradient_shared_aux, self.grad_log_file + ) + if self.tasks == 2: + combined_gradient = combine_gradients_list( + gradient_shared_main, gradient_shared_aux, lamb=self.lamb ) - combined_gradient = combine_gradients_list( - gradient_shared_main, gradient_shared_aux, lamb=self.lamb - ) # apply gradients self.optimizer.apply_gradients(zip(gradient_main_out, main_out_vars)) - self.optimizer.apply_gradients(zip(gradient_aux_out, aux_out_vars)) - self.optimizer.apply_gradients(zip(combined_gradient, shared_vars)) - return {"loss_main": loss_main, "loss_aux": loss_aux} + if self.tasks == 2: + self.optimizer.apply_gradients(zip(gradient_aux_out, aux_out_vars)) + self.optimizer.apply_gradients(zip(combined_gradient, shared_vars)) + return {"loss_main": loss_main, "loss_aux": loss_aux} + else: + return {"loss_main": loss_main} + class GRUModel(LSTMModel): @@ -140,3 +160,4 @@ def adjust_gradient_list(main_grads, aux_grads, logfile=None): adjust_gradient(main_grads[i], 
aux_grads[i], logfile) for i in range(len(main_grads)) ] + From a1ff04fb4a98e44b93a73a1cde651f9c452fb25e Mon Sep 17 00:00:00 2001 From: Jacob Zwart Date: Wed, 26 May 2021 10:24:05 -0500 Subject: [PATCH 03/32] updating parameter definitions --- river_dl/RGCN.py | 9 +++++++-- river_dl/loss_functions.py | 14 +++++++++----- river_dl/rnns.py | 2 +- 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/river_dl/RGCN.py b/river_dl/RGCN.py index 6f9fb24..09a412f 100644 --- a/river_dl/RGCN.py +++ b/river_dl/RGCN.py @@ -16,7 +16,7 @@ def __init__( hidden_size, A, tasks=1, - dropout=0, + dropout=0, # I propose changing this to 'recurrent_dropout' and adding another option for 'dropout' since these will map to the options for the tf LSTM layers https://www.tensorflow.org/api_docs/python/tf/keras/layers/LSTMCell ; and also https://arxiv.org/pdf/1512.05287.pdf flow_in_temp=False, rand_seed=None, return_state=False @@ -25,9 +25,12 @@ def __init__( :param hidden_size: [int] the number of hidden units :param A: [numpy array] adjacency matrix + :param tasks: [int] number of prediction tasks to perform - currently supports either 1 or 2 prediction tasks + :param dropout: [float] value between 0 and 1 for the probability of a reccurent element to be zero :param flow_in_temp: [bool] whether the flow predictions should feed into the temp predictions :param rand_seed: [int] the random seed for initialization + :param return_state: [bool] whether the hidden (h) and cell (c) states of LSTM should be returned """ super().__init__() self.hidden_size = hidden_size @@ -239,10 +242,12 @@ def __init__( """ :param hidden_size: [int] the number of hidden units :param A: [numpy array] adjacency matrix + :param tasks: [int] number of prediction tasks to perform - currently supports either 1 or 2 prediction tasks + :param dropout: [float] value between 0 and 1 for the probability of a reccurent element to be zero :param flow_in_temp: [bool] whether the flow predictions should feed into the temp predictions :param rand_seed: [int] the random seed for initialization - :param return_state: [bool] whether the h and c states should be returned (for DA) + :param return_state: [bool] whether the hidden (h) and cell (c) states of LSTM should be returned """ super().__init__() self.return_state = return_state diff --git a/river_dl/loss_functions.py b/river_dl/loss_functions.py index 416bb1e..f8916b5 100644 --- a/river_dl/loss_functions.py +++ b/river_dl/loss_functions.py @@ -108,18 +108,22 @@ def rmse_masked_one_var(data, y_pred, var_idx, tasks): return rmse(y_true, y_pred) -def weighted_masked_rmse(lamb=0.5): +def weighted_masked_rmse(lamb=0.5, tasks=1): """ calculate a weighted, masked rmse. :param lamb: [float] (short for lambda). The factor that the auxiliary loss will be multiplied by before added to the main loss. 
+ :param tasks: [int] number of prediction tasks to perform - currently supports either 1 or 2 prediction tasks """ def rmse_masked_combined(data, y_pred): - rmse_main = rmse_masked_one_var(data, y_pred, 0) - rmse_aux = rmse_masked_one_var(data, y_pred, 1) - rmse_loss = rmse_main + lamb * rmse_aux - return rmse_loss + rmse_main = rmse_masked_one_var(data, y_pred, 0, tasks) + if tasks == 2: + rmse_aux = rmse_masked_one_var(data, y_pred, 1, tasks) + rmse_loss = rmse_main + lamb * rmse_aux + return rmse_loss + else: + return rmse_main return rmse_masked_combined diff --git a/river_dl/rnns.py b/river_dl/rnns.py index 027cf16..4385a23 100644 --- a/river_dl/rnns.py +++ b/river_dl/rnns.py @@ -20,7 +20,7 @@ def __init__( :param gradient_correction: [bool] :param tasks: [int] number of prediction tasks to perform - currently supports either 1 or 2 prediction tasks :param lamb: [float] - :param dropout: [float] value between 0 and 1 for the probability of an element to be zero + :param dropout: [float] value between 0 and 1 for the probability of a reccurent element to be zero :param grad_log_file: [str] location of gradient log file :param return_state: [bool] return the hidden (h) and cell (c) states of LSTM """ From b7e90469dfd1d2ffa2f31063b3dbb62eb661c7d0 Mon Sep 17 00:00:00 2001 From: Jacob Zwart Date: Wed, 2 Jun 2021 13:45:32 -0500 Subject: [PATCH 04/32] updating dropout argument names --- river_dl/RGCN.py | 19 +++++++++++++------ river_dl/rnns.py | 9 ++++++--- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/river_dl/RGCN.py b/river_dl/RGCN.py index 09a412f..fb0f918 100644 --- a/river_dl/RGCN.py +++ b/river_dl/RGCN.py @@ -16,7 +16,8 @@ def __init__( hidden_size, A, tasks=1, - dropout=0, # I propose changing this to 'recurrent_dropout' and adding another option for 'dropout' since these will map to the options for the tf LSTM layers https://www.tensorflow.org/api_docs/python/tf/keras/layers/LSTMCell ; and also https://arxiv.org/pdf/1512.05287.pdf + recurrent_dropout=0, + dropout=0, flow_in_temp=False, rand_seed=None, return_state=False @@ -26,7 +27,8 @@ def __init__( :param hidden_size: [int] the number of hidden units :param A: [numpy array] adjacency matrix :param tasks: [int] number of prediction tasks to perform - currently supports either 1 or 2 prediction tasks - :param dropout: [float] value between 0 and 1 for the probability of a reccurent element to be zero + :param recurrent_dropout: [float] value between 0 and 1 for the probability of a reccurent element to be zero + :param dropout: [float] value between 0 and 1 for the probability of an input element to be zero :param flow_in_temp: [bool] whether the flow predictions should feed into the temp predictions :param rand_seed: [int] the random seed for initialization @@ -40,7 +42,7 @@ def __init__( self.return_state = return_state # set up the layer - self.lstm = tf.keras.layers.LSTMCell(hidden_size, recurrent_dropout=dropout) + self.lstm = tf.keras.layers.LSTMCell(hidden_size, recurrent_dropout=recurrent_dropout, dropout=dropout) ### set up the weights ### w_initializer = tf.random_normal_initializer( @@ -234,7 +236,8 @@ def __init__( hidden_size, A, tasks=1, - dropout=0, # I propose changing this to 'recurrent_dropout' and adding another option for 'dropout' since these will map to the options for the tf LSTM layers https://www.tensorflow.org/api_docs/python/tf/keras/layers/LSTMCell ; and also https://arxiv.org/pdf/1512.05287.pdf + recurrent_dropout=0, + dropout=0, flow_in_temp=False, rand_seed=None, return_state=False 
@@ -243,7 +246,8 @@ def __init__( :param hidden_size: [int] the number of hidden units :param A: [numpy array] adjacency matrix :param tasks: [int] number of prediction tasks to perform - currently supports either 1 or 2 prediction tasks - :param dropout: [float] value between 0 and 1 for the probability of a reccurent element to be zero + :param recurrent_dropout: [float] value between 0 and 1 for the probability of a reccurent element to be zero + :param dropout: [float] value between 0 and 1 for the probability of an input element to be zero :param flow_in_temp: [bool] whether the flow predictions should feed into the temp predictions :param rand_seed: [int] the random seed for initialization @@ -253,18 +257,21 @@ def __init__( self.return_state = return_state self.hidden_size = hidden_size self.tasks = tasks + self.recurrent_dropout = recurrent_dropout self.dropout = dropout self.rnn_layer = tf.keras.layers.LSTM( hidden_size, return_sequences=True, stateful=True, return_state=return_state, - recurrent_dropout=dropout) + recurrent_dropout=recurrent_dropout, + dropout=dropout) self.rgcn_layer = RGCN( hidden_size, A, tasks, + recurrent_dropout dropout, flow_in_temp, rand_seed, diff --git a/river_dl/rnns.py b/river_dl/rnns.py index 4385a23..28cce36 100644 --- a/river_dl/rnns.py +++ b/river_dl/rnns.py @@ -11,7 +11,8 @@ def __init__( gradient_correction=False, tasks=1, lamb=1, - dropout=0, # I propose changing this to 'recurrent_dropout' and adding another option for 'dropout' since these will map to the options for the tf LSTM layers https://www.tensorflow.org/api_docs/python/tf/keras/layers/LSTMCell ; and also https://arxiv.org/pdf/1512.05287.pdf + recurrent_dropout=0, + dropout=0, grad_log_file=None, return_state=False, ): @@ -20,7 +21,8 @@ def __init__( :param gradient_correction: [bool] :param tasks: [int] number of prediction tasks to perform - currently supports either 1 or 2 prediction tasks :param lamb: [float] - :param dropout: [float] value between 0 and 1 for the probability of a reccurent element to be zero + :param recurrent_dropout: [float] value between 0 and 1 for the probability of a reccurent element to be zero + :param dropout: [float] value between 0 and 1 for the probability of an input element to be zero :param grad_log_file: [str] location of gradient log file :param return_state: [bool] return the hidden (h) and cell (c) states of LSTM """ @@ -36,7 +38,8 @@ def __init__( stateful=True, return_state=return_state, name="rnn_shared", - recurrent_dropout=dropout, + recurrent_dropout=recurrent_dropout, + dropout=dropout ) if self.tasks == 1: self.dense_main = layers.Dense(1, name="dense_main") From 494b93100a917fe1b36ee77c9e243d8f9c473e43 Mon Sep 17 00:00:00 2001 From: Jake Zwart Date: Wed, 2 Jun 2021 15:04:13 -0400 Subject: [PATCH 05/32] cleaning LSTM Co-authored-by: Alison Appling --- river_dl/rnns.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/river_dl/rnns.py b/river_dl/rnns.py index 28cce36..6bfd68b 100644 --- a/river_dl/rnns.py +++ b/river_dl/rnns.py @@ -41,10 +41,8 @@ def __init__( recurrent_dropout=recurrent_dropout, dropout=dropout ) - if self.tasks == 1: - self.dense_main = layers.Dense(1, name="dense_main") - else: - self.dense_main = layers.Dense(1, name="dense_main") + self.dense_main = layers.Dense(1, name="dense_main") + if self.tasks == 2: self.dense_aux = layers.Dense(1, name="dense_aux") @tf.function @@ -163,4 +161,3 @@ def adjust_gradient_list(main_grads, aux_grads, logfile=None): adjust_gradient(main_grads[i], aux_grads[i], 
logfile) for i in range(len(main_grads)) ] - From b3f8df91a5eb9b8f352e5a7314f644f3fef781fb Mon Sep 17 00:00:00 2001 From: Jake Zwart Date: Wed, 2 Jun 2021 15:05:44 -0400 Subject: [PATCH 06/32] Being explicit about the number of tasks no magic numbers Co-authored-by: Alison Appling --- river_dl/rnns.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/river_dl/rnns.py b/river_dl/rnns.py index 6bfd68b..bace4fb 100644 --- a/river_dl/rnns.py +++ b/river_dl/rnns.py @@ -72,7 +72,7 @@ def train_step(self, data): loss_main = nnse_one_var_samplewise(y, y_pred, 0, self.tasks) if self.tasks == 2: - loss_aux = nnse_one_var_samplewise(y, y_pred, 1, 2) + loss_aux = nnse_one_var_samplewise(y, y_pred, 1, self.tasks) trainable_vars = self.trainable_variables From 34e998314a4742315b95f01a3b4ead7c1428b0d7 Mon Sep 17 00:00:00 2001 From: Jake Zwart Date: Wed, 2 Jun 2021 15:13:54 -0400 Subject: [PATCH 07/32] cleaning up init states call Co-authored-by: Alison Appling --- river_dl/RGCN.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/river_dl/RGCN.py b/river_dl/RGCN.py index fb0f918..5382960 100644 --- a/river_dl/RGCN.py +++ b/river_dl/RGCN.py @@ -149,18 +149,15 @@ def call(self, inputs, **kwargs): h_list = [] c_list = [] graph_size = self.A.shape[0] - hidden_state_prev, cell_state_prev = ( - tf.zeros([graph_size, self.hidden_size]), - tf.zeros([graph_size, self.hidden_size]), - ) out = [] n_steps = inputs.shape[1] - h_update = tf.cast(kwargs['h_init'], tf.float32) - c_update = tf.cast(kwargs['c_init'], tf.float32) + # set the initial h & c states to the supplied h and c states if using DA, or 0's otherwise if self.return_state: - # set the initial h & c states to the supplied h and c states if using DA - hidden_state_prev = h_update - cell_state_prev = c_update + hidden_state_prev = tf.cast(kwargs['h_init'], tf.float32) + cell_state_prev = tf.cast(kwargs['c_init'], tf.float32) + else: + hidden_state_prev = tf.zeros([graph_size, self.hidden_size]) + cell_state_prev = tf.zeros([graph_size, self.hidden_size]) for t in range(n_steps): seq, state = self.lstm(inputs[:, t, :], states=[h_update, c_update]) h, c = state # are these used anywhere? 
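
For context on the data-assimilation pattern the patches above work toward (return_state, h_init/c_init, stateful layers), here is a minimal, self-contained sketch of carrying LSTM states across forecast windows using plain tf.keras rather than the river_dl classes. The adjustment step and the pseudo_obs placeholder are illustrative assumptions, not part of the repository code.

    import tensorflow as tf
    from tensorflow.keras import layers

    # return_state=True exposes the hidden (h) and cell (c) states so they can
    # be adjusted between forecast windows and fed back as the initial states.
    lstm = layers.LSTM(20, return_sequences=True, return_state=True)

    x_window1 = tf.random.normal([4, 10, 3])   # (batch, time, features)
    x_window2 = tf.random.normal([4, 10, 3])

    seq_out, h, c = lstm(x_window1)            # forecast over the first window

    # hypothetical DA step: nudge the hidden state toward values implied by
    # observations (pseudo_obs is a stand-in for a real update rule)
    pseudo_obs = tf.zeros_like(h)
    h_adjusted = h + 0.1 * (pseudo_obs - h)
    c_adjusted = c

    # continue the forecast from the adjusted states instead of zeros
    seq_out2, h2, c2 = lstm(x_window2, initial_state=[h_adjusted, c_adjusted])

Passing initial_state explicitly, as sketched here, is the same design choice the later patches move toward when they thread h_init and c_init through the RGCN call instead of relying on zero-initialized states.
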
From 10ffdbdc5580741d0dd11d33a48eff35063cdc16 Mon Sep 17 00:00:00 2001 From: Jacob Zwart Date: Wed, 2 Jun 2021 14:15:17 -0500 Subject: [PATCH 08/32] Updating rgcn for clarity --- river_dl/RGCN.py | 28 ++++++++-------------------- 1 file changed, 8 insertions(+), 20 deletions(-) diff --git a/river_dl/RGCN.py b/river_dl/RGCN.py index fb0f918..751b6af 100644 --- a/river_dl/RGCN.py +++ b/river_dl/RGCN.py @@ -125,24 +125,14 @@ def __init__( shape=[1], initializer="zeros", name="b_out" ) else: - if self.tasks == 2: - # was W2 - self.W_out = self.add_weight( - shape=[hidden_size, 2], initializer=w_initializer, name="W_out" - ) - # was b2 - self.b_out = self.add_weight( - shape=[2], initializer="zeros", name="b_out" - ) - else: - # was W2 - self.W_out = self.add_weight( - shape=[hidden_size, 1], initializer=w_initializer, name="W_out" - ) - # was b2 - self.b_out = self.add_weight( - shape=[1], initializer="zeros", name="b_out" - ) + # was W2 + self.W_out = self.add_weight( + shape=[hidden_size, self.tasks], initializer=w_initializer, name="W_out" + ) + # was b2 + self.b_out = self.add_weight( + shape=[self.tasks], initializer="zeros", name="b_out" + ) @tf.function def call(self, inputs, **kwargs): @@ -162,8 +152,6 @@ def call(self, inputs, **kwargs): hidden_state_prev = h_update cell_state_prev = c_update for t in range(n_steps): - seq, state = self.lstm(inputs[:, t, :], states=[h_update, c_update]) - h, c = state # are these used anywhere? h_graph = tf.nn.tanh( tf.matmul( self.A, From cf953241162d127fb59b4b21964c3f8bf9704e4e Mon Sep 17 00:00:00 2001 From: Jacob Zwart Date: Wed, 2 Jun 2021 14:30:46 -0500 Subject: [PATCH 09/32] updating lambda_aux argument --- river_dl/loss_functions.py | 6 +++--- river_dl/rnns.py | 16 ++++++++-------- river_dl/train.py | 14 +++++++------- river_dl/train_model.py | 4 ++-- 4 files changed, 20 insertions(+), 20 deletions(-) diff --git a/river_dl/loss_functions.py b/river_dl/loss_functions.py index f8916b5..73d2c25 100644 --- a/river_dl/loss_functions.py +++ b/river_dl/loss_functions.py @@ -108,10 +108,10 @@ def rmse_masked_one_var(data, y_pred, var_idx, tasks): return rmse(y_true, y_pred) -def weighted_masked_rmse(lamb=0.5, tasks=1): +def weighted_masked_rmse(lambda_aux=0.5, tasks=1): """ calculate a weighted, masked rmse. - :param lamb: [float] (short for lambda). The factor that the auxiliary loss + :param lambda_aux: [float] The factor that the auxiliary loss will be multiplied by before added to the main loss. 
:param tasks: [int] number of prediction tasks to perform - currently supports either 1 or 2 prediction tasks """ @@ -120,7 +120,7 @@ def rmse_masked_combined(data, y_pred): rmse_main = rmse_masked_one_var(data, y_pred, 0, tasks) if tasks == 2: rmse_aux = rmse_masked_one_var(data, y_pred, 1, tasks) - rmse_loss = rmse_main + lamb * rmse_aux + rmse_loss = rmse_main + lambda_aux * rmse_aux return rmse_loss else: return rmse_main diff --git a/river_dl/rnns.py b/river_dl/rnns.py index bace4fb..db75847 100644 --- a/river_dl/rnns.py +++ b/river_dl/rnns.py @@ -10,7 +10,7 @@ def __init__( hidden_size, gradient_correction=False, tasks=1, - lamb=1, + lambda_aux=1, recurrent_dropout=0, dropout=0, grad_log_file=None, @@ -20,7 +20,7 @@ def __init__( :param hidden_size: [int] the number of hidden units :param gradient_correction: [bool] :param tasks: [int] number of prediction tasks to perform - currently supports either 1 or 2 prediction tasks - :param lamb: [float] + :param lambda_aux: [float] :param recurrent_dropout: [float] value between 0 and 1 for the probability of a reccurent element to be zero :param dropout: [float] value between 0 and 1 for the probability of an input element to be zero :param grad_log_file: [str] location of gradient log file @@ -30,7 +30,7 @@ def __init__( self.gradient_correction = gradient_correction self.grad_log_file = grad_log_file self.tasks = tasks - self.lamb = lamb + self.lambda_aux = lambda_aux self.return_state = return_state self.rnn_layer = layers.LSTM( hidden_size, @@ -96,7 +96,7 @@ def train_step(self, data): ) if self.tasks == 2: combined_gradient = combine_gradients_list( - gradient_shared_main, gradient_shared_aux, lamb=self.lamb + gradient_shared_main, gradient_shared_aux, lambda_aux=self.lambda_aux ) # apply gradients @@ -114,12 +114,12 @@ class GRUModel(LSTMModel): def __init__( self, hidden_size, - lamb=1 + lambda_aux=1 ): """ :param hidden_size: [int] the number of hidden units """ - super().__init__(hidden_size, lamb=lamb) + super().__init__(hidden_size, lambda_aux=lambda_aux) self.rnn_layer = layers.GRU( hidden_size, return_sequences=True, name="rnn_shared" ) @@ -152,8 +152,8 @@ def get_variables(trainable_variables, name): return [v for v in trainable_variables if name in v.name] -def combine_gradients_list(main_grads, aux_grads, lamb=1): - return [main_grads[i] + lamb * aux_grads[i] for i in range(len(main_grads))] +def combine_gradients_list(main_grads, aux_grads, lambda_aux=1): + return [main_grads[i] + lambda_aux * aux_grads[i] for i in range(len(main_grads))] def adjust_gradient_list(main_grads, aux_grads, logfile=None): diff --git a/river_dl/train.py b/river_dl/train.py index c4c0c47..e49441a 100644 --- a/river_dl/train.py +++ b/river_dl/train.py @@ -31,7 +31,7 @@ def train_model( model_type="rgcn", seed=None, dropout=0, - lamb=1, + lambda_aux=1, learning_rate_pre=0.005, learning_rate_ft=0.01, ): @@ -47,7 +47,7 @@ def train_model( :param model_type: [str] which model to use (either 'lstm', 'rgcn', or 'lstm_grad_correction') :param seed: [int] random seed - :param lamb: [float] (short for 'lambda') weight between 0 and 1. How much + :param lambda_aux: [float] weight between 0 and 1. How much to weight the auxiliary rmse is weighted compared to the main rmse. The difference between one and lambda becomes the main rmse weight. 
:param learning_rate_pre: [float] the pretrain learning rate @@ -71,7 +71,7 @@ def train_model( batch_size = num_years if model_type == "lstm": - model = LSTMModel(hidden_units, lamb=lamb) + model = LSTMModel(hidden_units, lambda_aux=lambda_aux) elif model_type == "rgcn": model = RGCNModel( hidden_units, @@ -84,12 +84,12 @@ def train_model( model = LSTMModel( hidden_units, gradient_correction=True, - lamb=lamb, + lambda_aux=lambda_aux, dropout=dropout, grad_log_file=grad_log_file, ) elif model_type == "gru": - model = GRUModel(hidden_units, lamb=lamb) + model = GRUModel(hidden_units, lambda_aux=lambda_aux) if seed: os.environ["PYTHONHASHSEED"] = str(seed) @@ -110,7 +110,7 @@ def train_model( ) if model_type == "rgcn": - model.compile(optimizer_pre, loss=weighted_masked_rmse(lamb=lamb)) + model.compile(optimizer_pre, loss=weighted_masked_rmse(lambda_aux=lambda_aux)) else: model.compile(optimizer_pre) @@ -141,7 +141,7 @@ def train_model( optimizer_ft = tf.optimizers.Adam(learning_rate=learning_rate_ft) if model_type == "rgcn": - model.compile(optimizer_ft, loss=weighted_masked_rmse(lamb=lamb)) + model.compile(optimizer_ft, loss=weighted_masked_rmse(lambda_aux=lambda_aux)) else: model.compile(optimizer_ft) diff --git a/river_dl/train_model.py b/river_dl/train_model.py index be465fa..acffe3b 100644 --- a/river_dl/train_model.py +++ b/river_dl/train_model.py @@ -49,7 +49,7 @@ default="rgcn", ) parser.add_argument( - "--lamb", help="lambda for weighting aux gradient", default=1.0, type=float + "--lambda_aux", help="lambda for weighting aux gradient", default=1.0, type=float ) @@ -69,7 +69,7 @@ hidden_units, out_dir=out_dir, flow_in_temp=flow_in_temp, - lamb=args.lamb, + lambda_aux=args.lambda_aux, seed=args.random_seed, learning_rate_ft=args.ft_learn_rate, learning_rate_pre=args.pt_learn_rate, From 088d4a788a6f63cf1115afe0772c7eb16cc5704c Mon Sep 17 00:00:00 2001 From: Jacob Zwart Date: Wed, 2 Jun 2021 14:35:20 -0500 Subject: [PATCH 10/32] making stateful based on return_state argument --- river_dl/RGCN.py | 2 +- river_dl/rnns.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/river_dl/RGCN.py b/river_dl/RGCN.py index 9aa93ae..06fb6b6 100644 --- a/river_dl/RGCN.py +++ b/river_dl/RGCN.py @@ -247,7 +247,7 @@ def __init__( self.rnn_layer = tf.keras.layers.LSTM( hidden_size, return_sequences=True, - stateful=True, + stateful=return_state, return_state=return_state, recurrent_dropout=recurrent_dropout, dropout=dropout) diff --git a/river_dl/rnns.py b/river_dl/rnns.py index db75847..0e23b81 100644 --- a/river_dl/rnns.py +++ b/river_dl/rnns.py @@ -35,7 +35,7 @@ def __init__( self.rnn_layer = layers.LSTM( hidden_size, return_sequences=True, - stateful=True, + stateful=return_state, return_state=return_state, name="rnn_shared", recurrent_dropout=recurrent_dropout, From 4101879fd536cfc07008208cc13a5b1b7f9ae247 Mon Sep 17 00:00:00 2001 From: jsadler2 Date: Wed, 2 Jun 2021 16:34:18 -0500 Subject: [PATCH 11/32] separating output, rgcn layers --- river_dl/RGCN.py | 130 +++++++++++------------------------------------ 1 file changed, 30 insertions(+), 100 deletions(-) diff --git a/river_dl/RGCN.py b/river_dl/RGCN.py index 06fb6b6..68d58cc 100644 --- a/river_dl/RGCN.py +++ b/river_dl/RGCN.py @@ -15,31 +15,21 @@ def __init__( self, hidden_size, A, - tasks=1, - recurrent_dropout=0, + recurrent_dropout=0, dropout=0, - flow_in_temp=False, rand_seed=None, - return_state=False ): """ :param hidden_size: [int] the number of hidden units :param A: [numpy array] adjacency matrix - :param tasks: [int] 
number of prediction tasks to perform - currently supports either 1 or 2 prediction tasks - :param recurrent_dropout: [float] value between 0 and 1 for the probability of a reccurent element to be zero - :param dropout: [float] value between 0 and 1 for the probability of an input element to be zero - :param flow_in_temp: [bool] whether the flow predictions should feed - into the temp predictions + :param recurrent_dropout: [float] value between 0 and 1 for the probability of a reccurent element to be zero + :param dropout: [float] value between 0 and 1 for the probability of an input element to be zero :param rand_seed: [int] the random seed for initialization - :param return_state: [bool] whether the hidden (h) and cell (c) states of LSTM should be returned """ super().__init__() self.hidden_size = hidden_size self.A = A.astype("float32") - self.tasks = tasks - self.flow_in_temp = flow_in_temp - self.return_state = return_state # set up the layer self.lstm = tf.keras.layers.LSTMCell(hidden_size, recurrent_dropout=recurrent_dropout, dropout=dropout) @@ -104,45 +94,14 @@ def __init__( shape=[hidden_size], initializer="zeros", name="b_c" ) - # will be doing two task predictions if flow_in_temp == True - if self.flow_in_temp: - # was W2 - self.W_out_flow = self.add_weight( - shape=[hidden_size, 1], initializer=w_initializer, name="W_out" - ) - # was b2 - self.b_out_flow = self.add_weight( - shape=[1], initializer="zeros", name="b_out" - ) - - self.W_out_temp = self.add_weight( - shape=[hidden_size + 1, 1], - initializer=w_initializer, - name="W_out", - ) - - self.b_out_temp = self.add_weight( - shape=[1], initializer="zeros", name="b_out" - ) - else: - # was W2 - self.W_out = self.add_weight( - shape=[hidden_size, self.tasks], initializer=w_initializer, name="W_out" - ) - # was b2 - self.b_out = self.add_weight( - shape=[self.tasks], initializer="zeros", name="b_out" - ) - @tf.function def call(self, inputs, **kwargs): h_list = [] c_list = [] graph_size = self.A.shape[0] - out = [] n_steps = inputs.shape[1] # set the initial h & c states to the supplied h and c states if using DA, or 0's otherwise - if self.return_state: + if kwargs.get('h_init'): hidden_state_prev = tf.cast(kwargs['h_init'], tf.float32) cell_state_prev = tf.cast(kwargs['c_init'], tf.float32) else: @@ -179,23 +138,6 @@ def call(self, inputs, **kwargs): + self.b_c ) - if self.flow_in_temp: - out_pred_q = ( - tf.matmul(h_update, self.W_out_flow) + self.b_out_flow - ) - out_pred_t = ( - tf.matmul( - tf.concat([h_update, out_pred_q], axis=1), - self.W_out_temp, - ) - + self.b_out_temp - ) - out_pred = tf.concat([out_pred_t, out_pred_q], axis=1) - else: - out_pred = tf.matmul(h_update, self.W_out) + self.b_out - - out.append(out_pred) - hidden_state_prev = h_update cell_state_prev = c_update @@ -206,13 +148,7 @@ def call(self, inputs, **kwargs): c_list = tf.stack(c_list) h_list = tf.transpose(h_list, [1, 0, 2]) c_list = tf.transpose(c_list, [1, 0, 2]) - out = tf.stack(out) - out = tf.transpose(out, [1, 0, 2]) - - if self.return_state: - return out, h_list, c_list - else: - return out + return h_list, c_list class RGCNModel(tf.keras.Model): @@ -220,60 +156,54 @@ def __init__( self, hidden_size, A, - tasks=1, + num_tasks=1, recurrent_dropout=0, dropout=0, - flow_in_temp=False, rand_seed=None, - return_state=False ): """ :param hidden_size: [int] the number of hidden units :param A: [numpy array] adjacency matrix - :param tasks: [int] number of prediction tasks to perform - currently supports either 1 or 2 prediction tasks - :param 
recurrent_dropout: [float] value between 0 and 1 for the probability of a reccurent element to be zero + :param num_tasks: [int] number of prediction tasks to perform - currently supports either 1 or 2 prediction tasks + :param recurrent_dropout: [float] value between 0 and 1 for the probability of a recurrent element to be zero :param dropout: [float] value between 0 and 1 for the probability of an input element to be zero - :param flow_in_temp: [bool] whether the flow predictions should feed into the temp predictions :param rand_seed: [int] the random seed for initialization - :param return_state: [bool] whether the hidden (h) and cell (c) states of LSTM should be returned """ super().__init__() - self.return_state = return_state - self.hidden_size = hidden_size - self.tasks = tasks + self.hidden_size = hidden_size + self.num_tasks = num_tasks self.recurrent_dropout = recurrent_dropout - self.dropout = dropout - self.rnn_layer = tf.keras.layers.LSTM( - hidden_size, - return_sequences=True, - stateful=return_state, - return_state=return_state, - recurrent_dropout=recurrent_dropout, - dropout=dropout) - + self.dropout = dropout + self.rgcn_layer = RGCN( hidden_size, A, - tasks, - recurrent_dropout + recurrent_dropout, dropout, - flow_in_temp, - rand_seed, - return_state) + rand_seed) self.h_gr = None self.c_gr = None + self.dense_main = layers.Dense(1, name="dense_main") + if self.num_tasks == 2: + self.dense_aux = layers.Dense(1, name="dense_aux") + def call(self, inputs, **kwargs): batch_size = inputs.shape[0] h_init = kwargs.get('h_init', tf.zeros([batch_size, self.hidden_size])) c_init = kwargs.get('c_init', tf.zeros([batch_size, self.hidden_size])) - if self.return_state: - output, h_gr, c_gr = self.rgcn_layer(inputs, h_init=h_init, c_init=c_init) - self.h_gr = h_gr - self.c_gr = c_gr + h_gr, c_gr = self.rgcn_layer(inputs, h_init=h_init, c_init=c_init) + self.h_gr = h_gr + self.c_gr = c_gr + + if self.num_tasks == 1: + main_prediction = self.dense_main(h_gr) + return main_prediction + elif self.num_tasks == 2: + main_prediction = self.dense_main(h_gr) + aux_prediction = self.dense_aux(h_gr) + return tf.concat([main_prediction, aux_prediction], axis=2) else: - output = self.rgcn_layer(inputs, h_init=h_init, c_init=c_init) - - return output + raise ValueError(f'This model only supports 1 or 2 tasks (not {self.num_tasks})') From 6fc687db72cda5cbad0069e1559c6c5d23e9fccb Mon Sep 17 00:00:00 2001 From: jsadler2 Date: Wed, 2 Jun 2021 17:12:47 -0500 Subject: [PATCH 12/32] making a more generic multitask loss function --- river_dl/loss_functions.py | 38 +++++++++++++++++--------------------- 1 file changed, 17 insertions(+), 21 deletions(-) diff --git a/river_dl/loss_functions.py b/river_dl/loss_functions.py index 73d2c25..88ae7b3 100644 --- a/river_dl/loss_functions.py +++ b/river_dl/loss_functions.py @@ -103,29 +103,25 @@ def y_data_components(data, y_pred, var_idx, tasks): return y_true, y_pred, weights -def rmse_masked_one_var(data, y_pred, var_idx, tasks): - y_true, y_pred, weights = y_data_components(data, y_pred, var_idx, tasks) - return rmse(y_true, y_pred) - - -def weighted_masked_rmse(lambda_aux=0.5, tasks=1): +def multitask_loss(lambdas, loss_func): """ - calculate a weighted, masked rmse. - :param lambda_aux: [float] The factor that the auxiliary loss - will be multiplied by before added to the main loss. 
- :param tasks: [int] number of prediction tasks to perform - currently supports either 1 or 2 prediction tasks + calculate a weighted multi-task loss for a given number of variables with a + given loss function + :param lambdas: [array-like float] The factor that losses will be + multiplied by before being added together. + :param loss_func: [function] Loss function that will be used to calculate + the loss of each variable. Must take as input parameters [y_true, y_pred] """ - - def rmse_masked_combined(data, y_pred): - rmse_main = rmse_masked_one_var(data, y_pred, 0, tasks) - if tasks == 2: - rmse_aux = rmse_masked_one_var(data, y_pred, 1, tasks) - rmse_loss = rmse_main + lambda_aux * rmse_aux - return rmse_loss - else: - return rmse_main - - return rmse_masked_combined + def combine_loss(y_true, y_pred): + losses = [] + n_vars = y_pred.shape[-1] + for var_id in range(n_vars): + ind_var_loss = loss_func(y_true[:, :, var_id], y_pred[:, :, var_id]) + weighted_ind_var_loss = lambdas[var_id] * ind_var_loss + losses.append(weighted_ind_var_loss) + total_loss = sum(losses) + return total_loss + return combine_loss def mean_masked(y): From 7c36a4f76604adda03be2afdbd13e4a623db0460 Mon Sep 17 00:00:00 2001 From: jsadler2 Date: Wed, 2 Jun 2021 17:16:41 -0500 Subject: [PATCH 13/32] adding SingletaskLSTMModel --- river_dl/rnns.py | 51 +++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 42 insertions(+), 9 deletions(-) diff --git a/river_dl/rnns.py b/river_dl/rnns.py index 0e23b81..f87e2ba 100644 --- a/river_dl/rnns.py +++ b/river_dl/rnns.py @@ -4,6 +4,39 @@ from tensorflow.keras import layers from river_dl.loss_functions import nnse_masked_one_var, nnse_one_var_samplewise + +class SingletaskLSTMModel(tf.keras.Model): + def __init__( + self, + hidden_size, + recurrent_dropout=0, + dropout=0, + ): + """ + :param hidden_size: [int] the number of hidden units + :param recurrent_dropout: [float] value between 0 and 1 for the probability of a reccurent element to be zero + :param dropout: [float] value between 0 and 1 for the probability of an input element to be zero + """ + super().__init__() + self.rnn_layer = layers.LSTM( + hidden_size, + return_sequences=True, + stateful=True, + return_state=True, + recurrent_dropout=recurrent_dropout, + dropout=dropout + ) + self.dense_main = layers.Dense(1, name="dense_main") + self.h = None + self.c = None + + @tf.function + def call(self, inputs, **kwargs): + x, self.h, self.c = self.rnn_layer(inputs) + main_prediction = self.dense_main(x) + return main_prediction + + class LSTMModel(tf.keras.Model): def __init__( self, @@ -71,43 +104,43 @@ def train_step(self, data): y_pred = self(x, training=True) # forward pass loss_main = nnse_one_var_samplewise(y, y_pred, 0, self.tasks) - if self.tasks == 2: + if self.tasks == 2: loss_aux = nnse_one_var_samplewise(y, y_pred, 1, self.tasks) trainable_vars = self.trainable_variables main_out_vars = get_variables(trainable_vars, "dense_main") - if self.tasks == 2: + if self.tasks == 2: aux_out_vars = get_variables(trainable_vars, "dense_aux") shared_vars = get_variables(trainable_vars, "rnn_shared") # get gradients gradient_main_out = tape.gradient(loss_main, main_out_vars) - if self.tasks == 2: + if self.tasks == 2: gradient_aux_out = tape.gradient(loss_aux, aux_out_vars) gradient_shared_main = tape.gradient(loss_main, shared_vars) gradient_shared_aux = tape.gradient(loss_aux, shared_vars) - if self.tasks == 2: + if self.tasks == 2: if self.gradient_correction: # adjust auxiliary gradient gradient_shared_aux = 
adjust_gradient_list( gradient_shared_main, gradient_shared_aux, self.grad_log_file ) - if self.tasks == 2: + if self.tasks == 2: combined_gradient = combine_gradients_list( gradient_shared_main, gradient_shared_aux, lambda_aux=self.lambda_aux ) # apply gradients self.optimizer.apply_gradients(zip(gradient_main_out, main_out_vars)) - if self.tasks == 2: + if self.tasks == 2: self.optimizer.apply_gradients(zip(gradient_aux_out, aux_out_vars)) self.optimizer.apply_gradients(zip(combined_gradient, shared_vars)) return {"loss_main": loss_main, "loss_aux": loss_aux} - else: - return {"loss_main": loss_main} - + else: + return {"loss_main": loss_main} + class GRUModel(LSTMModel): From c582d3d2567daa75cc8a0bc90b3dce6ac5d01a85 Mon Sep 17 00:00:00 2001 From: jsadler2 Date: Wed, 2 Jun 2021 17:20:30 -0500 Subject: [PATCH 14/32] Convert to MultitaskLSTM; update GRU classes --- river_dl/rnns.py | 92 ++++++++++++++++++++++-------------------------- 1 file changed, 42 insertions(+), 50 deletions(-) diff --git a/river_dl/rnns.py b/river_dl/rnns.py index f87e2ba..8f58610 100644 --- a/river_dl/rnns.py +++ b/river_dl/rnns.py @@ -37,61 +37,48 @@ def call(self, inputs, **kwargs): return main_prediction -class LSTMModel(tf.keras.Model): +class MultitaskLSTMModel(tf.keras.Model): def __init__( self, hidden_size, gradient_correction=False, - tasks=1, lambda_aux=1, recurrent_dropout=0, dropout=0, grad_log_file=None, - return_state=False, ): """ :param hidden_size: [int] the number of hidden units :param gradient_correction: [bool] - :param tasks: [int] number of prediction tasks to perform - currently supports either 1 or 2 prediction tasks - :param lambda_aux: [float] + :param lambda_aux: [float] :param recurrent_dropout: [float] value between 0 and 1 for the probability of a reccurent element to be zero :param dropout: [float] value between 0 and 1 for the probability of an input element to be zero :param grad_log_file: [str] location of gradient log file - :param return_state: [bool] return the hidden (h) and cell (c) states of LSTM """ super().__init__() self.gradient_correction = gradient_correction self.grad_log_file = grad_log_file - self.tasks = tasks self.lambda_aux = lambda_aux - self.return_state = return_state self.rnn_layer = layers.LSTM( hidden_size, return_sequences=True, - stateful=return_state, - return_state=return_state, + stateful=True, + return_state=True, name="rnn_shared", recurrent_dropout=recurrent_dropout, dropout=dropout ) self.dense_main = layers.Dense(1, name="dense_main") - if self.tasks == 2: - self.dense_aux = layers.Dense(1, name="dense_aux") + self.dense_aux = layers.Dense(1, name="dense_aux") + self.h = None + self.c = None @tf.function def call(self, inputs, **kwargs): - if self.return_state: - x, h, c = self.rnn_layer(inputs) - else: - x = self.rnn_layer(inputs) - - if self.tasks == 1: - main_prediction = self.dense_main(x) - return main_prediction - else: - main_prediction = self.dense_main(x) - aux_prediction = self.dense_aux(x) - return tf.concat([main_prediction, aux_prediction], axis=2) + x, self.h, self.c = self.rnn_layer(inputs) + main_prediction = self.dense_main(x) + aux_prediction = self.dense_aux(x) + return tf.concat([main_prediction, aux_prediction], axis=2) @tf.function def train_step(self, data): @@ -104,46 +91,51 @@ def train_step(self, data): y_pred = self(x, training=True) # forward pass loss_main = nnse_one_var_samplewise(y, y_pred, 0, self.tasks) - if self.tasks == 2: - loss_aux = nnse_one_var_samplewise(y, y_pred, 1, self.tasks) + loss_aux = 
nnse_one_var_samplewise(y, y_pred, 1, self.tasks) trainable_vars = self.trainable_variables main_out_vars = get_variables(trainable_vars, "dense_main") - if self.tasks == 2: - aux_out_vars = get_variables(trainable_vars, "dense_aux") - shared_vars = get_variables(trainable_vars, "rnn_shared") + aux_out_vars = get_variables(trainable_vars, "dense_aux") + shared_vars = get_variables(trainable_vars, "rnn_shared") # get gradients gradient_main_out = tape.gradient(loss_main, main_out_vars) - if self.tasks == 2: - gradient_aux_out = tape.gradient(loss_aux, aux_out_vars) - gradient_shared_main = tape.gradient(loss_main, shared_vars) - gradient_shared_aux = tape.gradient(loss_aux, shared_vars) - - if self.tasks == 2: - if self.gradient_correction: - # adjust auxiliary gradient - gradient_shared_aux = adjust_gradient_list( - gradient_shared_main, gradient_shared_aux, self.grad_log_file - ) - if self.tasks == 2: - combined_gradient = combine_gradients_list( - gradient_shared_main, gradient_shared_aux, lambda_aux=self.lambda_aux + gradient_aux_out = tape.gradient(loss_aux, aux_out_vars) + gradient_shared_main = tape.gradient(loss_main, shared_vars) + gradient_shared_aux = tape.gradient(loss_aux, shared_vars) + + if self.gradient_correction: + # adjust auxiliary gradient + gradient_shared_aux = adjust_gradient_list( + gradient_shared_main, gradient_shared_aux, self.grad_log_file ) + combined_gradient = combine_gradients_list( + gradient_shared_main, gradient_shared_aux, lambda_aux=self.lambda_aux + ) # apply gradients self.optimizer.apply_gradients(zip(gradient_main_out, main_out_vars)) - if self.tasks == 2: - self.optimizer.apply_gradients(zip(gradient_aux_out, aux_out_vars)) - self.optimizer.apply_gradients(zip(combined_gradient, shared_vars)) - return {"loss_main": loss_main, "loss_aux": loss_aux} - else: - return {"loss_main": loss_main} + self.optimizer.apply_gradients(zip(gradient_aux_out, aux_out_vars)) + self.optimizer.apply_gradients(zip(combined_gradient, shared_vars)) + return {"loss_main": loss_main, "loss_aux": loss_aux} +class SingletaskGRUModel(SingletaskLSTMModel): + def __init__( + self, + hidden_size, + ): + """ + :param hidden_size: [int] the number of hidden units + """ + super().__init__(hidden_size) + self.rnn_layer = layers.GRU( + hidden_size, return_sequences=True, name="rnn_shared" + ) + -class GRUModel(LSTMModel): +class MultitaskGRUModel(MultitaskLSTMModel): def __init__( self, hidden_size, From ab4cff11f6a728e2b75cda4d8fbb0c9e8f292397 Mon Sep 17 00:00:00 2001 From: Jake Zwart Date: Thu, 3 Jun 2021 11:07:48 -0400 Subject: [PATCH 15/32] explicit about number of tasks for y_data_components Co-authored-by: Alison Appling --- river_dl/loss_functions.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/river_dl/loss_functions.py b/river_dl/loss_functions.py index 88ae7b3..5f30809 100644 --- a/river_dl/loss_functions.py +++ b/river_dl/loss_functions.py @@ -80,12 +80,8 @@ def nnse_one_var_samplewise(data, y_pred, var_idx, tasks): def y_data_components(data, y_pred, var_idx, tasks): - if tasks == 2: - weights = data[:, :, -2:] - y_true = data[:, :, :-2] - else: - weights = data[:, :, -1:] - y_true = data[:, :, :-1] + weights = data[:, :, -tasks:] + y_true = data[:, :, :-tasks] # ensure y_pred, weights, and y_true are all tensors the same data type y_true = tf.convert_to_tensor(y_true) From 3129bd508f995ae09be8bb0dad480f56bee7346d Mon Sep 17 00:00:00 2001 From: jsadler2 Date: Fri, 4 Jun 2021 10:44:00 -0500 Subject: [PATCH 16/32] convenience fxn 
weighted_masked_rmse --- river_dl/loss_functions.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/river_dl/loss_functions.py b/river_dl/loss_functions.py index 88ae7b3..b1989d0 100644 --- a/river_dl/loss_functions.py +++ b/river_dl/loss_functions.py @@ -103,6 +103,10 @@ def y_data_components(data, y_pred, var_idx, tasks): return y_true, y_pred, weights +def weighted_masked_rmse(lambdas): + return multitask_loss(lambdas, rmse) + + def multitask_loss(lambdas, loss_func): """ calculate a weighted multi-task loss for a given number of variables with a From c8c140a26bde6d335bcbf1ebb606e33b5b848f0f Mon Sep 17 00:00:00 2001 From: jsadler2 Date: Fri, 4 Jun 2021 10:58:03 -0500 Subject: [PATCH 17/32] lambda_aux -> lambdas in rnns --- river_dl/rnns.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/river_dl/rnns.py b/river_dl/rnns.py index 8f58610..50c8644 100644 --- a/river_dl/rnns.py +++ b/river_dl/rnns.py @@ -42,23 +42,24 @@ def __init__( self, hidden_size, gradient_correction=False, - lambda_aux=1, + lambdas=(1, 1), recurrent_dropout=0, dropout=0, grad_log_file=None, ): """ :param hidden_size: [int] the number of hidden units - :param gradient_correction: [bool] - :param lambda_aux: [float] - :param recurrent_dropout: [float] value between 0 and 1 for the probability of a reccurent element to be zero + :param gradient_correction: [bool] + :param lambdas: [array-like] weights to multiply the loss from each target + variable by + :param recurrent_dropout: [float] value between 0 and 1 for the probability of a reccurent element to be zero :param dropout: [float] value between 0 and 1 for the probability of an input element to be zero :param grad_log_file: [str] location of gradient log file """ super().__init__() self.gradient_correction = gradient_correction self.grad_log_file = grad_log_file - self.lambda_aux = lambda_aux + self.lambdas = lambdas self.rnn_layer = layers.LSTM( hidden_size, return_sequences=True, @@ -111,7 +112,7 @@ def train_step(self, data): gradient_shared_main, gradient_shared_aux, self.grad_log_file ) combined_gradient = combine_gradients_list( - gradient_shared_main, gradient_shared_aux, lambda_aux=self.lambda_aux + gradient_shared_main, gradient_shared_aux, lambdas=self.lambdas ) # apply gradients @@ -139,12 +140,12 @@ class MultitaskGRUModel(MultitaskLSTMModel): def __init__( self, hidden_size, - lambda_aux=1 + lambdas=(1, 1) ): """ :param hidden_size: [int] the number of hidden units """ - super().__init__(hidden_size, lambda_aux=lambda_aux) + super().__init__(hidden_size, lambdas=lambdas) self.rnn_layer = layers.GRU( hidden_size, return_sequences=True, name="rnn_shared" ) @@ -177,8 +178,8 @@ def get_variables(trainable_variables, name): return [v for v in trainable_variables if name in v.name] -def combine_gradients_list(main_grads, aux_grads, lambda_aux=1): - return [main_grads[i] + lambda_aux * aux_grads[i] for i in range(len(main_grads))] +def combine_gradients_list(main_grads, aux_grads, lambdas=(1, 1)): + return [lambdas[0] * main_grads[i] + lambdas[1] * aux_grads[i] for i in range(len(main_grads))] def adjust_gradient_list(main_grads, aux_grads, logfile=None): From 3e544656e162f333b9c43f1505ffede921a184f8 Mon Sep 17 00:00:00 2001 From: jsadler2 Date: Fri, 4 Jun 2021 10:58:39 -0500 Subject: [PATCH 18/32] num_tasks, lambdas in train functions --- river_dl/train.py | 36 +++++++++++++++++++++--------------- river_dl/train_model.py | 34 ++++++++++++---------------------- 2 files changed, 33 insertions(+), 37 deletions(-) 
diff --git a/river_dl/train.py b/river_dl/train.py index e49441a..e03d5e5 100644 --- a/river_dl/train.py +++ b/river_dl/train.py @@ -6,7 +6,7 @@ import tensorflow as tf from river_dl.RGCN import RGCNModel from river_dl.loss_functions import weighted_masked_rmse -from river_dl.rnns import LSTMModel, GRUModel +from river_dl.rnns import SingletaskLSTMModel, MultitaskLSTMModel, SingletaskGRUModel, MultitaskGRUModel def get_data_if_file(d): @@ -27,11 +27,11 @@ def train_model( finetune_epochs, hidden_units, out_dir, - flow_in_temp=False, model_type="rgcn", seed=None, dropout=0, - lambda_aux=1, + lambdas=(1, 1), + num_tasks=1, learning_rate_pre=0.005, learning_rate_ft=0.01, ): @@ -42,14 +42,12 @@ def train_model( :param finetune_epochs: [int] number of finetune epochs :param hidden_units: [int] number of hidden layers :param out_dir: [str] directory where the output files should be written - :param flow_in_temp: [bool] whether the flow predictions should feed - into the temp predictions :param model_type: [str] which model to use (either 'lstm', 'rgcn', or 'lstm_grad_correction') :param seed: [int] random seed - :param lambda_aux: [float] weight between 0 and 1. How much - to weight the auxiliary rmse is weighted compared to the main rmse. The - difference between one and lambda becomes the main rmse weight. + :param lambdas: [array-like] weights to multiply the loss from each target + variable by + :param num_tasks: [int] number of tasks (outputs to be predicted) :param learning_rate_pre: [float] the pretrain learning rate :param learning_rate_ft: [float] the finetune learning rate :return: [tf model] finetuned model @@ -71,25 +69,33 @@ def train_model( batch_size = num_years if model_type == "lstm": - model = LSTMModel(hidden_units, lambda_aux=lambda_aux) + if num_tasks == 1: + model = SingletaskLSTMModel(hidden_units) + elif num_tasks == 2: + model = MultitaskLSTMModel(hidden_units, lambdas=lambdas) elif model_type == "rgcn": model = RGCNModel( hidden_units, - flow_in_temp=flow_in_temp, + num_tasks=num_tasks, A=dist_matrix, rand_seed=seed, ) elif model_type == "lstm_grad_correction": grad_log_file = os.path.join(out_dir, "grad_correction.txt") - model = LSTMModel( + model = MultitaskLSTMModel( hidden_units, gradient_correction=True, - lambda_aux=lambda_aux, + lambdas=lambdas, dropout=dropout, grad_log_file=grad_log_file, ) elif model_type == "gru": - model = GRUModel(hidden_units, lambda_aux=lambda_aux) + if num_tasks == 1: + model = SingletaskGRUModel(hidden_units) + elif num_tasks == 2: + model = MultitaskGRUModel(hidden_units, lambdas=lambdas) + else: + raise ValueError(f"The 'model_type' provided ({model_type}) is not supported") if seed: os.environ["PYTHONHASHSEED"] = str(seed) @@ -110,7 +116,7 @@ def train_model( ) if model_type == "rgcn": - model.compile(optimizer_pre, loss=weighted_masked_rmse(lambda_aux=lambda_aux)) + model.compile(optimizer_pre, loss=weighted_masked_rmse(lambdas=lambdas)) else: model.compile(optimizer_pre) @@ -141,7 +147,7 @@ def train_model( optimizer_ft = tf.optimizers.Adam(learning_rate=learning_rate_ft) if model_type == "rgcn": - model.compile(optimizer_ft, loss=weighted_masked_rmse(lambda_aux=lambda_aux)) + model.compile(optimizer_ft, loss=weighted_masked_rmse(lambdas=lambdas)) else: model.compile(optimizer_ft) diff --git a/river_dl/train_model.py b/river_dl/train_model.py index acffe3b..d47259b 100644 --- a/river_dl/train_model.py +++ b/river_dl/train_model.py @@ -23,13 +23,6 @@ parser.add_argument( "-f", "--finetune_epochs", help="number of finetune" 
"epochs", type=int ) -parser.add_argument( - "-q", - "--flow-in-temp", - help="whether or not to do flow\ - in temp", - action="store_true", -) parser.add_argument( "--pt_learn_rate", help="learning rate for pretraining", @@ -45,31 +38,28 @@ parser.add_argument( "--model", help="type of model to train", - choices=["lstm", "rgcn"], + choices=["lstm", "rgcn", "gru"], default="rgcn", ) parser.add_argument( - "--lambda_aux", help="lambda for weighting aux gradient", default=1.0, type=float + "--num_tasks", help="number of tasks (outputs to be predicted)", default=1, type=int +) +parser.add_argument( + "--lambdas", help="lambdas for weighting variable losses", default=[1, 1], type=list ) args = parser.parse_args() -flow_in_temp = args.flow_in_temp -in_data_file = args.in_data -hidden_units = args.hidden_units -out_dir = args.outdir -pt_epochs = args.pretrain_epochs -ft_epochs = args.finetune_epochs # -------- train ------ model = train_model( - in_data_file, - pt_epochs, - ft_epochs, - hidden_units, - out_dir=out_dir, - flow_in_temp=flow_in_temp, - lambda_aux=args.lambda_aux, + args.in_data_file, + args.pretrain_epochs, + args.finetune_epochs, + args.hidden_units, + out_dir=args.out_dir, + num_tasks=args.num_tasks, + lambdas=args.lambdas, seed=args.random_seed, learning_rate_ft=args.ft_learn_rate, learning_rate_pre=args.pt_learn_rate, From 17bfab8c79a2cc46e04c2a936980bf1ffaaa12ce Mon Sep 17 00:00:00 2001 From: jsadler2 Date: Fri, 4 Jun 2021 11:51:24 -0500 Subject: [PATCH 19/32] [#106] taking train_step out in rnns (just add losses together for multitask) --- river_dl/rnns.py | 167 +++++++---------------------------------------- 1 file changed, 22 insertions(+), 145 deletions(-) diff --git a/river_dl/rnns.py b/river_dl/rnns.py index 50c8644..c554c38 100644 --- a/river_dl/rnns.py +++ b/river_dl/rnns.py @@ -2,13 +2,13 @@ from __future__ import print_function, division import tensorflow as tf from tensorflow.keras import layers -from river_dl.loss_functions import nnse_masked_one_var, nnse_one_var_samplewise -class SingletaskLSTMModel(tf.keras.Model): +class LSTMModel(tf.keras.Model): def __init__( self, hidden_size, + num_tasks=1, recurrent_dropout=0, dropout=0, ): @@ -18,6 +18,7 @@ def __init__( :param dropout: [float] value between 0 and 1 for the probability of an input element to be zero """ super().__init__() + self.num_tasks = num_tasks, self.rnn_layer = layers.LSTM( hidden_size, return_sequences=True, @@ -27,163 +28,39 @@ def __init__( dropout=dropout ) self.dense_main = layers.Dense(1, name="dense_main") + if self.num_tasks == 2: + self.dense_aux = layers.Dense(1, name="dense_aux") self.h = None self.c = None @tf.function def call(self, inputs, **kwargs): x, self.h, self.c = self.rnn_layer(inputs) - main_prediction = self.dense_main(x) - return main_prediction - - -class MultitaskLSTMModel(tf.keras.Model): - def __init__( - self, - hidden_size, - gradient_correction=False, - lambdas=(1, 1), - recurrent_dropout=0, - dropout=0, - grad_log_file=None, - ): - """ - :param hidden_size: [int] the number of hidden units - :param gradient_correction: [bool] - :param lambdas: [array-like] weights to multiply the loss from each target - variable by - :param recurrent_dropout: [float] value between 0 and 1 for the probability of a reccurent element to be zero - :param dropout: [float] value between 0 and 1 for the probability of an input element to be zero - :param grad_log_file: [str] location of gradient log file - """ - super().__init__() - self.gradient_correction = gradient_correction - 
self.grad_log_file = grad_log_file - self.lambdas = lambdas - self.rnn_layer = layers.LSTM( - hidden_size, - return_sequences=True, - stateful=True, - return_state=True, - name="rnn_shared", - recurrent_dropout=recurrent_dropout, - dropout=dropout - ) - self.dense_main = layers.Dense(1, name="dense_main") - self.dense_aux = layers.Dense(1, name="dense_aux") - self.h = None - self.c = None - - @tf.function - def call(self, inputs, **kwargs): - x, self.h, self.c = self.rnn_layer(inputs) - main_prediction = self.dense_main(x) - aux_prediction = self.dense_aux(x) - return tf.concat([main_prediction, aux_prediction], axis=2) - - @tf.function - def train_step(self, data): - x, y = data - - # If I don't do one forward pass before starting the gradient tape, - # the thing hangs - _ = self(x) - with tf.GradientTape(persistent=True) as tape: - y_pred = self(x, training=True) # forward pass - - loss_main = nnse_one_var_samplewise(y, y_pred, 0, self.tasks) - loss_aux = nnse_one_var_samplewise(y, y_pred, 1, self.tasks) - - trainable_vars = self.trainable_variables - - main_out_vars = get_variables(trainable_vars, "dense_main") - aux_out_vars = get_variables(trainable_vars, "dense_aux") - shared_vars = get_variables(trainable_vars, "rnn_shared") - - # get gradients - gradient_main_out = tape.gradient(loss_main, main_out_vars) - gradient_aux_out = tape.gradient(loss_aux, aux_out_vars) - gradient_shared_main = tape.gradient(loss_main, shared_vars) - gradient_shared_aux = tape.gradient(loss_aux, shared_vars) - - if self.gradient_correction: - # adjust auxiliary gradient - gradient_shared_aux = adjust_gradient_list( - gradient_shared_main, gradient_shared_aux, self.grad_log_file - ) - combined_gradient = combine_gradients_list( - gradient_shared_main, gradient_shared_aux, lambdas=self.lambdas - ) - - # apply gradients - self.optimizer.apply_gradients(zip(gradient_main_out, main_out_vars)) - self.optimizer.apply_gradients(zip(gradient_aux_out, aux_out_vars)) - self.optimizer.apply_gradients(zip(combined_gradient, shared_vars)) - return {"loss_main": loss_main, "loss_aux": loss_aux} - - -class SingletaskGRUModel(SingletaskLSTMModel): + if self.num_tasks == 1: + main_prediction = self.dense_main(x) + return main_prediction + elif self.num_tasks == 2: + main_prediction = self.dense_main(x) + aux_prediction = self.dense_aux(x) + return tf.concat([main_prediction, aux_prediction], axis=2) + else: + raise ValueError(f'This model only supports 1 or 2 tasks (not {self.num_tasks})') + + +class GRUModel(LSTMModel): def __init__( self, hidden_size, + num_tasks=1, + dropout=0, + recurrent_dropout=0, ): """ :param hidden_size: [int] the number of hidden units """ - super().__init__(hidden_size) - self.rnn_layer = layers.GRU( - hidden_size, return_sequences=True, name="rnn_shared" - ) - - -class MultitaskGRUModel(MultitaskLSTMModel): - def __init__( - self, - hidden_size, - lambdas=(1, 1) - ): - """ - :param hidden_size: [int] the number of hidden units - """ - super().__init__(hidden_size, lambdas=lambdas) + super().__init__(hidden_size, num_tasks=num_tasks) self.rnn_layer = layers.GRU( - hidden_size, return_sequences=True, name="rnn_shared" + hidden_size, recurrent_dropout=recurrent_dropout, dropout=dropout, return_sequences=True, name="rnn_shared" ) -def adjust_gradient(main_grad, aux_grad, logfile=None): - # flatten tensors - main_grad_flat = tf.reshape(main_grad, [-1]) - aux_grad_flat = tf.reshape(aux_grad, [-1]) - - # project and adjust - projection = ( - tf.minimum(tf.reduce_sum(main_grad_flat * aux_grad_flat), 0) 
- * main_grad_flat - / tf.reduce_sum(main_grad_flat * main_grad_flat) - ) - if logfile: - logfile = "file://" + logfile - tf.print(tf.reduce_sum(projection), output_stream=logfile, sep=",") - projection = tf.cond( - tf.math.is_nan(tf.reduce_sum(projection)), - lambda: tf.zeros(aux_grad_flat.shape), - lambda: projection, - ) - adjusted = aux_grad_flat - projection - return tf.reshape(adjusted, aux_grad.shape) - - -def get_variables(trainable_variables, name): - return [v for v in trainable_variables if name in v.name] - - -def combine_gradients_list(main_grads, aux_grads, lambdas=(1, 1)): - return [lambdas[0] * main_grads[i] + lambdas[1] * aux_grads[i] for i in range(len(main_grads))] - - -def adjust_gradient_list(main_grads, aux_grads, logfile=None): - return [ - adjust_gradient(main_grads[i], aux_grads[i], logfile) - for i in range(len(main_grads)) - ] From 2c7580b94c9c602a74f2d79d5dd106f177b5398f Mon Sep 17 00:00:00 2001 From: jsadler2 Date: Fri, 4 Jun 2021 11:53:14 -0500 Subject: [PATCH 20/32] [#106] provide loss_func to train func; compiles rnns --- river_dl/train.py | 42 ++++++++++++------------------------------ 1 file changed, 12 insertions(+), 30 deletions(-) diff --git a/river_dl/train.py b/river_dl/train.py index e03d5e5..3bfb551 100644 --- a/river_dl/train.py +++ b/river_dl/train.py @@ -5,8 +5,7 @@ import datetime import tensorflow as tf from river_dl.RGCN import RGCNModel -from river_dl.loss_functions import weighted_masked_rmse -from river_dl.rnns import SingletaskLSTMModel, MultitaskLSTMModel, SingletaskGRUModel, MultitaskGRUModel +from river_dl.rnns import LSTMModel, GRUModel def get_data_if_file(d): @@ -26,11 +25,12 @@ def train_model( pretrain_epochs, finetune_epochs, hidden_units, + loss_func, out_dir, model_type="rgcn", seed=None, dropout=0, - lambdas=(1, 1), + recurrent_dropout=0, num_tasks=1, learning_rate_pre=0.005, learning_rate_ft=0.01, @@ -41,12 +41,13 @@ def train_model( :param pretrain_epochs: [int] number of pretrain epochs :param finetune_epochs: [int] number of finetune epochs :param hidden_units: [int] number of hidden layers + :param loss_func: [function] loss function that the model will be fit to :param out_dir: [str] directory where the output files should be written :param model_type: [str] which model to use (either 'lstm', 'rgcn', or - 'lstm_grad_correction') + 'gru') :param seed: [int] random seed - :param lambdas: [array-like] weights to multiply the loss from each target - variable by + :param recurrent_dropout: [float] value between 0 and 1 for the probability of a reccurent element to be zero + :param dropout: [float] value between 0 and 1 for the probability of an input element to be zero :param num_tasks: [int] number of tasks (outputs to be predicted) :param learning_rate_pre: [float] the pretrain learning rate :param learning_rate_ft: [float] the finetune learning rate @@ -69,31 +70,18 @@ def train_model( batch_size = num_years if model_type == "lstm": - if num_tasks == 1: - model = SingletaskLSTMModel(hidden_units) - elif num_tasks == 2: - model = MultitaskLSTMModel(hidden_units, lambdas=lambdas) + model = LSTMModel(hidden_units, num_tasks=num_tasks, recurrent_dropout=recurrent_dropout, dropout=dropout) elif model_type == "rgcn": model = RGCNModel( hidden_units, num_tasks=num_tasks, A=dist_matrix, rand_seed=seed, - ) - elif model_type == "lstm_grad_correction": - grad_log_file = os.path.join(out_dir, "grad_correction.txt") - model = MultitaskLSTMModel( - hidden_units, - gradient_correction=True, - lambdas=lambdas, dropout=dropout, - 
grad_log_file=grad_log_file, + recurrent_dropout=recurrent_dropout ) elif model_type == "gru": - if num_tasks == 1: - model = SingletaskGRUModel(hidden_units) - elif num_tasks == 2: - model = MultitaskGRUModel(hidden_units, lambdas=lambdas) + model = GRUModel(hidden_units, num_tasks=num_tasks, recurrent_dropout=recurrent_dropout, dropout=dropout) else: raise ValueError(f"The 'model_type' provided ({model_type}) is not supported") @@ -115,10 +103,7 @@ def train_model( [io_data["y_pre_trn"], io_data["y_pre_wgts"]], axis=2 ) - if model_type == "rgcn": - model.compile(optimizer_pre, loss=weighted_masked_rmse(lambdas=lambdas)) - else: - model.compile(optimizer_pre) + model.compile(optimizer_pre, loss=loss_func) csv_log_pre = tf.keras.callbacks.CSVLogger( os.path.join(out_dir, f"pretrain_log.csv") @@ -146,10 +131,7 @@ def train_model( if finetune_epochs > 0: optimizer_ft = tf.optimizers.Adam(learning_rate=learning_rate_ft) - if model_type == "rgcn": - model.compile(optimizer_ft, loss=weighted_masked_rmse(lambdas=lambdas)) - else: - model.compile(optimizer_ft) + model.compile(optimizer_ft, loss=loss_func) csv_log_ft = tf.keras.callbacks.CSVLogger( os.path.join(out_dir, "finetune_log.csv") From 37995092a6ebadd710211f70f08cacac4fb068ff Mon Sep 17 00:00:00 2001 From: jsadler2 Date: Fri, 4 Jun 2021 11:54:42 -0500 Subject: [PATCH 21/32] [#98] multitask nse, kge functions; rm weights --- river_dl/loss_functions.py | 40 ++++++++------------------------------ 1 file changed, 8 insertions(+), 32 deletions(-) diff --git a/river_dl/loss_functions.py b/river_dl/loss_functions.py index 431b972..198daf8 100644 --- a/river_dl/loss_functions.py +++ b/river_dl/loss_functions.py @@ -1,4 +1,3 @@ -import numpy as np import tensorflow as tf @@ -69,38 +68,20 @@ def samplewise_nnse_loss(y_true, y_pred): return 1 - nnse_val -def nnse_masked_one_var(data, y_pred, var_idx, tasks): - y_true, y_pred, weights = y_data_components(data, y_pred, var_idx, tasks) - return nnse_loss(y_true, y_pred) +def multitask_nse(lambdas): + return multitask_loss(lambdas, nnse_loss) -def nnse_one_var_samplewise(data, y_pred, var_idx, tasks): - y_true, y_pred, weights = y_data_components(data, y_pred, var_idx, tasks) - return samplewise_nnse_loss(y_true, y_pred) +def multitask_samplewise_nse(lambdas): + return multitask_loss(lambdas, samplewise_nnse_loss) -def y_data_components(data, y_pred, var_idx, tasks): - weights = data[:, :, -tasks:] - y_true = data[:, :, :-tasks] - - # ensure y_pred, weights, and y_true are all tensors the same data type - y_true = tf.convert_to_tensor(y_true) - weights = tf.convert_to_tensor(weights) - y_true = tf.cast(y_true, y_pred.dtype) - weights = tf.cast(weights, y_pred.dtype) - - # make all zero-weighted observations 'nan' so they don't get counted - # at all in the loss calculation - y_true = tf.where(weights == 0, np.nan, y_true) +def multitask_rmse(lambdas): + return multitask_loss(lambdas, rmse) - weights = weights[:, :, var_idx] - y_true = y_true[:, :, var_idx] - y_pred = y_pred[:, :, var_idx] - return y_true, y_pred, weights - -def weighted_masked_rmse(lambdas): - return multitask_loss(lambdas, rmse) +def multitask_kge(lambdas): + return multitask_loss(lambdas, kge_loss) def multitask_loss(lambdas, loss_func): @@ -185,10 +166,5 @@ def kge_norm_loss(y_true, y_pred): return 1 - norm_kge(y_true, y_pred) -def kge_loss_one_var(data, y_pred, var_idx, tasks): - y_true, y_pred, weights = y_data_components(data, y_pred, var_idx, tasks) - return kge_loss(y_true, y_pred) - - def kge_loss(y_true, y_pred): return -1 * 
kge(y_true, y_pred) From a4d54e2b1187e4241eb650d1f0a9cea06315aaea Mon Sep 17 00:00:00 2001 From: jsadler2 Date: Fri, 4 Jun 2021 11:56:07 -0500 Subject: [PATCH 22/32] [#106] match train cli with train.py fxn --- river_dl/train_model.py | 37 ++++++++++++++++++++++++++++++++++--- 1 file changed, 34 insertions(+), 3 deletions(-) diff --git a/river_dl/train_model.py b/river_dl/train_model.py index d47259b..30c1d30 100644 --- a/river_dl/train_model.py +++ b/river_dl/train_model.py @@ -1,6 +1,23 @@ -import os import argparse from river_dl.train import train_model +import river_dl.loss_functions as lf + + +def get_loss_func_from_str(loss_func_str, lambdas=None): + if loss_func_str == 'rmse': + return lf.rmse + elif loss_func_str == 'nse': + return lf.nse + elif loss_func_str == 'kge': + return lf.kge + elif loss_func_str == 'multitask_rmse': + return lf.multitask_rmse(lambdas) + elif loss_func_str == 'multitask_nse': + return lf.multitask_nse(lambdas) + elif loss_func_str == 'multitask_kge': + return lf.multitask_kge(lambdas) + else: + raise ValueError(f'loss function {loss_func_str} not supported') parser = argparse.ArgumentParser() @@ -44,13 +61,25 @@ parser.add_argument( "--num_tasks", help="number of tasks (outputs to be predicted)", default=1, type=int ) +parser.add_argument( + "--loss_func", help="loss function", default='rmse', type=str, + choices=["rmse", "nse", "kge", "multitask_rmse", "multitask_kge", "multitask_nse"], +) +parser.add_argument( + "--dropout", help="dropout rate", default=0, type=float +) +parser.add_argument( + "--recurrent_dropout", help="recurrent dropout", default=0, type=float +) parser.add_argument( "--lambdas", help="lambdas for weighting variable losses", default=[1, 1], type=list ) - args = parser.parse_args() +loss_func = get_loss_func_from_str(args.loss_func) + + # -------- train ------ model = train_model( args.in_data_file, @@ -59,7 +88,9 @@ args.hidden_units, out_dir=args.out_dir, num_tasks=args.num_tasks, - lambdas=args.lambdas, + loss_func=loss_func, + dropout=args.dropout, + recurrent_dropout=args.recurrent_dropout, seed=args.random_seed, learning_rate_ft=args.ft_learn_rate, learning_rate_pre=args.pt_learn_rate, From 0f568aeab2052914bbcf011831cfe21e3e4204aa Mon Sep 17 00:00:00 2001 From: jsadler2 Date: Fri, 4 Jun 2021 14:03:54 -0500 Subject: [PATCH 23/32] take out unneeded check on h_/c_init in RGCN this check throws error and is handled upstream --- river_dl/RGCN.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/river_dl/RGCN.py b/river_dl/RGCN.py index 68d58cc..d61f909 100644 --- a/river_dl/RGCN.py +++ b/river_dl/RGCN.py @@ -98,15 +98,10 @@ def __init__( def call(self, inputs, **kwargs): h_list = [] c_list = [] - graph_size = self.A.shape[0] n_steps = inputs.shape[1] # set the initial h & c states to the supplied h and c states if using DA, or 0's otherwise - if kwargs.get('h_init'): - hidden_state_prev = tf.cast(kwargs['h_init'], tf.float32) - cell_state_prev = tf.cast(kwargs['c_init'], tf.float32) - else: - hidden_state_prev = tf.zeros([graph_size, self.hidden_size]) - cell_state_prev = tf.zeros([graph_size, self.hidden_size]) + hidden_state_prev = tf.cast(kwargs['h_init'], tf.float32) + cell_state_prev = tf.cast(kwargs['c_init'], tf.float32) for t in range(n_steps): h_graph = tf.nn.tanh( tf.matmul( From 6cd7b52684ee2bb00c4af2397d85025c1fb00d41 Mon Sep 17 00:00:00 2001 From: jsadler2 Date: Fri, 4 Jun 2021 14:05:32 -0500 Subject: [PATCH 24/32] [#98] don't pass weights to `fit` call --- river_dl/train.py | 8 ++------ 1 file 
changed, 2 insertions(+), 6 deletions(-) diff --git a/river_dl/train.py b/river_dl/train.py index 3bfb551..7fbf151 100644 --- a/river_dl/train.py +++ b/river_dl/train.py @@ -99,9 +99,7 @@ def train_model( # use built in 'fit' method unless model is grad correction x_trn_pre = io_data["x_trn"] # combine with weights to pass to loss function - y_trn_pre = np.concatenate( - [io_data["y_pre_trn"], io_data["y_pre_wgts"]], axis=2 - ) + y_trn_pre = io_data["y_pre_trn"] model.compile(optimizer_pre, loss=loss_func) @@ -138,9 +136,7 @@ def train_model( ) x_trn_obs = io_data["x_trn"] - y_trn_obs = np.concatenate( - [io_data["y_obs_trn"], io_data["y_obs_wgts"]], axis=2 - ) + y_trn_obs = io_data["y_obs_trn"] model.fit( x=x_trn_obs, From 9bc45cab5e5ce0dd4b5e773081224905bd3fa838 Mon Sep 17 00:00:00 2001 From: jsadler2 Date: Fri, 4 Jun 2021 14:06:30 -0500 Subject: [PATCH 25/32] add `num_tasks` to predict fxns --- river_dl/predict.py | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/river_dl/predict.py b/river_dl/predict.py index d4822b4..3d8a7cc 100644 --- a/river_dl/predict.py +++ b/river_dl/predict.py @@ -41,7 +41,7 @@ def load_model_from_weights( model_weights_dir, hidden_size, dist_matrix=None, - flow_in_temp=False, + num_tasks=1, ): """ load a TF model from the model weights directory @@ -49,15 +49,14 @@ def load_model_from_weights( :param model_weights_dir: [str] directory to saved model weights :param hidden_size: [int] the number of hidden units in model :param dist_matrix: [np array] the distance matrix if using 'rgcn' - :param flow_in_temp: [bool] whether the flow should be an input into temp :return: """ if model_type == "rgcn": - model = RGCNModel(hidden_size, A=dist_matrix, flow_in_temp=flow_in_temp) + model = RGCNModel(hidden_size, A=dist_matrix, num_tasks=num_tasks) elif model_type.startswith("lstm"): - model = LSTMModel(hidden_size) + model = LSTMModel(hidden_size, num_tasks=num_tasks) elif model_type == "gru": - model = GRUModel(hidden_size) + model = GRUModel(hidden_size, num_tasks=num_tasks) else: raise ValueError( f'model_type must be "lstm", "gru" or "rgcn", (not {model_type})' @@ -74,7 +73,7 @@ def predict_from_io_data( io_data, partition, outfile, - flow_in_temp=False, + num_tasks=1, logged_q=False, ): """ @@ -86,7 +85,6 @@ def predict_from_io_data( :param partition: [str] must be 'trn' or 'tst'; whether you want to predict for the train or the dev period :param outfile: [str] the file where the output data should be stored - :param flow_in_temp: [bool] whether the flow should be an input into temp :param logged_q: [bool] whether the discharge was logged in training. if True the exponent of the discharge will be taken in the model unscaling :return: [pd dataframe] predictions @@ -97,7 +95,7 @@ def predict_from_io_data( model_weights_dir, hidden_size, io_data.get("dist_matrix"), - flow_in_temp, + num_tasks=num_tasks, ) if partition != "trn": @@ -279,9 +277,9 @@ def predict_from_arbitrary_data( model_weights_dir, model_type, hidden_size, + num_tasks=1, seq_len=365, dist_matrix=None, - flow_in_temp=False, logged_q=False, ): """ @@ -303,8 +301,6 @@ def predict_from_arbitrary_data( :param seq_len: [int] length of input sequences given to model :param dist_matrix: [np array] the distance matrix if using 'rgcn'. if not provided, will look for it in the "train_io_data" file. - :param flow_in_temp: [bool] whether the flow should be an input into temp - for the rgcn model :param logged_q: [bool] whether the model predicted log of discharge. 
if true, the exponent of the discharge will be executed :return: [pd dataframe] the predictions @@ -320,7 +316,8 @@ def predict_from_arbitrary_data( ) model = load_model_from_weights( - model_type, model_weights_dir, hidden_size, dist_matrix, flow_in_temp, + model_type, model_weights_dir, hidden_size, dist_matrix, + um_tasks=num_tasks ) ds = xr.open_zarr(raw_data_file) From 63bb515dc716a0b1eb66c46252cacde6d68b7aa5 Mon Sep 17 00:00:00 2001 From: jsadler2 Date: Fri, 4 Jun 2021 14:07:26 -0500 Subject: [PATCH 26/32] Snakefile updates for lambdas, num_tasks, loss_func --- Snakefile | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/Snakefile b/Snakefile index 9611aff..d2a9c1b 100644 --- a/Snakefile +++ b/Snakefile @@ -5,9 +5,11 @@ from river_dl.evaluate import combined_metrics from river_dl.postproc_utils import plot_obs from river_dl.predict import predict_from_io_data from river_dl.train import train_model +from river_dl import loss_functions as lf out_dir = config['out_dir'] code_dir = config['code_dir'] +loss_function = lf.multitask_rmse(config['lambdas']) rule all: input: @@ -15,11 +17,6 @@ rule all: outdir=out_dir, metric_type=['overall', 'month', 'reach', 'month_reach'], ), - expand( "{outdir}/{plt_variable}_{partition}.png", - outdir=out_dir, - plt_variable=['temp', 'flow'], - partition=['trn', 'val'], - ), rule prep_io_data: input: @@ -62,7 +59,7 @@ rule prep_io_data: # shell: # """ # module load analytics cuda10.1/toolkit/10.1.105 -# run_training -e /home/jsadler/.conda/envs/rgcn --no-node-list "python {code_dir}/train_model.py -o {params.run_dir} -i {input[0]} -p {params.pt_epochs} -f {params.ft_epochs} --lamb {params.lamb} --model rgcn -s 135" +# run_training -e /home/jsadler/.conda/envs/rgcn --no-node-list "python {code_dir}/train_model.py -o {params.run_dir} -i {input[0]} -p {params.pt_epochs} -f {params.ft_epochs} --lambdas {params.lamb} --loss_func multitask_rmse --model rgcn -s 135" # """ @@ -79,7 +76,7 @@ rule train_model_local_or_cpu: run_dir=lambda wildcards, output: os.path.split(output[0][:-1])[0], run: train_model(input[0], config['pt_epochs'], config['ft_epochs'], config['hidden_size'], - params.run_dir, model_type='rgcn', lamb=config['lamb']) + loss_func=loss_function, out_dir=params.run_dir, model_type='rgcn', num_tasks=2) rule make_predictions: input: @@ -93,7 +90,7 @@ rule make_predictions: predict_from_io_data(model_type='rgcn', model_weights_dir=model_dir, hidden_size=config['hidden_size'], io_data=input[1], partition=wildcards.partition, outfile=output[0], - logged_q=False) + logged_q=False, num_tasks=2) def get_grp_arg(wildcards): From 8c80003c4f10142efdde4bc1295dcdc868a31dfd Mon Sep 17 00:00:00 2001 From: jsadler2 Date: Fri, 4 Jun 2021 14:10:12 -0500 Subject: [PATCH 27/32] RGCN `states` attribute; just final states --- config.yml | 11 +++++------ river_dl/RGCN.py | 6 ++---- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/config.yml b/config.yml index 037eb49..643da55 100644 --- a/config.yml +++ b/config.yml @@ -1,16 +1,15 @@ # Input files -obs_flow: "/home/jsadler/drb_data/obs_flow_full_raw" -obs_temp: "/home/jsadler/drb_data/obs_temp_full" -sntemp_file: "/home/jsadler/drb_data/uncal_sntemp_input_output" -catchment_attr: "/home/jsadler/drb_data/seg_attr_drb.feather" -dist_matrix: "/home/jsadler/drb_data/distance_matrix.npz" +obs_flow: "../drb-dl-model/data/in/obs_flow_subset" +obs_temp: "../drb-dl-model/data/in/obs_temp_subset" +sntemp_file: "../drb-dl-model/data/in/uncal_sntemp_input_output_subset" +dist_matrix: 
"../drb-dl-model/data/in/distance_matrix_subset.npz" out_dir: "test_val_functionality" code_dir: "/home/jsadler/river-dl/river_dl" x_vars: ["seg_rain", "seg_tave_air", "seginc_swrad", "seg_length", "seginc_potet", "seg_slope", "seg_humid", "seg_elev"] primary_variable: "flow" -lamb: 1 +lambdas: [100, 100] train_start_date: - '1985-10-01' diff --git a/river_dl/RGCN.py b/river_dl/RGCN.py index d61f909..7ac95d0 100644 --- a/river_dl/RGCN.py +++ b/river_dl/RGCN.py @@ -178,8 +178,7 @@ def __init__( dropout, rand_seed) - self.h_gr = None - self.c_gr = None + self.states = None self.dense_main = layers.Dense(1, name="dense_main") if self.num_tasks == 2: @@ -190,8 +189,7 @@ def call(self, inputs, **kwargs): h_init = kwargs.get('h_init', tf.zeros([batch_size, self.hidden_size])) c_init = kwargs.get('c_init', tf.zeros([batch_size, self.hidden_size])) h_gr, c_gr = self.rgcn_layer(inputs, h_init=h_init, c_init=c_init) - self.h_gr = h_gr - self.c_gr = c_gr + self.states = h_gr[:, -1, :], c_gr[:, -1, :] if self.num_tasks == 1: main_prediction = self.dense_main(h_gr) From 3eca6f250e2de48db7fd5fbe66ea172d4e09d7ba Mon Sep 17 00:00:00 2001 From: Jeff Sadler Date: Fri, 4 Jun 2021 14:13:33 -0500 Subject: [PATCH 28/32] typo in predict --- river_dl/predict.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/river_dl/predict.py b/river_dl/predict.py index 3d8a7cc..597962a 100644 --- a/river_dl/predict.py +++ b/river_dl/predict.py @@ -317,7 +317,7 @@ def predict_from_arbitrary_data( model = load_model_from_weights( model_type, model_weights_dir, hidden_size, dist_matrix, - um_tasks=num_tasks + num_tasks=num_tasks ) ds = xr.open_zarr(raw_data_file) From 4527d673724dfc48a524ba2dc3ac787ae5ea860c Mon Sep 17 00:00:00 2001 From: jsadler2 Date: Fri, 4 Jun 2021 14:23:19 -0500 Subject: [PATCH 29/32] Black formatting and docstring corrections --- river_dl/RGCN.py | 62 ++++++++++++++++++++------------------ river_dl/loss_functions.py | 2 ++ river_dl/predict.py | 29 ++++++++++-------- river_dl/rnns.py | 39 +++++++++++++----------- river_dl/train.py | 26 ++++++++++++---- river_dl/train_model.py | 42 +++++++++++++++++--------- 6 files changed, 119 insertions(+), 81 deletions(-) diff --git a/river_dl/RGCN.py b/river_dl/RGCN.py index 7ac95d0..6e40bb9 100644 --- a/river_dl/RGCN.py +++ b/river_dl/RGCN.py @@ -12,19 +12,16 @@ class RGCN(layers.Layer): def __init__( - self, - hidden_size, - A, - recurrent_dropout=0, - dropout=0, - rand_seed=None, + self, hidden_size, A, recurrent_dropout=0, dropout=0, rand_seed=None, ): """ :param hidden_size: [int] the number of hidden units :param A: [numpy array] adjacency matrix - :param recurrent_dropout: [float] value between 0 and 1 for the probability of a reccurent element to be zero - :param dropout: [float] value between 0 and 1 for the probability of an input element to be zero + :param recurrent_dropout: [float] value between 0 and 1 for the + probability of a recurrent element to be zero + :param dropout: [float] value between 0 and 1 for the probability of an + input element to be zero :param rand_seed: [int] the random seed for initialization """ super().__init__() @@ -32,7 +29,9 @@ def __init__( self.A = A.astype("float32") # set up the layer - self.lstm = tf.keras.layers.LSTMCell(hidden_size, recurrent_dropout=recurrent_dropout, dropout=dropout) + self.lstm = tf.keras.layers.LSTMCell( + hidden_size, recurrent_dropout=recurrent_dropout, dropout=dropout + ) ### set up the weights ### w_initializer = tf.random_normal_initializer( @@ -99,9 +98,10 @@ def call(self, 
inputs, **kwargs): h_list = [] c_list = [] n_steps = inputs.shape[1] - # set the initial h & c states to the supplied h and c states if using DA, or 0's otherwise - hidden_state_prev = tf.cast(kwargs['h_init'], tf.float32) - cell_state_prev = tf.cast(kwargs['c_init'], tf.float32) + # set the initial h & c states to the supplied h and c states if using + # DA, or 0's otherwise + hidden_state_prev = tf.cast(kwargs["h_init"], tf.float32) + cell_state_prev = tf.cast(kwargs["c_init"], tf.float32) for t in range(n_steps): h_graph = tf.nn.tanh( tf.matmul( @@ -135,10 +135,10 @@ def call(self, inputs, **kwargs): hidden_state_prev = h_update cell_state_prev = c_update - + h_list.append(h_update) c_list.append(c_update) - + h_list = tf.stack(h_list) c_list = tf.stack(c_list) h_list = tf.transpose(h_list, [1, 0, 2]) @@ -148,20 +148,23 @@ def call(self, inputs, **kwargs): class RGCNModel(tf.keras.Model): def __init__( - self, - hidden_size, - A, + self, + hidden_size, + A, num_tasks=1, - recurrent_dropout=0, + recurrent_dropout=0, dropout=0, rand_seed=None, ): """ :param hidden_size: [int] the number of hidden units :param A: [numpy array] adjacency matrix - :param num_tasks: [int] number of prediction tasks to perform - currently supports either 1 or 2 prediction tasks - :param recurrent_dropout: [float] value between 0 and 1 for the probability of a recurrent element to be zero - :param dropout: [float] value between 0 and 1 for the probability of an input element to be zero + :param num_tasks: [int] number of prediction tasks to perform - + currently supports either 1 or 2 prediction tasks + :param recurrent_dropout: [float] value between 0 and 1 for the + probability of a recurrent element to be zero + :param dropout: [float] value between 0 and 1 for the probability of an + input element to be zero into the temp predictions :param rand_seed: [int] the random seed for initialization """ @@ -172,12 +175,9 @@ def __init__( self.dropout = dropout self.rgcn_layer = RGCN( - hidden_size, - A, - recurrent_dropout, - dropout, - rand_seed) - + hidden_size, A, recurrent_dropout, dropout, rand_seed + ) + self.states = None self.dense_main = layers.Dense(1, name="dense_main") @@ -186,8 +186,8 @@ def __init__( def call(self, inputs, **kwargs): batch_size = inputs.shape[0] - h_init = kwargs.get('h_init', tf.zeros([batch_size, self.hidden_size])) - c_init = kwargs.get('c_init', tf.zeros([batch_size, self.hidden_size])) + h_init = kwargs.get("h_init", tf.zeros([batch_size, self.hidden_size])) + c_init = kwargs.get("c_init", tf.zeros([batch_size, self.hidden_size])) h_gr, c_gr = self.rgcn_layer(inputs, h_init=h_init, c_init=c_init) self.states = h_gr[:, -1, :], c_gr[:, -1, :] @@ -199,4 +199,6 @@ def call(self, inputs, **kwargs): aux_prediction = self.dense_aux(h_gr) return tf.concat([main_prediction, aux_prediction], axis=2) else: - raise ValueError(f'This model only supports 1 or 2 tasks (not {self.num_tasks})') + raise ValueError( + f"This model only supports 1 or 2 tasks (not {self.num_tasks})" + ) diff --git a/river_dl/loss_functions.py b/river_dl/loss_functions.py index 198daf8..799d3c8 100644 --- a/river_dl/loss_functions.py +++ b/river_dl/loss_functions.py @@ -93,6 +93,7 @@ def multitask_loss(lambdas, loss_func): :param loss_func: [function] Loss function that will be used to calculate the loss of each variable. 
Must take as input parameters [y_true, y_pred] """ + def combine_loss(y_true, y_pred): losses = [] n_vars = y_pred.shape[-1] @@ -102,6 +103,7 @@ def combine_loss(y_true, y_pred): losses.append(weighted_ind_var_loss) total_loss = sum(losses) return total_loss + return combine_loss diff --git a/river_dl/predict.py b/river_dl/predict.py index 597962a..0364da1 100644 --- a/river_dl/predict.py +++ b/river_dl/predict.py @@ -37,11 +37,7 @@ def unscale_output(y_scl, y_std, y_mean, y_vars, logged_q=False): def load_model_from_weights( - model_type, - model_weights_dir, - hidden_size, - dist_matrix=None, - num_tasks=1, + model_type, model_weights_dir, hidden_size, dist_matrix=None, num_tasks=1, ): """ load a TF model from the model weights directory @@ -49,7 +45,8 @@ def load_model_from_weights( :param model_weights_dir: [str] directory to saved model weights :param hidden_size: [int] the number of hidden units in model :param dist_matrix: [np array] the distance matrix if using 'rgcn' - :return: + :param num_tasks: [int] number of tasks (outputs to be predicted) + :return: TF model """ if model_type == "rgcn": model = RGCNModel(hidden_size, A=dist_matrix, num_tasks=num_tasks) @@ -87,6 +84,7 @@ def predict_from_io_data( :param outfile: [str] the file where the output data should be stored :param logged_q: [bool] whether the discharge was logged in training. if True the exponent of the discharge will be taken in the model unscaling + :param num_tasks: [int] number of tasks (outputs to be predicted) :return: [pd dataframe] predictions """ io_data = get_data_if_file(io_data) @@ -194,11 +192,12 @@ def swap_first_seq_halves(x_data, batch_size): """ first_batch = x_data[:batch_size, :, :] seq_len = x_data.shape[1] - half_size = round(seq_len/2) + half_size = round(seq_len / 2) first_half_first_batch = first_batch[:, :half_size, :] second_half_first_batch = first_batch[:, half_size:, :] - swapped = np.concatenate([second_half_first_batch, first_half_first_batch], - axis=1) + swapped = np.concatenate( + [second_half_first_batch, first_half_first_batch], axis=1 + ) new_x_data = np.concatenate([swapped, x_data], axis=0) return new_x_data @@ -298,6 +297,7 @@ def predict_from_arbitrary_data( weights are stored :param model_type: [str] model to use either 'rgcn', 'lstm', or 'gru' :param hidden_size: [int] the number of hidden units in model + :param num_tasks: [int] number of tasks (outputs to be predicted) :param seq_len: [int] length of input sequences given to model :param dist_matrix: [np array] the distance matrix if using 'rgcn'. if not provided, will look for it in the "train_io_data" file. @@ -316,8 +316,11 @@ def predict_from_arbitrary_data( ) model = load_model_from_weights( - model_type, model_weights_dir, hidden_size, dist_matrix, - num_tasks=num_tasks + model_type, + model_weights_dir, + hidden_size, + dist_matrix, + num_tasks=num_tasks, ) ds = xr.open_zarr(raw_data_file) @@ -332,7 +335,7 @@ def predict_from_arbitrary_data( pred_start_date = datetime.datetime.strptime(pred_start_date, "%Y-%m-%d") # look back half of the sequence length before the prediction start date. 
# if present, this serves as a half-sequence warm-up period - inputs_start_date = pred_start_date - datetime.timedelta(round(seq_len/2)) + inputs_start_date = pred_start_date - datetime.timedelta(round(seq_len / 2)) # get the "middle" predictions middle_predictions = predict_one_date_range( @@ -359,7 +362,7 @@ def predict_from_arbitrary_data( start_dates_end, logged_q, keep_last_frac=1, - offset=.5, + offset=0.5, swap_halves_of_first_seq=True, ) diff --git a/river_dl/rnns.py b/river_dl/rnns.py index c554c38..b84e22b 100644 --- a/river_dl/rnns.py +++ b/river_dl/rnns.py @@ -6,26 +6,25 @@ class LSTMModel(tf.keras.Model): def __init__( - self, - hidden_size, - num_tasks=1, - recurrent_dropout=0, - dropout=0, + self, hidden_size, num_tasks=1, recurrent_dropout=0, dropout=0, ): """ :param hidden_size: [int] the number of hidden units - :param recurrent_dropout: [float] value between 0 and 1 for the probability of a reccurent element to be zero - :param dropout: [float] value between 0 and 1 for the probability of an input element to be zero + :param num_tasks: [int] number of tasks (outputs to be predicted) + :param recurrent_dropout: [float] value between 0 and 1 for the + probability of a recurrent element to be zero + :param dropout: [float] value between 0 and 1 for the probability of an + input element to be zero """ super().__init__() - self.num_tasks = num_tasks, + self.num_tasks = num_tasks self.rnn_layer = layers.LSTM( hidden_size, return_sequences=True, stateful=True, return_state=True, recurrent_dropout=recurrent_dropout, - dropout=dropout + dropout=dropout, ) self.dense_main = layers.Dense(1, name="dense_main") if self.num_tasks == 2: @@ -44,23 +43,27 @@ def call(self, inputs, **kwargs): aux_prediction = self.dense_aux(x) return tf.concat([main_prediction, aux_prediction], axis=2) else: - raise ValueError(f'This model only supports 1 or 2 tasks (not {self.num_tasks})') + raise ValueError( + f"This model only supports 1 or 2 tasks (not {self.num_tasks})" + ) class GRUModel(LSTMModel): def __init__( - self, - hidden_size, - num_tasks=1, - dropout=0, - recurrent_dropout=0, + self, hidden_size, num_tasks=1, dropout=0, recurrent_dropout=0, ): """ :param hidden_size: [int] the number of hidden units + :param num_tasks: [int] number of tasks (outputs to be predicted) + :param recurrent_dropout: [float] value between 0 and 1 for the + probability of a recurrent element to be zero + :param dropout: [float] value between 0 and 1 for the probability of an """ super().__init__(hidden_size, num_tasks=num_tasks) self.rnn_layer = layers.GRU( - hidden_size, recurrent_dropout=recurrent_dropout, dropout=dropout, return_sequences=True, name="rnn_shared" + hidden_size, + recurrent_dropout=recurrent_dropout, + dropout=dropout, + return_sequences=True, + name="rnn_shared", ) - - diff --git a/river_dl/train.py b/river_dl/train.py index 7fbf151..13697df 100644 --- a/river_dl/train.py +++ b/river_dl/train.py @@ -46,8 +46,10 @@ def train_model( :param model_type: [str] which model to use (either 'lstm', 'rgcn', or 'gru') :param seed: [int] random seed - :param recurrent_dropout: [float] value between 0 and 1 for the probability of a reccurent element to be zero - :param dropout: [float] value between 0 and 1 for the probability of an input element to be zero + :param recurrent_dropout: [float] value between 0 and 1 for the probability + of a reccurent element to be zero + :param dropout: [float] value between 0 and 1 for the probability of an + input element to be zero :param num_tasks: [int] number of tasks 
(outputs to be predicted) :param learning_rate_pre: [float] the pretrain learning rate :param learning_rate_ft: [float] the finetune learning rate @@ -70,7 +72,12 @@ def train_model( batch_size = num_years if model_type == "lstm": - model = LSTMModel(hidden_units, num_tasks=num_tasks, recurrent_dropout=recurrent_dropout, dropout=dropout) + model = LSTMModel( + hidden_units, + num_tasks=num_tasks, + recurrent_dropout=recurrent_dropout, + dropout=dropout, + ) elif model_type == "rgcn": model = RGCNModel( hidden_units, @@ -78,12 +85,19 @@ def train_model( A=dist_matrix, rand_seed=seed, dropout=dropout, - recurrent_dropout=recurrent_dropout + recurrent_dropout=recurrent_dropout, ) elif model_type == "gru": - model = GRUModel(hidden_units, num_tasks=num_tasks, recurrent_dropout=recurrent_dropout, dropout=dropout) + model = GRUModel( + hidden_units, + num_tasks=num_tasks, + recurrent_dropout=recurrent_dropout, + dropout=dropout, + ) else: - raise ValueError(f"The 'model_type' provided ({model_type}) is not supported") + raise ValueError( + f"The 'model_type' provided ({model_type}) is not supported" + ) if seed: os.environ["PYTHONHASHSEED"] = str(seed) diff --git a/river_dl/train_model.py b/river_dl/train_model.py index 30c1d30..2ad57a4 100644 --- a/river_dl/train_model.py +++ b/river_dl/train_model.py @@ -4,20 +4,20 @@ def get_loss_func_from_str(loss_func_str, lambdas=None): - if loss_func_str == 'rmse': + if loss_func_str == "rmse": return lf.rmse - elif loss_func_str == 'nse': + elif loss_func_str == "nse": return lf.nse - elif loss_func_str == 'kge': + elif loss_func_str == "kge": return lf.kge - elif loss_func_str == 'multitask_rmse': + elif loss_func_str == "multitask_rmse": return lf.multitask_rmse(lambdas) - elif loss_func_str == 'multitask_nse': + elif loss_func_str == "multitask_nse": return lf.multitask_nse(lambdas) - elif loss_func_str == 'multitask_kge': + elif loss_func_str == "multitask_kge": return lf.multitask_kge(lambdas) else: - raise ValueError(f'loss function {loss_func_str} not supported') + raise ValueError(f"loss function {loss_func_str} not supported") parser = argparse.ArgumentParser() @@ -59,20 +59,34 @@ def get_loss_func_from_str(loss_func_str, lambdas=None): default="rgcn", ) parser.add_argument( - "--num_tasks", help="number of tasks (outputs to be predicted)", default=1, type=int + "--num_tasks", + help="number of tasks (outputs to be predicted)", + default=1, + type=int, ) parser.add_argument( - "--loss_func", help="loss function", default='rmse', type=str, - choices=["rmse", "nse", "kge", "multitask_rmse", "multitask_kge", "multitask_nse"], -) -parser.add_argument( - "--dropout", help="dropout rate", default=0, type=float + "--loss_func", + help="loss function", + default="rmse", + type=str, + choices=[ + "rmse", + "nse", + "kge", + "multitask_rmse", + "multitask_kge", + "multitask_nse", + ], ) +parser.add_argument("--dropout", help="dropout rate", default=0, type=float) parser.add_argument( "--recurrent_dropout", help="recurrent dropout", default=0, type=float ) parser.add_argument( - "--lambdas", help="lambdas for weighting variable losses", default=[1, 1], type=list + "--lambdas", + help="lambdas for weighting variable losses", + default=[1, 1], + type=list, ) args = parser.parse_args() From 8d061f8b60fd757ac81771bd1f8cd37f91766577 Mon Sep 17 00:00:00 2001 From: jsadler2 Date: Fri, 4 Jun 2021 14:27:25 -0500 Subject: [PATCH 30/32] attr for rnns --- river_dl/rnns.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/river_dl/rnns.py 
b/river_dl/rnns.py index b84e22b..bb22bcf 100644 --- a/river_dl/rnns.py +++ b/river_dl/rnns.py @@ -29,12 +29,12 @@ def __init__( self.dense_main = layers.Dense(1, name="dense_main") if self.num_tasks == 2: self.dense_aux = layers.Dense(1, name="dense_aux") - self.h = None - self.c = None + self.states = None @tf.function def call(self, inputs, **kwargs): - x, self.h, self.c = self.rnn_layer(inputs) + x, h, c = self.rnn_layer(inputs) + self.states = h, c if self.num_tasks == 1: main_prediction = self.dense_main(x) return main_prediction From 1c1641a021ee62925ec0c5b36724407f8926940f Mon Sep 17 00:00:00 2001 From: jsadler2 Date: Mon, 7 Jun 2021 13:40:49 -0500 Subject: [PATCH 31/32] "outputs" -> "variables" in `num_task` docstring --- river_dl/predict.py | 6 +++--- river_dl/rnns.py | 4 ++-- river_dl/train.py | 2 +- river_dl/train_model.py | 2 +- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/river_dl/predict.py b/river_dl/predict.py index 0364da1..45e07a4 100644 --- a/river_dl/predict.py +++ b/river_dl/predict.py @@ -45,7 +45,7 @@ def load_model_from_weights( :param model_weights_dir: [str] directory to saved model weights :param hidden_size: [int] the number of hidden units in model :param dist_matrix: [np array] the distance matrix if using 'rgcn' - :param num_tasks: [int] number of tasks (outputs to be predicted) + :param num_tasks: [int] number of tasks (variables to be predicted) :return: TF model """ if model_type == "rgcn": @@ -84,7 +84,7 @@ def predict_from_io_data( :param outfile: [str] the file where the output data should be stored :param logged_q: [bool] whether the discharge was logged in training. if True the exponent of the discharge will be taken in the model unscaling - :param num_tasks: [int] number of tasks (outputs to be predicted) + :param num_tasks: [int] number of tasks (variables to be predicted) :return: [pd dataframe] predictions """ io_data = get_data_if_file(io_data) @@ -297,7 +297,7 @@ def predict_from_arbitrary_data( weights are stored :param model_type: [str] model to use either 'rgcn', 'lstm', or 'gru' :param hidden_size: [int] the number of hidden units in model - :param num_tasks: [int] number of tasks (outputs to be predicted) + :param num_tasks: [int] number of tasks (variables to be predicted) :param seq_len: [int] length of input sequences given to model :param dist_matrix: [np array] the distance matrix if using 'rgcn'. if not provided, will look for it in the "train_io_data" file. 
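A hypothetical call of load_model_from_weights with the new num_tasks argument (the weights directory is a placeholder, so this only illustrates the signature and will not run without saved weights on disk):

from river_dl.predict import load_model_from_weights

model = load_model_from_weights(
    model_type="lstm",                             # "lstm", "gru", or "rgcn"
    model_weights_dir="output/trained_weights/",   # placeholder path
    hidden_size=20,
    num_tasks=2,                                   # e.g. temperature and flow
)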
diff --git a/river_dl/rnns.py b/river_dl/rnns.py index bb22bcf..1a5c4b1 100644 --- a/river_dl/rnns.py +++ b/river_dl/rnns.py @@ -10,7 +10,7 @@ def __init__( ): """ :param hidden_size: [int] the number of hidden units - :param num_tasks: [int] number of tasks (outputs to be predicted) + :param num_tasks: [int] number of tasks (variables to be predicted) :param recurrent_dropout: [float] value between 0 and 1 for the probability of a recurrent element to be zero :param dropout: [float] value between 0 and 1 for the probability of an @@ -54,7 +54,7 @@ def __init__( ): """ :param hidden_size: [int] the number of hidden units - :param num_tasks: [int] number of tasks (outputs to be predicted) + :param num_tasks: [int] number of tasks (variables to be predicted) :param recurrent_dropout: [float] value between 0 and 1 for the probability of a recurrent element to be zero :param dropout: [float] value between 0 and 1 for the probability of an diff --git a/river_dl/train.py b/river_dl/train.py index 13697df..d584caa 100644 --- a/river_dl/train.py +++ b/river_dl/train.py @@ -50,7 +50,7 @@ def train_model( of a reccurent element to be zero :param dropout: [float] value between 0 and 1 for the probability of an input element to be zero - :param num_tasks: [int] number of tasks (outputs to be predicted) + :param num_tasks: [int] number of tasks (variables to be predicted) :param learning_rate_pre: [float] the pretrain learning rate :param learning_rate_ft: [float] the finetune learning rate :return: [tf model] finetuned model diff --git a/river_dl/train_model.py b/river_dl/train_model.py index 2ad57a4..fd8c86c 100644 --- a/river_dl/train_model.py +++ b/river_dl/train_model.py @@ -60,7 +60,7 @@ def get_loss_func_from_str(loss_func_str, lambdas=None): ) parser.add_argument( "--num_tasks", - help="number of tasks (outputs to be predicted)", + help="number of tasks (variables to be predicted)", default=1, type=int, ) From 8362981db22b595440f7ef614b88931a5f229801 Mon Sep 17 00:00:00 2001 From: jsadler2 Date: Mon, 7 Jun 2021 13:46:38 -0500 Subject: [PATCH 32/32] can provide h_/c_init to initialize rnn --- river_dl/rnns.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/river_dl/rnns.py b/river_dl/rnns.py index 1a5c4b1..804d2da 100644 --- a/river_dl/rnns.py +++ b/river_dl/rnns.py @@ -17,6 +17,7 @@ def __init__( input element to be zero """ super().__init__() + self.hidden_size = hidden_size self.num_tasks = num_tasks self.rnn_layer = layers.LSTM( hidden_size, @@ -33,6 +34,10 @@ def __init__( @tf.function def call(self, inputs, **kwargs): + batch_size = inputs.shape[0] + h_init = kwargs.get("h_init", tf.zeros([batch_size, self.hidden_size])) + c_init = kwargs.get("c_init", tf.zeros([batch_size, self.hidden_size])) + self.rnn_layer.reset_states(states=[h_init, c_init]) x, h, c = self.rnn_layer(inputs) self.states = h, c if self.num_tasks == 1:
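To illustrate the warm-start pattern this last patch enables, a minimal standalone sketch (array shapes are assumptions; it uses the Keras initial_state argument on a plain LSTM layer rather than the stateful layer plus reset_states used in the patch, but the idea is the same: seed the h and c states, e.g. with states adjusted by a data-assimilation update):

import numpy as np
import tensorflow as tf

n_seg, seq_len, n_feat, hidden = 42, 365, 8, 20
lstm = tf.keras.layers.LSTM(hidden, return_sequences=True, return_state=True)

x1 = np.random.rand(n_seg, seq_len, n_feat).astype("float32")
out1, h, c = lstm(x1)                                # first window, zero initial states

h_adj = h                                            # placeholder for the DA adjustment of h

x2 = np.random.rand(n_seg, seq_len, n_feat).astype("float32")
out2, h2, c2 = lstm(x2, initial_state=[h_adj, c])    # warm-started second window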