diff --git a/Snakefile b/Snakefile index 9611aff..d2a9c1b 100644 --- a/Snakefile +++ b/Snakefile @@ -5,9 +5,11 @@ from river_dl.evaluate import combined_metrics from river_dl.postproc_utils import plot_obs from river_dl.predict import predict_from_io_data from river_dl.train import train_model +from river_dl import loss_functions as lf out_dir = config['out_dir'] code_dir = config['code_dir'] +loss_function = lf.multitask_rmse(config['lambdas']) rule all: input: @@ -15,11 +17,6 @@ rule all: outdir=out_dir, metric_type=['overall', 'month', 'reach', 'month_reach'], ), - expand( "{outdir}/{plt_variable}_{partition}.png", - outdir=out_dir, - plt_variable=['temp', 'flow'], - partition=['trn', 'val'], - ), rule prep_io_data: input: @@ -62,7 +59,7 @@ rule prep_io_data: # shell: # """ # module load analytics cuda10.1/toolkit/10.1.105 -# run_training -e /home/jsadler/.conda/envs/rgcn --no-node-list "python {code_dir}/train_model.py -o {params.run_dir} -i {input[0]} -p {params.pt_epochs} -f {params.ft_epochs} --lamb {params.lamb} --model rgcn -s 135" +# run_training -e /home/jsadler/.conda/envs/rgcn --no-node-list "python {code_dir}/train_model.py -o {params.run_dir} -i {input[0]} -p {params.pt_epochs} -f {params.ft_epochs} --lambdas {params.lamb} --loss_func multitask_rmse --model rgcn -s 135" # """ @@ -79,7 +76,7 @@ rule train_model_local_or_cpu: run_dir=lambda wildcards, output: os.path.split(output[0][:-1])[0], run: train_model(input[0], config['pt_epochs'], config['ft_epochs'], config['hidden_size'], - params.run_dir, model_type='rgcn', lamb=config['lamb']) + loss_func=loss_function, out_dir=params.run_dir, model_type='rgcn', num_tasks=2) rule make_predictions: input: @@ -93,7 +90,7 @@ rule make_predictions: predict_from_io_data(model_type='rgcn', model_weights_dir=model_dir, hidden_size=config['hidden_size'], io_data=input[1], partition=wildcards.partition, outfile=output[0], - logged_q=False) + logged_q=False, num_tasks=2) def get_grp_arg(wildcards): diff --git a/config.yml b/config.yml index 037eb49..643da55 100644 --- a/config.yml +++ b/config.yml @@ -1,16 +1,15 @@ # Input files -obs_flow: "/home/jsadler/drb_data/obs_flow_full_raw" -obs_temp: "/home/jsadler/drb_data/obs_temp_full" -sntemp_file: "/home/jsadler/drb_data/uncal_sntemp_input_output" -catchment_attr: "/home/jsadler/drb_data/seg_attr_drb.feather" -dist_matrix: "/home/jsadler/drb_data/distance_matrix.npz" +obs_flow: "../drb-dl-model/data/in/obs_flow_subset" +obs_temp: "../drb-dl-model/data/in/obs_temp_subset" +sntemp_file: "../drb-dl-model/data/in/uncal_sntemp_input_output_subset" +dist_matrix: "../drb-dl-model/data/in/distance_matrix_subset.npz" out_dir: "test_val_functionality" code_dir: "/home/jsadler/river-dl/river_dl" x_vars: ["seg_rain", "seg_tave_air", "seginc_swrad", "seg_length", "seginc_potet", "seg_slope", "seg_humid", "seg_elev"] primary_variable: "flow" -lamb: 1 +lambdas: [100, 100] train_start_date: - '1985-10-01' diff --git a/river_dl/RGCN.py b/river_dl/RGCN.py index b112e7b..6e40bb9 100644 --- a/river_dl/RGCN.py +++ b/river_dl/RGCN.py @@ -11,22 +11,27 @@ class RGCN(layers.Layer): - def __init__(self, hidden_size, A, flow_in_temp=False, rand_seed=None): + def __init__( + self, hidden_size, A, recurrent_dropout=0, dropout=0, rand_seed=None, + ): """ :param hidden_size: [int] the number of hidden units :param A: [numpy array] adjacency matrix - :param flow_in_temp: [bool] whether the flow predictions should feed - into the temp predictions + :param recurrent_dropout: [float] value between 0 and 1 for the + probability of a recurrent element to be zero + :param dropout: [float] value between 0 and 1 for the probability of an + input element to be zero :param rand_seed: [int] the random seed for initialization """ super().__init__() self.hidden_size = hidden_size self.A = A.astype("float32") - self.flow_in_temp = flow_in_temp # set up the layer - self.lstm = tf.keras.layers.LSTMCell(hidden_size) + self.lstm = tf.keras.layers.LSTMCell( + hidden_size, recurrent_dropout=recurrent_dropout, dropout=dropout + ) ### set up the weights ### w_initializer = tf.random_normal_initializer( @@ -88,44 +93,15 @@ def __init__(self, hidden_size, A, flow_in_temp=False, rand_seed=None): shape=[hidden_size], initializer="zeros", name="b_c" ) - if self.flow_in_temp: - # was W2 - self.W_out_flow = self.add_weight( - shape=[hidden_size, 1], initializer=w_initializer, name="W_out" - ) - # was b2 - self.b_out_flow = self.add_weight( - shape=[1], initializer="zeros", name="b_out" - ) - - self.W_out_temp = self.add_weight( - shape=[hidden_size + 1, 1], - initializer=w_initializer, - name="W_out", - ) - - self.b_out_temp = self.add_weight( - shape=[1], initializer="zeros", name="b_out" - ) - else: - # was W2 - self.W_out = self.add_weight( - shape=[hidden_size, 2], initializer=w_initializer, name="W_out" - ) - # was b2 - self.b_out = self.add_weight( - shape=[2], initializer="zeros", name="b_out" - ) - @tf.function def call(self, inputs, **kwargs): - graph_size = self.A.shape[0] - hidden_state_prev, cell_state_prev = ( - tf.zeros([graph_size, self.hidden_size]), - tf.zeros([graph_size, self.hidden_size]), - ) - out = [] + h_list = [] + c_list = [] n_steps = inputs.shape[1] + # set the initial h & c states to the supplied h and c states if using + # DA, or 0's otherwise + hidden_state_prev = tf.cast(kwargs["h_init"], tf.float32) + cell_state_prev = tf.cast(kwargs["c_init"], tf.float32) for t in range(n_steps): h_graph = tf.nn.tanh( tf.matmul( @@ -157,42 +133,72 @@ def call(self, inputs, **kwargs): + self.b_c ) - if self.flow_in_temp: - out_pred_q = ( - tf.matmul(h_update, self.W_out_flow) + self.b_out_flow - ) - out_pred_t = ( - tf.matmul( - tf.concat([h_update, out_pred_q], axis=1), - self.W_out_temp, - ) - + self.b_out_temp - ) - out_pred = tf.concat([out_pred_t, out_pred_q], axis=1) - else: - out_pred = tf.matmul(h_update, self.W_out) + self.b_out - - out.append(out_pred) - hidden_state_prev = h_update cell_state_prev = c_update - out = tf.stack(out) - out = tf.transpose(out, [1, 0, 2]) - return out + + h_list.append(h_update) + c_list.append(c_update) + + h_list = tf.stack(h_list) + c_list = tf.stack(c_list) + h_list = tf.transpose(h_list, [1, 0, 2]) + c_list = tf.transpose(c_list, [1, 0, 2]) + return h_list, c_list class RGCNModel(tf.keras.Model): - def __init__(self, hidden_size, A, flow_in_temp=False, rand_seed=None): + def __init__( + self, + hidden_size, + A, + num_tasks=1, + recurrent_dropout=0, + dropout=0, + rand_seed=None, + ): """ :param hidden_size: [int] the number of hidden units :param A: [numpy array] adjacency matrix - :param flow_in_temp: [bool] whether the flow predictions should feed + :param num_tasks: [int] number of prediction tasks to perform - + currently supports either 1 or 2 prediction tasks + :param recurrent_dropout: [float] value between 0 and 1 for the + probability of a recurrent element to be zero + :param dropout: [float] value between 0 and 1 for the probability of an + input element to be zero into the temp predictions :param rand_seed: [int] the random seed for initialization """ super().__init__() - self.rgcn_layer = RGCN(hidden_size, A, flow_in_temp, rand_seed) + self.hidden_size = hidden_size + self.num_tasks = num_tasks + self.recurrent_dropout = recurrent_dropout + self.dropout = dropout + + self.rgcn_layer = RGCN( + hidden_size, A, recurrent_dropout, dropout, rand_seed + ) + + self.states = None + + self.dense_main = layers.Dense(1, name="dense_main") + if self.num_tasks == 2: + self.dense_aux = layers.Dense(1, name="dense_aux") def call(self, inputs, **kwargs): - output = self.rgcn_layer(inputs) - return output + batch_size = inputs.shape[0] + h_init = kwargs.get("h_init", tf.zeros([batch_size, self.hidden_size])) + c_init = kwargs.get("c_init", tf.zeros([batch_size, self.hidden_size])) + h_gr, c_gr = self.rgcn_layer(inputs, h_init=h_init, c_init=c_init) + self.states = h_gr[:, -1, :], c_gr[:, -1, :] + + if self.num_tasks == 1: + main_prediction = self.dense_main(h_gr) + return main_prediction + elif self.num_tasks == 2: + main_prediction = self.dense_main(h_gr) + aux_prediction = self.dense_aux(h_gr) + return tf.concat([main_prediction, aux_prediction], axis=2) + else: + raise ValueError( + f"This model only supports 1 or 2 tasks (not {self.num_tasks})" + ) diff --git a/river_dl/loss_functions.py b/river_dl/loss_functions.py index 5ed5a18..799d3c8 100644 --- a/river_dl/loss_functions.py +++ b/river_dl/loss_functions.py @@ -1,4 +1,3 @@ -import numpy as np import tensorflow as tf @@ -69,55 +68,43 @@ def samplewise_nnse_loss(y_true, y_pred): return 1 - nnse_val -def nnse_masked_one_var(data, y_pred, var_idx): - y_true, y_pred, weights = y_data_components(data, y_pred, var_idx) - return nnse_loss(y_true, y_pred) +def multitask_nse(lambdas): + return multitask_loss(lambdas, nnse_loss) -def nnse_one_var_samplewise(data, y_pred, var_idx): - y_true, y_pred, weights = y_data_components(data, y_pred, var_idx) - return samplewise_nnse_loss(y_true, y_pred) +def multitask_samplewise_nse(lambdas): + return multitask_loss(lambdas, samplewise_nnse_loss) -def y_data_components(data, y_pred, var_idx): - weights = data[:, :, -2:] - y_true = data[:, :, :-2] +def multitask_rmse(lambdas): + return multitask_loss(lambdas, rmse) - # ensure y_pred, weights, and y_true are all tensors the same data type - y_true = tf.convert_to_tensor(y_true) - weights = tf.convert_to_tensor(weights) - y_true = tf.cast(y_true, y_pred.dtype) - weights = tf.cast(weights, y_pred.dtype) - # make all zero-weighted observations 'nan' so they don't get counted - # at all in the loss calculation - y_true = tf.where(weights == 0, np.nan, y_true) +def multitask_kge(lambdas): + return multitask_loss(lambdas, kge_loss) - weights = weights[:, :, var_idx] - y_true = y_true[:, :, var_idx] - y_pred = y_pred[:, :, var_idx] - return y_true, y_pred, weights - -def rmse_masked_one_var(data, y_pred, var_idx): - y_true, y_pred, weights = y_data_components(data, y_pred, var_idx) - return rmse(y_true, y_pred) - - -def weighted_masked_rmse(lamb=0.5): +def multitask_loss(lambdas, loss_func): """ - calculate a weighted, masked rmse. - :param lamb: [float] (short for lambda). The factor that the auxiliary loss - will be multiplied by before added to the main loss. + calculate a weighted multi-task loss for a given number of variables with a + given loss function + :param lambdas: [array-like float] The factor that losses will be + multiplied by before being added together. + :param loss_func: [function] Loss function that will be used to calculate + the loss of each variable. Must take as input parameters [y_true, y_pred] """ - def rmse_masked_combined(data, y_pred): - rmse_main = rmse_masked_one_var(data, y_pred, 0) - rmse_aux = rmse_masked_one_var(data, y_pred, 1) - rmse_loss = rmse_main + lamb * rmse_aux - return rmse_loss + def combine_loss(y_true, y_pred): + losses = [] + n_vars = y_pred.shape[-1] + for var_id in range(n_vars): + ind_var_loss = loss_func(y_true[:, :, var_id], y_pred[:, :, var_id]) + weighted_ind_var_loss = lambdas[var_id] * ind_var_loss + losses.append(weighted_ind_var_loss) + total_loss = sum(losses) + return total_loss - return rmse_masked_combined + return combine_loss def mean_masked(y): @@ -181,10 +168,5 @@ def kge_norm_loss(y_true, y_pred): return 1 - norm_kge(y_true, y_pred) -def kge_loss_one_var(data, y_pred, var_idx): - y_true, y_pred, weights = y_data_components(data, y_pred, var_idx) - return kge_loss(y_true, y_pred) - - def kge_loss(y_true, y_pred): return -1 * kge(y_true, y_pred) diff --git a/river_dl/predict.py b/river_dl/predict.py index d4822b4..45e07a4 100644 --- a/river_dl/predict.py +++ b/river_dl/predict.py @@ -37,11 +37,7 @@ def unscale_output(y_scl, y_std, y_mean, y_vars, logged_q=False): def load_model_from_weights( - model_type, - model_weights_dir, - hidden_size, - dist_matrix=None, - flow_in_temp=False, + model_type, model_weights_dir, hidden_size, dist_matrix=None, num_tasks=1, ): """ load a TF model from the model weights directory @@ -49,15 +45,15 @@ def load_model_from_weights( :param model_weights_dir: [str] directory to saved model weights :param hidden_size: [int] the number of hidden units in model :param dist_matrix: [np array] the distance matrix if using 'rgcn' - :param flow_in_temp: [bool] whether the flow should be an input into temp - :return: + :param num_tasks: [int] number of tasks (variables to be predicted) + :return: TF model """ if model_type == "rgcn": - model = RGCNModel(hidden_size, A=dist_matrix, flow_in_temp=flow_in_temp) + model = RGCNModel(hidden_size, A=dist_matrix, num_tasks=num_tasks) elif model_type.startswith("lstm"): - model = LSTMModel(hidden_size) + model = LSTMModel(hidden_size, num_tasks=num_tasks) elif model_type == "gru": - model = GRUModel(hidden_size) + model = GRUModel(hidden_size, num_tasks=num_tasks) else: raise ValueError( f'model_type must be "lstm", "gru" or "rgcn", (not {model_type})' @@ -74,7 +70,7 @@ def predict_from_io_data( io_data, partition, outfile, - flow_in_temp=False, + num_tasks=1, logged_q=False, ): """ @@ -86,9 +82,9 @@ def predict_from_io_data( :param partition: [str] must be 'trn' or 'tst'; whether you want to predict for the train or the dev period :param outfile: [str] the file where the output data should be stored - :param flow_in_temp: [bool] whether the flow should be an input into temp :param logged_q: [bool] whether the discharge was logged in training. if True the exponent of the discharge will be taken in the model unscaling + :param num_tasks: [int] number of tasks (variables to be predicted) :return: [pd dataframe] predictions """ io_data = get_data_if_file(io_data) @@ -97,7 +93,7 @@ def predict_from_io_data( model_weights_dir, hidden_size, io_data.get("dist_matrix"), - flow_in_temp, + num_tasks=num_tasks, ) if partition != "trn": @@ -196,11 +192,12 @@ def swap_first_seq_halves(x_data, batch_size): """ first_batch = x_data[:batch_size, :, :] seq_len = x_data.shape[1] - half_size = round(seq_len/2) + half_size = round(seq_len / 2) first_half_first_batch = first_batch[:, :half_size, :] second_half_first_batch = first_batch[:, half_size:, :] - swapped = np.concatenate([second_half_first_batch, first_half_first_batch], - axis=1) + swapped = np.concatenate( + [second_half_first_batch, first_half_first_batch], axis=1 + ) new_x_data = np.concatenate([swapped, x_data], axis=0) return new_x_data @@ -279,9 +276,9 @@ def predict_from_arbitrary_data( model_weights_dir, model_type, hidden_size, + num_tasks=1, seq_len=365, dist_matrix=None, - flow_in_temp=False, logged_q=False, ): """ @@ -300,11 +297,10 @@ def predict_from_arbitrary_data( weights are stored :param model_type: [str] model to use either 'rgcn', 'lstm', or 'gru' :param hidden_size: [int] the number of hidden units in model + :param num_tasks: [int] number of tasks (variables to be predicted) :param seq_len: [int] length of input sequences given to model :param dist_matrix: [np array] the distance matrix if using 'rgcn'. if not provided, will look for it in the "train_io_data" file. - :param flow_in_temp: [bool] whether the flow should be an input into temp - for the rgcn model :param logged_q: [bool] whether the model predicted log of discharge. if true, the exponent of the discharge will be executed :return: [pd dataframe] the predictions @@ -320,7 +316,11 @@ def predict_from_arbitrary_data( ) model = load_model_from_weights( - model_type, model_weights_dir, hidden_size, dist_matrix, flow_in_temp, + model_type, + model_weights_dir, + hidden_size, + dist_matrix, + num_tasks=num_tasks, ) ds = xr.open_zarr(raw_data_file) @@ -335,7 +335,7 @@ def predict_from_arbitrary_data( pred_start_date = datetime.datetime.strptime(pred_start_date, "%Y-%m-%d") # look back half of the sequence length before the prediction start date. # if present, this serves as a half-sequence warm-up period - inputs_start_date = pred_start_date - datetime.timedelta(round(seq_len/2)) + inputs_start_date = pred_start_date - datetime.timedelta(round(seq_len / 2)) # get the "middle" predictions middle_predictions = predict_one_date_range( @@ -362,7 +362,7 @@ def predict_from_arbitrary_data( start_dates_end, logged_q, keep_last_frac=1, - offset=.5, + offset=0.5, swap_halves_of_first_seq=True, ) diff --git a/river_dl/rnns.py b/river_dl/rnns.py index afff737..804d2da 100644 --- a/river_dl/rnns.py +++ b/river_dl/rnns.py @@ -2,126 +2,73 @@ from __future__ import print_function, division import tensorflow as tf from tensorflow.keras import layers -from river_dl.loss_functions import nnse_masked_one_var, nnse_one_var_samplewise class LSTMModel(tf.keras.Model): def __init__( - self, - hidden_size, - gradient_correction=False, - lamb=1, - dropout=0, - grad_log_file=None, + self, hidden_size, num_tasks=1, recurrent_dropout=0, dropout=0, ): """ :param hidden_size: [int] the number of hidden units + :param num_tasks: [int] number of tasks (variables to be predicted) + :param recurrent_dropout: [float] value between 0 and 1 for the + probability of a recurrent element to be zero + :param dropout: [float] value between 0 and 1 for the probability of an + input element to be zero """ super().__init__() - self.gradient_correction = gradient_correction - self.grad_log_file = grad_log_file - self.lamb = lamb + self.hidden_size = hidden_size + self.num_tasks = num_tasks self.rnn_layer = layers.LSTM( hidden_size, return_sequences=True, - name="rnn_shared", - recurrent_dropout=dropout, + stateful=True, + return_state=True, + recurrent_dropout=recurrent_dropout, + dropout=dropout, ) self.dense_main = layers.Dense(1, name="dense_main") - self.dense_aux = layers.Dense(1, name="dense_aux") + if self.num_tasks == 2: + self.dense_aux = layers.Dense(1, name="dense_aux") + self.states = None @tf.function def call(self, inputs, **kwargs): - x = self.rnn_layer(inputs) - main_prediction = self.dense_main(x) - aux_prediction = self.dense_aux(x) - return tf.concat([main_prediction, aux_prediction], axis=2) - - @tf.function - def train_step(self, data): - x, y = data - - # If I don't do one forward pass before starting the gradient tape, - # the thing hangs - _ = self(x) - with tf.GradientTape(persistent=True) as tape: - y_pred = self(x, training=True) # forward pass - - loss_main = nnse_one_var_samplewise(y, y_pred, 0) - loss_aux = nnse_one_var_samplewise(y, y_pred, 1) - - trainable_vars = self.trainable_variables - - main_out_vars = get_variables(trainable_vars, "dense_main") - aux_out_vars = get_variables(trainable_vars, "dense_aux") - shared_vars = get_variables(trainable_vars, "rnn_shared") - - # get gradients - gradient_main_out = tape.gradient(loss_main, main_out_vars) - gradient_aux_out = tape.gradient(loss_aux, aux_out_vars) - gradient_shared_main = tape.gradient(loss_main, shared_vars) - gradient_shared_aux = tape.gradient(loss_aux, shared_vars) - - if self.gradient_correction: - # adjust auxiliary gradient - gradient_shared_aux = adjust_gradient_list( - gradient_shared_main, gradient_shared_aux, self.grad_log_file + batch_size = inputs.shape[0] + h_init = kwargs.get("h_init", tf.zeros([batch_size, self.hidden_size])) + c_init = kwargs.get("c_init", tf.zeros([batch_size, self.hidden_size])) + self.rnn_layer.reset_states(states=[h_init, c_init]) + x, h, c = self.rnn_layer(inputs) + self.states = h, c + if self.num_tasks == 1: + main_prediction = self.dense_main(x) + return main_prediction + elif self.num_tasks == 2: + main_prediction = self.dense_main(x) + aux_prediction = self.dense_aux(x) + return tf.concat([main_prediction, aux_prediction], axis=2) + else: + raise ValueError( + f"This model only supports 1 or 2 tasks (not {self.num_tasks})" ) - combined_gradient = combine_gradients_list( - gradient_shared_main, gradient_shared_aux, lamb=self.lamb - ) - - # apply gradients - self.optimizer.apply_gradients(zip(gradient_main_out, main_out_vars)) - self.optimizer.apply_gradients(zip(gradient_aux_out, aux_out_vars)) - self.optimizer.apply_gradients(zip(combined_gradient, shared_vars)) - return {"loss_main": loss_main, "loss_aux": loss_aux} class GRUModel(LSTMModel): - def __init__(self, hidden_size, lamb=1): + def __init__( + self, hidden_size, num_tasks=1, dropout=0, recurrent_dropout=0, + ): """ :param hidden_size: [int] the number of hidden units + :param num_tasks: [int] number of tasks (variables to be predicted) + :param recurrent_dropout: [float] value between 0 and 1 for the + probability of a recurrent element to be zero + :param dropout: [float] value between 0 and 1 for the probability of an """ - super().__init__(hidden_size, lamb=lamb) + super().__init__(hidden_size, num_tasks=num_tasks) self.rnn_layer = layers.GRU( - hidden_size, return_sequences=True, name="rnn_shared" + hidden_size, + recurrent_dropout=recurrent_dropout, + dropout=dropout, + return_sequences=True, + name="rnn_shared", ) - - -def adjust_gradient(main_grad, aux_grad, logfile=None): - # flatten tensors - main_grad_flat = tf.reshape(main_grad, [-1]) - aux_grad_flat = tf.reshape(aux_grad, [-1]) - - # project and adjust - projection = ( - tf.minimum(tf.reduce_sum(main_grad_flat * aux_grad_flat), 0) - * main_grad_flat - / tf.reduce_sum(main_grad_flat * main_grad_flat) - ) - if logfile: - logfile = "file://" + logfile - tf.print(tf.reduce_sum(projection), output_stream=logfile, sep=",") - projection = tf.cond( - tf.math.is_nan(tf.reduce_sum(projection)), - lambda: tf.zeros(aux_grad_flat.shape), - lambda: projection, - ) - adjusted = aux_grad_flat - projection - return tf.reshape(adjusted, aux_grad.shape) - - -def get_variables(trainable_variables, name): - return [v for v in trainable_variables if name in v.name] - - -def combine_gradients_list(main_grads, aux_grads, lamb=1): - return [main_grads[i] + lamb * aux_grads[i] for i in range(len(main_grads))] - - -def adjust_gradient_list(main_grads, aux_grads, logfile=None): - return [ - adjust_gradient(main_grads[i], aux_grads[i], logfile) - for i in range(len(main_grads)) - ] diff --git a/river_dl/train.py b/river_dl/train.py index c4c0c47..d584caa 100644 --- a/river_dl/train.py +++ b/river_dl/train.py @@ -5,7 +5,6 @@ import datetime import tensorflow as tf from river_dl.RGCN import RGCNModel -from river_dl.loss_functions import weighted_masked_rmse from river_dl.rnns import LSTMModel, GRUModel @@ -26,12 +25,13 @@ def train_model( pretrain_epochs, finetune_epochs, hidden_units, + loss_func, out_dir, - flow_in_temp=False, model_type="rgcn", seed=None, dropout=0, - lamb=1, + recurrent_dropout=0, + num_tasks=1, learning_rate_pre=0.005, learning_rate_ft=0.01, ): @@ -41,15 +41,16 @@ def train_model( :param pretrain_epochs: [int] number of pretrain epochs :param finetune_epochs: [int] number of finetune epochs :param hidden_units: [int] number of hidden layers + :param loss_func: [function] loss function that the model will be fit to :param out_dir: [str] directory where the output files should be written - :param flow_in_temp: [bool] whether the flow predictions should feed - into the temp predictions :param model_type: [str] which model to use (either 'lstm', 'rgcn', or - 'lstm_grad_correction') + 'gru') :param seed: [int] random seed - :param lamb: [float] (short for 'lambda') weight between 0 and 1. How much - to weight the auxiliary rmse is weighted compared to the main rmse. The - difference between one and lambda becomes the main rmse weight. + :param recurrent_dropout: [float] value between 0 and 1 for the probability + of a reccurent element to be zero + :param dropout: [float] value between 0 and 1 for the probability of an + input element to be zero + :param num_tasks: [int] number of tasks (variables to be predicted) :param learning_rate_pre: [float] the pretrain learning rate :param learning_rate_ft: [float] the finetune learning rate :return: [tf model] finetuned model @@ -71,25 +72,32 @@ def train_model( batch_size = num_years if model_type == "lstm": - model = LSTMModel(hidden_units, lamb=lamb) + model = LSTMModel( + hidden_units, + num_tasks=num_tasks, + recurrent_dropout=recurrent_dropout, + dropout=dropout, + ) elif model_type == "rgcn": model = RGCNModel( hidden_units, - flow_in_temp=flow_in_temp, + num_tasks=num_tasks, A=dist_matrix, rand_seed=seed, + dropout=dropout, + recurrent_dropout=recurrent_dropout, ) - elif model_type == "lstm_grad_correction": - grad_log_file = os.path.join(out_dir, "grad_correction.txt") - model = LSTMModel( + elif model_type == "gru": + model = GRUModel( hidden_units, - gradient_correction=True, - lamb=lamb, + num_tasks=num_tasks, + recurrent_dropout=recurrent_dropout, dropout=dropout, - grad_log_file=grad_log_file, ) - elif model_type == "gru": - model = GRUModel(hidden_units, lamb=lamb) + else: + raise ValueError( + f"The 'model_type' provided ({model_type}) is not supported" + ) if seed: os.environ["PYTHONHASHSEED"] = str(seed) @@ -105,14 +113,9 @@ def train_model( # use built in 'fit' method unless model is grad correction x_trn_pre = io_data["x_trn"] # combine with weights to pass to loss function - y_trn_pre = np.concatenate( - [io_data["y_pre_trn"], io_data["y_pre_wgts"]], axis=2 - ) + y_trn_pre = io_data["y_pre_trn"] - if model_type == "rgcn": - model.compile(optimizer_pre, loss=weighted_masked_rmse(lamb=lamb)) - else: - model.compile(optimizer_pre) + model.compile(optimizer_pre, loss=loss_func) csv_log_pre = tf.keras.callbacks.CSVLogger( os.path.join(out_dir, f"pretrain_log.csv") @@ -140,19 +143,14 @@ def train_model( if finetune_epochs > 0: optimizer_ft = tf.optimizers.Adam(learning_rate=learning_rate_ft) - if model_type == "rgcn": - model.compile(optimizer_ft, loss=weighted_masked_rmse(lamb=lamb)) - else: - model.compile(optimizer_ft) + model.compile(optimizer_ft, loss=loss_func) csv_log_ft = tf.keras.callbacks.CSVLogger( os.path.join(out_dir, "finetune_log.csv") ) x_trn_obs = io_data["x_trn"] - y_trn_obs = np.concatenate( - [io_data["y_obs_trn"], io_data["y_obs_wgts"]], axis=2 - ) + y_trn_obs = io_data["y_obs_trn"] model.fit( x=x_trn_obs, diff --git a/river_dl/train_model.py b/river_dl/train_model.py index be465fa..fd8c86c 100644 --- a/river_dl/train_model.py +++ b/river_dl/train_model.py @@ -1,6 +1,23 @@ -import os import argparse from river_dl.train import train_model +import river_dl.loss_functions as lf + + +def get_loss_func_from_str(loss_func_str, lambdas=None): + if loss_func_str == "rmse": + return lf.rmse + elif loss_func_str == "nse": + return lf.nse + elif loss_func_str == "kge": + return lf.kge + elif loss_func_str == "multitask_rmse": + return lf.multitask_rmse(lambdas) + elif loss_func_str == "multitask_nse": + return lf.multitask_nse(lambdas) + elif loss_func_str == "multitask_kge": + return lf.multitask_kge(lambdas) + else: + raise ValueError(f"loss function {loss_func_str} not supported") parser = argparse.ArgumentParser() @@ -23,13 +40,6 @@ parser.add_argument( "-f", "--finetune_epochs", help="number of finetune" "epochs", type=int ) -parser.add_argument( - "-q", - "--flow-in-temp", - help="whether or not to do flow\ - in temp", - action="store_true", -) parser.add_argument( "--pt_learn_rate", help="learning rate for pretraining", @@ -45,31 +55,56 @@ parser.add_argument( "--model", help="type of model to train", - choices=["lstm", "rgcn"], + choices=["lstm", "rgcn", "gru"], default="rgcn", ) parser.add_argument( - "--lamb", help="lambda for weighting aux gradient", default=1.0, type=float + "--num_tasks", + help="number of tasks (variables to be predicted)", + default=1, + type=int, +) +parser.add_argument( + "--loss_func", + help="loss function", + default="rmse", + type=str, + choices=[ + "rmse", + "nse", + "kge", + "multitask_rmse", + "multitask_kge", + "multitask_nse", + ], +) +parser.add_argument("--dropout", help="dropout rate", default=0, type=float) +parser.add_argument( + "--recurrent_dropout", help="recurrent dropout", default=0, type=float +) +parser.add_argument( + "--lambdas", + help="lambdas for weighting variable losses", + default=[1, 1], + type=list, ) - args = parser.parse_args() -flow_in_temp = args.flow_in_temp -in_data_file = args.in_data -hidden_units = args.hidden_units -out_dir = args.outdir -pt_epochs = args.pretrain_epochs -ft_epochs = args.finetune_epochs + +loss_func = get_loss_func_from_str(args.loss_func) + # -------- train ------ model = train_model( - in_data_file, - pt_epochs, - ft_epochs, - hidden_units, - out_dir=out_dir, - flow_in_temp=flow_in_temp, - lamb=args.lamb, + args.in_data_file, + args.pretrain_epochs, + args.finetune_epochs, + args.hidden_units, + out_dir=args.out_dir, + num_tasks=args.num_tasks, + loss_func=loss_func, + dropout=args.dropout, + recurrent_dropout=args.recurrent_dropout, seed=args.random_seed, learning_rate_ft=args.ft_learn_rate, learning_rate_pre=args.pt_learn_rate,