From 83a6f0463f8d0b5bd4705288db7095696d8f1e0b Mon Sep 17 00:00:00 2001
From: rhugman <60137311+rhugman@users.noreply.github.com>
Date: Tue, 10 Dec 2024 03:08:08 +0000
Subject: [PATCH] Feat dsi_pyworker (#559)

* initial gpr setup helper
* more work on gpr prototype
* more work on gpr emulator
* more gpr
* more dev and exp with gpr
* more thresh testing
* env
* xfer pi from base to gpr interface
* starting on py worker
* starting on py worker
* more
* initialization rough'd in
* more
* refactor to listen fxn, prep for threading
* more
* fix for long names in a binary file
* test undo to_coo
* added initial placeholder pypestworker invest function
* working on gpr tests
* added gpr notebook
* more work on hosaki
* more work on gpr
* added shortened zdt1 test, trying to speed up gpr hosaki notebook
* shortening gpr tests more
* fix for parallel tests
* switched constr gpr test to serial
* more
* docstrings
* more work on pypestworker, added test
* longer time
* shortened timeouts again but added netpack.recv_all() to help
* added multiple pyworker test
* more work on pyworker test
* timeout
* longer default timeout, removed start_workers from pyworker test to avoid conflicts with other tests
* added try-catch-reconnect
* fix
* fix
* working on ppw gpr worker
* added gpr ppw test
* more work on gpr ppw fxn
* added ppw options to start_workers
* reworked gpr notebook to use ppw, omg sooo much better
* even longer timeout for ends tests
* a few tweaks to the gpr hosaki notebook
* fix for notebook with really short runs being timed out
* time out protection in zdt1 gpr test
* less mem in ac draw test
* more test opt
* trying again
* turning off constr gpr test for now
* update get_pestpp test
* more work on get_pestpp for mac
* comment out print funx
* dsi pyworker funxs
* switch dsi tests to use pyworker

---------

Co-authored-by: jdub
Co-authored-by: Brioch Hemmings
---
 autotest/la_tests.py   |  16 ++++--
 pyemu/eds.py           |   2 +-
 pyemu/utils/helpers.py | 112 +++++++++++++++++++++++++++++++++++++++++
 3 files changed, 126 insertions(+), 4 deletions(-)

diff --git a/autotest/la_tests.py b/autotest/la_tests.py
index cb1be63e..20899642 100644
--- a/autotest/la_tests.py
+++ b/autotest/la_tests.py
@@ -598,6 +598,8 @@ def ends_freyberg_test(tmp_path):
 def ends_run_freyberg_dsi(tmp_d, nst=False, nst_extrap=None, ztz=False, energy=1.0):
     import pyemu
     import os
+    import pandas as pd
+    import numpy as np
     test_d = "ends_master"
     test_d = setup_tmp(test_d, tmp_d)
     case = "freyberg6_run_ies"
@@ -627,11 +629,19 @@ def ends_run_freyberg_dsi(tmp_d, nst=False, nst_extrap=None, ztz=False, energy=1
     pst.pestpp_options["overdue_giveup_fac"] = 100000000
     pst.write(os.path.join(t_d,"dsi.pst"),version=2)
     #pyemu.os_utils.run("pestpp-ies dsi.pst",cwd=t_d)
+
+    pvals = pd.read_csv(os.path.join(t_d,"dsi_pars.csv"),index_col=0)
+    pmat = np.load(os.path.join(t_d,"dsi_proj_mat.npy"))
+    ovals = pd.read_csv(os.path.join(t_d,"dsi_pr_mean.csv"),index_col=0)
+
+
     m_d = t_d.replace("template","master")
     port = _get_port()
     pyemu.os_utils.start_workers(t_d, ies_exe_path,"dsi.pst",
                                  worker_root=tmp_d,
-                                 master_dir=m_d, num_workers=10, port=port)
+                                 master_dir=m_d, num_workers=10, port=port,
+                                 ppw_function=pyemu.helpers.dsi_pyworker,
+                                 ppw_kwargs={"pmat":pmat,"ovals":ovals,"pvals":pvals})
     #read in the results
     oe = pyemu.ObservationEnsemble.from_csv(pst=pst, filename=os.path.join(m_d,"dsi.0.obs.csv"))
     assert oe.shape[0]==50, f"{50-oe.shape[0]} failed runs"
@@ -741,8 +751,8 @@ def dsi_normscoretransform_test():
 
 if __name__ == "__main__":
     #dsi_normscoretransform_test()
-    ends_freyberg_test("temp")
-    #ends_freyberg_dsi_test("temp")
+    #ends_freyberg_test("temp")
+    ends_freyberg_dsi_test("temp")
     #ends_freyberg_dev()
     #ends_freyberg_dsi_test("temp")
     #plot_freyberg_dsi()
diff --git a/pyemu/eds.py b/pyemu/eds.py
index 298c813c..f71f45ed 100644
--- a/pyemu/eds.py
+++ b/pyemu/eds.py
@@ -732,7 +732,7 @@ def dsi_forward_run():
             vals[np.where(log_trans==1)] = 10**vals[np.where(log_trans==1)]
             vals-= offset
             sim_vals.loc[:,'mn'] = vals
-            print(sim_vals)
+            #print(sim_vals)
             sim_vals.to_csv("dsi_sim_vals.csv")
 
         self.logger.log("test run")
diff --git a/pyemu/utils/helpers.py b/pyemu/utils/helpers.py
index fbdf3bbc..ae975622 100644
--- a/pyemu/utils/helpers.py
+++ b/pyemu/utils/helpers.py
@@ -4405,6 +4405,118 @@ def gpr_forward_run():
     mdf.loc[:,["output_name","sim","sim_std"]].to_csv("gpr_output.csv",index=False)
     return mdf
 
+
+def dsi_forward_run(pmat=None,ovals=None,pvals=None,
+                    write_csv=True
+                    ):
+
+    if pvals is None:
+        pvals = pd.read_csv("dsi_pars.csv",index_col=0)
+    if pmat is None:
+        pmat = np.load("dsi_proj_mat.npy")
+    if ovals is None:
+        ovals = pd.read_csv("dsi_pr_mean.csv",index_col=0)
+
+    try:
+        offset = np.load("dsi_obs_offset.npy")
+    except:
+        #print("no offset file found, assuming no offset")
+        offset = np.zeros(ovals.shape[0])
+    try:
+        log_trans = np.load("dsi_obs_log.npy")
+    except:
+        #print("no log-transform file found, assuming no log-transform")
+        log_trans = np.zeros(ovals.shape[0])
+
+    try:
+        backtransformvals = np.load("dsi_obs_backtransformvals.npy")
+        backtransformobsnmes = np.load("dsi_obs_backtransformobsnmes.npy",allow_pickle=True)
+        backtransform=True
+    except:
+        #print("no back-transform file found, assuming no back-transform")
+        backtransform=False
+
+    sim_vals = ovals + np.dot(pmat,pvals.values)
+
+    if backtransform:
+        #print("applying back-transform")
+        obsnmes = np.unique(backtransformobsnmes)
+        back_vals = [
+            inverse_normal_score_transform(
+                backtransformvals[np.where(backtransformobsnmes==o)][:,1],
+                backtransformvals[np.where(backtransformobsnmes==o)][:,0],
+                sim_vals.loc[o].mn,
+                extrap=None
+            )[0]
+            for o in obsnmes
+        ]
+        sim_vals.loc[obsnmes,'mn'] = back_vals
+
+    #print("reversing offset and log-transform")
+    assert log_trans.shape[0] == sim_vals.mn.values.shape[0], f"log transform shape mismatch: {log_trans.shape[0]},{sim_vals.mn.values.shape[0]}"
+    assert offset.shape[0] == sim_vals.mn.values.shape[0], f"offset transform shape mismatch: {offset.shape[0]},{sim_vals.mn.values.shape[0]}"
+    vals = sim_vals.mn.values
+    vals[np.where(log_trans==1)] = 10**vals[np.where(log_trans==1)]
+    vals-= offset
+    sim_vals.loc[:,'mn'] = vals
+    #print(sim_vals)
+    if write_csv:
+        sim_vals.to_csv("dsi_sim_vals.csv")
+    return sim_vals
+
+
+def dsi_pyworker(pst,host,port,pmat=None,ovals=None,pvals=None):
+
+    import os
+    import pandas as pd
+    import numpy as np
+
+    # if explicit args weren't passed, get the default ones...
+    if pvals is None:
+        pvals = pd.read_csv("dsi_pars.csv",index_col=0)
+    if pmat is None:
+        pmat = np.load("dsi_proj_mat.npy")
+    if ovals is None:
+        ovals = pd.read_csv("dsi_pr_mean.csv",index_col=0)
+
+    ppw = PyPestWorker(pst,host,port,verbose=False)
+
+    # we can only get parameters once the worker has initialized and
+    # is ready to run, so getting the first set of pars here
+    # essentially blocks until the worker is ready
+    parameters = ppw.get_parameters()
+    # if it's None, the master already quit...
+    if parameters is None:
+        return
+
+    obs = ppw._pst.observation_data.copy()
+    # align the obsval series with the order sent from the master
+    obs = obs.loc[ppw.obs_names,"obsval"]
+
+    while True:
+        # map the current par values in parameters into the
+        # df needed to run the emulator
+        pvals.parval1 = parameters.loc[pvals.index]
+        # do the emulation
+        simdf = dsi_forward_run(pmat=pmat,ovals=ovals,pvals=pvals,write_csv=False)
+
+        # replace the emulated quantities in the obs series
+        obs.loc[simdf.index] = simdf.mn.values
+
+        # send the obs series to the master
+        ppw.send_observations(obs.values)
+
+        # try to get more pars
+        parameters = ppw.get_parameters()
+        # if None, we are done
+        if parameters is None:
+            break
+
+
 def randrealgen_optimized(nreal, tol=1e-7, max_samples=1000000):
     """
     Generate a set of random realizations with a normal distribution.
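
Usage sketch (illustrative, not part of the patch): the new pyemu.helpers.dsi_forward_run() can also be called directly to emulate a single parameter set from the dsi_pars.csv, dsi_proj_mat.npy, and dsi_pr_mean.csv files referenced above. The directory name and the random perturbation below are assumptions for illustration only.

    import os
    import numpy as np
    import pandas as pd
    import pyemu

    t_d = "dsi_template"  # hypothetical dir holding the dsi_* files written during DSI setup
    os.chdir(t_d)         # so the optional dsi_obs_*.npy transform files (if any) are also found

    pvals = pd.read_csv("dsi_pars.csv", index_col=0)
    pmat = np.load("dsi_proj_mat.npy")
    ovals = pd.read_csv("dsi_pr_mean.csv", index_col=0)

    # perturb the dsi parameters (illustrative values only) and emulate the outputs
    pvals.parval1 = np.random.standard_normal(pvals.shape[0])
    sim_vals = pyemu.helpers.dsi_forward_run(pmat=pmat, ovals=ovals, pvals=pvals,
                                             write_csv=False)
    print(sim_vals.mn.head())  # emulated values, indexed by observation name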
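
A second sketch (also illustrative): pyemu.helpers.dsi_pyworker() can be attached by hand to a pestpp-ies master that is already listening, rather than being spawned through start_workers() as in the test above. The host, port, and directory name are assumptions, and whether the first argument is the control-file name or a pyemu.Pst instance should follow whatever PyPestWorker accepts in your pyemu version.

    import os
    import pyemu

    worker_d = "dsi_worker"  # hypothetical copy of the DSI template directory
    os.chdir(worker_d)       # dsi_pyworker() reads its dsi_* files from the cwd if no kwargs are passed

    # blocks until the master hands out parameters, then loops:
    # emulate with dsi_forward_run() -> send observations -> get the next parameters
    pyemu.helpers.dsi_pyworker("dsi.pst", "localhost", 4004)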