From ee4f1311cab04b6e2cbe4cfc2a0d4981faba7724 Mon Sep 17 00:00:00 2001 From: jsadler2 Date: Mon, 28 Dec 2020 13:18:30 -0600 Subject: [PATCH 1/8] functions for pgda-da prep --- river_dl/preproc_utils.py | 40 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/river_dl/preproc_utils.py b/river_dl/preproc_utils.py index 46ba6ef..162ef62 100644 --- a/river_dl/preproc_utils.py +++ b/river_dl/preproc_utils.py @@ -613,3 +613,43 @@ def read_exclude_segs_file(exclude_file): with open(exclude_file, "r") as s: d = yaml.safe_load(s) return [val for key, val in d.items()] + + +def prep_one_var_lstm_da(var_file, vars, seg_id, start_date_trn, end_date_trn, + start_date_pred, end_date_pred, scale_data=False): + data = xr.open_zarr(var_file) + data = data[vars] + data = data.loc[dict(seg_id_nat=[seg_id])] + data_trn = data.loc[dict(date=slice(start_date_trn, end_date_trn))] + data_pred = data.loc[dict(date=slice(start_date_pred, end_date_pred))] + if scale_data: + data_trn, mean, std = scale(data_trn) + data_pred, _, _ = scale(data_pred, mean, std) + return data_trn, data_pred + + +def fmt_dataset(dataset): + return np.expand_dims(dataset.to_array().values, 2) + + +def prep_data_lstm_da(obs_temp_file, driver_file, seg_id, start_date_trn, + end_date_trn, start_date_pred, end_date_pred, + x_vars=['seg_tave_air'], y_vars=['temp_c'], + out_file=None): + x_trn, x_pred = prep_one_var_lstm_da(driver_file, x_vars, seg_id, start_date_trn, end_date_trn, start_date_pred, end_date_pred, scale_data=True) + y_trn, y_pred = prep_one_var_lstm_da(obs_temp_file, y_vars, seg_id, start_date_trn, end_date_trn, start_date_pred, end_date_pred, scale_data=False) + + + data = { + "x_trn": (convert_batch_reshape(x_trn)), + "x_pred": (convert_batch_reshape(x_pred, seq_len=30)), + "dates_trn": (x_trn.date.values), + "dates_pred": (x_pred.date.values), + "y_trn": (convert_batch_reshape(y_trn)), + "y_pred": (convert_batch_reshape(y_pred, seq_len=30)), + } + if out_file: + np.savez_compressed(out_file, **data) + return data + +# d = prep_data_lstm_da('../../drb-dl-model/data/in/obs_temp_full', '../../drb-dl-model/data/in/uncal_sntemp_input_output', 1573, '2011-06-01', '2012-06-01', '2012-06-02', '2012-07-02', out_file='lstm_da_data_just_air_temp.npz') From 685f573f6e2489a0c7bd93f1ebe28a01413d6863 Mon Sep 17 00:00:00 2001 From: jsadler2 Date: Thu, 31 Dec 2020 15:33:03 -0600 Subject: [PATCH 2/8] added nnse-samplewise loss --- river_dl/loss_functions.py | 35 ++++++++++++++++++++++++++++++++--- river_dl/rnns.py | 6 +++--- 2 files changed, 35 insertions(+), 6 deletions(-) diff --git a/river_dl/loss_functions.py b/river_dl/loss_functions.py index 5b3b29c..97a9f3c 100644 --- a/river_dl/loss_functions.py +++ b/river_dl/loss_functions.py @@ -19,18 +19,38 @@ def rmse(y_true, y_pred): return rmse_loss -def nse(y_true, y_pred): +def sample_avg_nse(y_true, y_pred): + """ + calculate the sample averaged nse, i.e., it will calculate the nse across + each of the samples (the 1st dimension of the arrays) and then average those + """ y_true = tf.cast(y_true, tf.float32) y_pred = tf.cast(y_pred, tf.float32) zero_or_error = tf.where( tf.math.is_nan(y_true), tf.zeros_like(y_true), y_pred - y_true ) - numerator = tf.reduce_sum(tf.square(zero_or_error)) + # add a small value to the deviation to prevent instability + deviation = dev_masked(y_true) + 0.1 + + numerator_samplewise = tf.reduce_sum(tf.square(zero_or_error), axis=1) + denomin_samplewise = tf.reduce_sum(tf.square(deviation), axis=1) + nse_samplewise = 1 - numerator_samplewise/denomin_samplewise + nse_samplewise_avg = tf.reduce_sum(nse_samplewise)/tf.cast(tf.shape(y_true)[0], tf.float32) + return nse_samplewise_avg + + +def nse(y_true, y_pred): + y_true = tf.cast(y_true, tf.float32) + y_pred = tf.cast(y_pred, tf.float32) + zero_or_error = tf.where( + tf.math.is_nan(y_true), tf.zeros_like(y_true), y_pred - y_true + ) deviation = dev_masked(y_true) + numerator = tf.reduce_sum(tf.square(zero_or_error)) denominator = tf.reduce_sum(tf.square(deviation)) - return 1 - numerator / denominator + return 1 - numerator / denominator def nnse(y_true, y_pred): @@ -41,11 +61,20 @@ def nnse_loss(y_true, y_pred): return 1 - nnse(y_true, y_pred) +def samplewise_nnse_loss(y_true, y_pred): + nnse_val = 1 / (2 - sample_avg_nse(y_true, y_pred)) + return 1 - nnse_val + + @tf.function def nnse_masked_one_var(data, y_pred, var_idx): y_true, y_pred, weights = y_data_components(data, y_pred, var_idx) return nnse_loss(y_true, y_pred) +@tf.function +def nnse_one_var_samplewise(data, y_pred, var_idx): + y_true, y_pred, weights = y_data_components(data, y_pred, var_idx) + return samplewise_nnse_loss(y_true, y_pred) @tf.function def y_data_components(data, y_pred, var_idx): diff --git a/river_dl/rnns.py b/river_dl/rnns.py index ff868ad..ac939cc 100644 --- a/river_dl/rnns.py +++ b/river_dl/rnns.py @@ -2,7 +2,7 @@ from __future__ import print_function, division import tensorflow as tf from tensorflow.keras import layers -from river_dl.loss_functions import nnse_masked_one_var +from river_dl.loss_functions import nnse_masked_one_var, nnse_one_var_samplewise class LSTMModel(tf.keras.Model): @@ -47,8 +47,8 @@ def train_step(self, data): with tf.GradientTape(persistent=True) as tape: y_pred = self(x, training=True) # forward pass - loss_main = nnse_masked_one_var(y, y_pred, 0) - loss_aux = nnse_masked_one_var(y, y_pred, 1) + loss_main = nnse_one_var_samplewise(y, y_pred, 0) + loss_aux = nnse_one_var_samplewise(y, y_pred, 1) trainable_vars = self.trainable_variables From cd569234550ac9bced455b39d222862df6883f70 Mon Sep 17 00:00:00 2001 From: jsadler2 Date: Mon, 28 Dec 2020 13:18:30 -0600 Subject: [PATCH 3/8] functions for pgda-da prep --- river_dl/preproc_utils.py | 40 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/river_dl/preproc_utils.py b/river_dl/preproc_utils.py index 46ba6ef..162ef62 100644 --- a/river_dl/preproc_utils.py +++ b/river_dl/preproc_utils.py @@ -613,3 +613,43 @@ def read_exclude_segs_file(exclude_file): with open(exclude_file, "r") as s: d = yaml.safe_load(s) return [val for key, val in d.items()] + + +def prep_one_var_lstm_da(var_file, vars, seg_id, start_date_trn, end_date_trn, + start_date_pred, end_date_pred, scale_data=False): + data = xr.open_zarr(var_file) + data = data[vars] + data = data.loc[dict(seg_id_nat=[seg_id])] + data_trn = data.loc[dict(date=slice(start_date_trn, end_date_trn))] + data_pred = data.loc[dict(date=slice(start_date_pred, end_date_pred))] + if scale_data: + data_trn, mean, std = scale(data_trn) + data_pred, _, _ = scale(data_pred, mean, std) + return data_trn, data_pred + + +def fmt_dataset(dataset): + return np.expand_dims(dataset.to_array().values, 2) + + +def prep_data_lstm_da(obs_temp_file, driver_file, seg_id, start_date_trn, + end_date_trn, start_date_pred, end_date_pred, + x_vars=['seg_tave_air'], y_vars=['temp_c'], + out_file=None): + x_trn, x_pred = prep_one_var_lstm_da(driver_file, x_vars, seg_id, start_date_trn, end_date_trn, start_date_pred, end_date_pred, scale_data=True) + y_trn, y_pred = prep_one_var_lstm_da(obs_temp_file, y_vars, seg_id, start_date_trn, end_date_trn, start_date_pred, end_date_pred, scale_data=False) + + + data = { + "x_trn": (convert_batch_reshape(x_trn)), + "x_pred": (convert_batch_reshape(x_pred, seq_len=30)), + "dates_trn": (x_trn.date.values), + "dates_pred": (x_pred.date.values), + "y_trn": (convert_batch_reshape(y_trn)), + "y_pred": (convert_batch_reshape(y_pred, seq_len=30)), + } + if out_file: + np.savez_compressed(out_file, **data) + return data + +# d = prep_data_lstm_da('../../drb-dl-model/data/in/obs_temp_full', '../../drb-dl-model/data/in/uncal_sntemp_input_output', 1573, '2011-06-01', '2012-06-01', '2012-06-02', '2012-07-02', out_file='lstm_da_data_just_air_temp.npz') From ec7b720331232a40ecb8f39af659f776991ac275 Mon Sep 17 00:00:00 2001 From: jsadler2 Date: Thu, 31 Dec 2020 15:33:03 -0600 Subject: [PATCH 4/8] added nnse-samplewise loss --- river_dl/loss_functions.py | 35 ++++++++++++++++++++++++++++++++--- river_dl/rnns.py | 6 +++--- 2 files changed, 35 insertions(+), 6 deletions(-) diff --git a/river_dl/loss_functions.py b/river_dl/loss_functions.py index 5b3b29c..97a9f3c 100644 --- a/river_dl/loss_functions.py +++ b/river_dl/loss_functions.py @@ -19,18 +19,38 @@ def rmse(y_true, y_pred): return rmse_loss -def nse(y_true, y_pred): +def sample_avg_nse(y_true, y_pred): + """ + calculate the sample averaged nse, i.e., it will calculate the nse across + each of the samples (the 1st dimension of the arrays) and then average those + """ y_true = tf.cast(y_true, tf.float32) y_pred = tf.cast(y_pred, tf.float32) zero_or_error = tf.where( tf.math.is_nan(y_true), tf.zeros_like(y_true), y_pred - y_true ) - numerator = tf.reduce_sum(tf.square(zero_or_error)) + # add a small value to the deviation to prevent instability + deviation = dev_masked(y_true) + 0.1 + + numerator_samplewise = tf.reduce_sum(tf.square(zero_or_error), axis=1) + denomin_samplewise = tf.reduce_sum(tf.square(deviation), axis=1) + nse_samplewise = 1 - numerator_samplewise/denomin_samplewise + nse_samplewise_avg = tf.reduce_sum(nse_samplewise)/tf.cast(tf.shape(y_true)[0], tf.float32) + return nse_samplewise_avg + + +def nse(y_true, y_pred): + y_true = tf.cast(y_true, tf.float32) + y_pred = tf.cast(y_pred, tf.float32) + zero_or_error = tf.where( + tf.math.is_nan(y_true), tf.zeros_like(y_true), y_pred - y_true + ) deviation = dev_masked(y_true) + numerator = tf.reduce_sum(tf.square(zero_or_error)) denominator = tf.reduce_sum(tf.square(deviation)) - return 1 - numerator / denominator + return 1 - numerator / denominator def nnse(y_true, y_pred): @@ -41,11 +61,20 @@ def nnse_loss(y_true, y_pred): return 1 - nnse(y_true, y_pred) +def samplewise_nnse_loss(y_true, y_pred): + nnse_val = 1 / (2 - sample_avg_nse(y_true, y_pred)) + return 1 - nnse_val + + @tf.function def nnse_masked_one_var(data, y_pred, var_idx): y_true, y_pred, weights = y_data_components(data, y_pred, var_idx) return nnse_loss(y_true, y_pred) +@tf.function +def nnse_one_var_samplewise(data, y_pred, var_idx): + y_true, y_pred, weights = y_data_components(data, y_pred, var_idx) + return samplewise_nnse_loss(y_true, y_pred) @tf.function def y_data_components(data, y_pred, var_idx): diff --git a/river_dl/rnns.py b/river_dl/rnns.py index ff868ad..ac939cc 100644 --- a/river_dl/rnns.py +++ b/river_dl/rnns.py @@ -2,7 +2,7 @@ from __future__ import print_function, division import tensorflow as tf from tensorflow.keras import layers -from river_dl.loss_functions import nnse_masked_one_var +from river_dl.loss_functions import nnse_masked_one_var, nnse_one_var_samplewise class LSTMModel(tf.keras.Model): @@ -47,8 +47,8 @@ def train_step(self, data): with tf.GradientTape(persistent=True) as tape: y_pred = self(x, training=True) # forward pass - loss_main = nnse_masked_one_var(y, y_pred, 0) - loss_aux = nnse_masked_one_var(y, y_pred, 1) + loss_main = nnse_one_var_samplewise(y, y_pred, 0) + loss_aux = nnse_one_var_samplewise(y, y_pred, 1) trainable_vars = self.trainable_variables From 776fe20d27f4abb3d7a5b34a988b64907e4953d7 Mon Sep 17 00:00:00 2001 From: jsadler2 Date: Wed, 10 Feb 2021 13:53:23 -0600 Subject: [PATCH 5/8] builds rnn_layer in shape of pred data for preds --- river_dl/postproc_utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/river_dl/postproc_utils.py b/river_dl/postproc_utils.py index 3fbc583..8ca90bd 100644 --- a/river_dl/postproc_utils.py +++ b/river_dl/postproc_utils.py @@ -172,6 +172,7 @@ def predict_from_file( """ io_data = get_data_if_file(io_data) model = tf.keras.models.load_model(model_weights_dir) + model.rnn_layer.build(input_shape=io_data['x_tst'].shape) preds = predict( model, io_data, partition, outfile, logged_q=logged_q, half_tst=half_tst ) From a7bdccb45fd5aebf7945e85a57cc0897cf4ff2ea Mon Sep 17 00:00:00 2001 From: jsadler2 Date: Fri, 12 Mar 2021 09:50:47 -0600 Subject: [PATCH 6/8] postproc support for verification set --- river_dl/postproc_utils.py | 39 ++++++++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/river_dl/postproc_utils.py b/river_dl/postproc_utils.py index 8ca90bd..2f4e0cc 100644 --- a/river_dl/postproc_utils.py +++ b/river_dl/postproc_utils.py @@ -35,7 +35,7 @@ def prepped_array_to_df(data_array, dates, ids, col_names): return df -def take_first_half(df): +def take_half(df, first_half=True): """ filter out the second half of the dates in the predictions. this is to retain a "test" set of the i/o data for evaluation @@ -47,9 +47,12 @@ def take_first_half(df): df.sort_index(inplace=True) unique_dates = df.index.unique() halfway_date = unique_dates[int(len(unique_dates) / 2)] - df_first_half = df.loc[:halfway_date] - df_first_half.reset_index(inplace=True) - return df_first_half + if first_half: + df_half = df.loc[:halfway_date] + else: + df_half = df.loc[halfway_date:] + df_half.reset_index(inplace=True) + return df_half def unscale_output(y_scl, y_std, y_mean, data_cols, logged_q=False): @@ -172,7 +175,6 @@ def predict_from_file( """ io_data = get_data_if_file(io_data) model = tf.keras.models.load_model(model_weights_dir) - model.rnn_layer.build(input_shape=io_data['x_tst'].shape) preds = predict( model, io_data, partition, outfile, logged_q=logged_q, half_tst=half_tst ) @@ -198,11 +200,16 @@ def predict(model, io_data, partition, outfile, logged_q=False, half_tst=False): """ io_data = get_data_if_file(io_data) - # evaluate training - if partition == "trn" or partition == "tst": + if partition in ["trn", "tst", "ver"]: pass else: - raise ValueError('partition arg needs to be "trn" or "tst"') + raise ValueError('partition arg needs to be "trn" or "tst" or "ver"') + + if partition == "ver": + partition = "tst" + tst_partition = "ver" + elif partition == "tst": + tst_partition = "tst" num_segs = len(np.unique(io_data["ids_trn"])) y_pred = model.predict(io_data[f"x_{partition}"], batch_size=num_segs) @@ -221,8 +228,12 @@ def predict(model, io_data, partition, outfile, logged_q=False, half_tst=False): logged_q, ) - if half_tst and partition == "tst": - y_pred_pp = take_first_half(y_pred_pp) + if partition == "tst": + if half_tst and tst_partition == "tst": + y_pred_pp = take_half(y_pred_pp, first_half=True) + + if half_tst and tst_partition == "ver": + y_pred_pp = take_half(y_pred_pp, first_half=False) y_pred_pp.to_feather(outfile) return y_pred_pp @@ -373,13 +384,14 @@ def overall_metrics( def combined_metrics( - pred_trn, pred_tst, obs_temp, obs_flow, grp=None, outfile=None + pred_trn, pred_tst, obs_temp, obs_flow, pred_ver=None, grp=None, outfile=None ): """ calculate the metrics for flow and temp and training and test sets for a given grouping :param pred_trn: [str] path to training prediction feather file :param pred_tst: [str] path to testing prediction feather file + :param pred_tst: [str] path to verification prediction feather file :param obs_temp: [str] path to observations temperature zarr file :param obs_flow: [str] path to observations flow zarr file :param group: [str or list] which group the metrics should be computed for. @@ -393,7 +405,10 @@ def combined_metrics( trn_flow = overall_metrics(pred_trn, obs_flow, "flow", "trn", grp) tst_temp = overall_metrics(pred_tst, obs_temp, "temp", "tst", grp) tst_flow = overall_metrics(pred_tst, obs_flow, "flow", "tst", grp) - df_all = [trn_temp, tst_temp, trn_flow, tst_flow] + if pred_ver: + ver_temp = overall_metrics(pred_ver, obs_temp, "temp", "ver", grp) + ver_flow = overall_metrics(pred_ver, obs_flow, "flow", "ver", grp) + df_all = [trn_temp, tst_temp, trn_flow, tst_flow, ver_temp, ver_flow] df_all = pd.concat(df_all, axis=0) if outfile: df_all.to_csv(outfile, index=False) From c8683ea9f8f0e87c7191b1a190f6afef20349521 Mon Sep 17 00:00:00 2001 From: jsadler2 Date: Fri, 12 Mar 2021 09:55:27 -0600 Subject: [PATCH 7/8] adds ver metrics only if ver==True --- river_dl/postproc_utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/river_dl/postproc_utils.py b/river_dl/postproc_utils.py index 2f4e0cc..64a18b1 100644 --- a/river_dl/postproc_utils.py +++ b/river_dl/postproc_utils.py @@ -405,10 +405,11 @@ def combined_metrics( trn_flow = overall_metrics(pred_trn, obs_flow, "flow", "trn", grp) tst_temp = overall_metrics(pred_tst, obs_temp, "temp", "tst", grp) tst_flow = overall_metrics(pred_tst, obs_flow, "flow", "tst", grp) + df_all = [trn_temp, tst_temp, trn_flow, tst_flow] if pred_ver: ver_temp = overall_metrics(pred_ver, obs_temp, "temp", "ver", grp) ver_flow = overall_metrics(pred_ver, obs_flow, "flow", "ver", grp) - df_all = [trn_temp, tst_temp, trn_flow, tst_flow, ver_temp, ver_flow] + df_all.extend([ver_temp, ver_flow]) df_all = pd.concat(df_all, axis=0) if outfile: df_all.to_csv(outfile, index=False) From 051f1f25817bb9021294fd1feb157f3db19f6998 Mon Sep 17 00:00:00 2001 From: jsadler2 Date: Fri, 12 Mar 2021 09:57:30 -0600 Subject: [PATCH 8/8] rm unneeded pgdl_da functions --- river_dl/preproc_utils.py | 40 --------------------------------------- 1 file changed, 40 deletions(-) diff --git a/river_dl/preproc_utils.py b/river_dl/preproc_utils.py index 162ef62..46ba6ef 100644 --- a/river_dl/preproc_utils.py +++ b/river_dl/preproc_utils.py @@ -613,43 +613,3 @@ def read_exclude_segs_file(exclude_file): with open(exclude_file, "r") as s: d = yaml.safe_load(s) return [val for key, val in d.items()] - - -def prep_one_var_lstm_da(var_file, vars, seg_id, start_date_trn, end_date_trn, - start_date_pred, end_date_pred, scale_data=False): - data = xr.open_zarr(var_file) - data = data[vars] - data = data.loc[dict(seg_id_nat=[seg_id])] - data_trn = data.loc[dict(date=slice(start_date_trn, end_date_trn))] - data_pred = data.loc[dict(date=slice(start_date_pred, end_date_pred))] - if scale_data: - data_trn, mean, std = scale(data_trn) - data_pred, _, _ = scale(data_pred, mean, std) - return data_trn, data_pred - - -def fmt_dataset(dataset): - return np.expand_dims(dataset.to_array().values, 2) - - -def prep_data_lstm_da(obs_temp_file, driver_file, seg_id, start_date_trn, - end_date_trn, start_date_pred, end_date_pred, - x_vars=['seg_tave_air'], y_vars=['temp_c'], - out_file=None): - x_trn, x_pred = prep_one_var_lstm_da(driver_file, x_vars, seg_id, start_date_trn, end_date_trn, start_date_pred, end_date_pred, scale_data=True) - y_trn, y_pred = prep_one_var_lstm_da(obs_temp_file, y_vars, seg_id, start_date_trn, end_date_trn, start_date_pred, end_date_pred, scale_data=False) - - - data = { - "x_trn": (convert_batch_reshape(x_trn)), - "x_pred": (convert_batch_reshape(x_pred, seq_len=30)), - "dates_trn": (x_trn.date.values), - "dates_pred": (x_pred.date.values), - "y_trn": (convert_batch_reshape(y_trn)), - "y_pred": (convert_batch_reshape(y_pred, seq_len=30)), - } - if out_file: - np.savez_compressed(out_file, **data) - return data - -# d = prep_data_lstm_da('../../drb-dl-model/data/in/obs_temp_full', '../../drb-dl-model/data/in/uncal_sntemp_input_output', 1573, '2011-06-01', '2012-06-01', '2012-06-02', '2012-07-02', out_file='lstm_da_data_just_air_temp.npz')