Skip to content

Commit

Permalink
Feat dsi_pyworker (#559)
Browse files Browse the repository at this point in the history
* initial gpr setup helper

* more work on gpr prototype

* more work on gpr emulator

* more gpr

* more dev and exp with gpr

* more thresh testing

* env

* xfer pi from base to gpr interface

* starting on py worker

* starting on py worker

* more

* initialization roughed in

* more

* refactor to listen fxn, prep for threading

* more

* fix for long names in a binary file

* test undo to_coo

* added initial placeholder pypestworker invest function

* working on gpr tests

* added gpr notebook

* more work on hosaki

* more work on gpr

* added shortened zdt1 test, trying to speed up gpr hosaki notebook

* shortening gpr tests more

* fix for parallel tests

* switched constr gpr test to serial

* more

* docstrings

* more work on pypestworker, added test

* longer time

* shortened timeouts again but added netpack.recv_all() to help

* added multiple pyworker test

* more work on pyworker test

* timeout

* longer default timeout, removed start_workers from pyworker test to avoid conflicts with other tests

* added try-catch-reconnect

* fix

* fix

* working on ppw gpr worker

* added gpr ppw test:

* more work on gpr ppw fxn

* added ppw options to start_workers

* reworked gpr notebook to use ppw, omg sooo much better

* even longer timeout for ends tests

* a few tweaks to the gpr hosaki notebook

* fix for notebook with really short runs being timed out

* time out protection in zdt1 gpr test

* less mem in ac draw test

* more test opt

* trying again

* turning off constr gpr test for now

* update get_pestpp test

* more work on get_pestpp for mac:

* comment out print funx

* dsi pyworker funxs'

* switch dsi tests to use pyworker

---------

Co-authored-by: jdub <[email protected]>
Co-authored-by: Brioch Hemmings <[email protected]>
  • Loading branch information
3 people authored Dec 10, 2024
1 parent 90c4eec commit 83a6f04
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 4 deletions.
16 changes: 13 additions & 3 deletions autotest/la_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,8 @@ def ends_freyberg_test(tmp_path):
def ends_run_freyberg_dsi(tmp_d, nst=False, nst_extrap=None, ztz=False, energy=1.0):
import pyemu
import os
import pandas as pd
import numpy as np
test_d = "ends_master"
test_d = setup_tmp(test_d, tmp_d)
case = "freyberg6_run_ies"
Expand Down Expand Up @@ -627,11 +629,19 @@ def ends_run_freyberg_dsi(tmp_d, nst=False, nst_extrap=None, ztz=False, energy=1
pst.pestpp_options["overdue_giveup_fac"] = 100000000
pst.write(os.path.join(t_d,"dsi.pst"),version=2)
#pyemu.os_utils.run("pestpp-ies dsi.pst",cwd=t_d)

pvals = pd.read_csv(os.path.join(t_d,"dsi_pars.csv"),index_col=0)
pmat = np.load(os.path.join(t_d,"dsi_proj_mat.npy"))
ovals = pd.read_csv(os.path.join(t_d,"dsi_pr_mean.csv"),index_col=0)


m_d = t_d.replace("template","master")
port = _get_port()
pyemu.os_utils.start_workers(t_d, ies_exe_path,"dsi.pst",
worker_root=tmp_d,
master_dir=m_d, num_workers=10, port=port)
master_dir=m_d, num_workers=10, port=port,
ppw_function=pyemu.helpers.dsi_pyworker,
ppw_kwargs={"pmat":pmat,"ovals":ovals,"pvals":pvals})
#read in the results
oe = pyemu.ObservationEnsemble.from_csv(pst=pst, filename=os.path.join(m_d,"dsi.0.obs.csv"))
assert oe.shape[0]==50, f"{50-oe.shape[0]} failed runs"
Expand Down Expand Up @@ -741,8 +751,8 @@ def dsi_normscoretransform_test():

if __name__ == "__main__":
#dsi_normscoretransform_test()
ends_freyberg_test("temp")
#ends_freyberg_dsi_test("temp")
#ends_freyberg_test("temp")
ends_freyberg_dsi_test("temp")
#ends_freyberg_dev()
#ends_freyberg_dsi_test("temp")
#plot_freyberg_dsi()
Expand Down
2 changes: 1 addition & 1 deletion pyemu/eds.py
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,7 @@ def dsi_forward_run():
vals[np.where(log_trans==1)] = 10**vals[np.where(log_trans==1)]
vals-= offset
sim_vals.loc[:,'mn'] = vals
print(sim_vals)
#print(sim_vals)
sim_vals.to_csv("dsi_sim_vals.csv")

self.logger.log("test run")
Expand Down
112 changes: 112 additions & 0 deletions pyemu/utils/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4405,6 +4405,118 @@ def gpr_forward_run():
mdf.loc[:,["output_name","sim","sim_std"]].to_csv("gpr_output.csv",index=False)
return mdf


def dsi_forward_run(pmat=None,ovals=None,pvals=None,
                    write_csv=True
                    ):
    """Run the DSI (data-space inversion) emulator forward once.

    Simulated observation values are reconstructed as the prior mean plus a
    linear projection of the DSI parameter vector.  Any normal-score
    back-transform, log10 transform, and/or offset that were applied when the
    emulator was constructed are then reversed (each is applied only if its
    corresponding side-car file is present in the current directory).

    Parameters
    ----------
    pmat : numpy.ndarray, optional
        projection matrix (nobs x npar).  If None, loaded from
        "dsi_proj_mat.npy".
    ovals : pandas.DataFrame, optional
        prior-mean observation values with an 'mn' column, indexed by
        observation name.  If None, loaded from "dsi_pr_mean.csv".
    pvals : pandas.DataFrame, optional
        DSI parameter values.  If None, loaded from "dsi_pars.csv".
    write_csv : bool
        if True, also write the results to "dsi_sim_vals.csv".
        Default is True.

    Returns
    -------
    pandas.DataFrame
        simulated observation values (column 'mn'), indexed like `ovals`.
    """

    # fall back to the standard DSI file names for anything not passed in
    if pvals is None:
        pvals = pd.read_csv("dsi_pars.csv",index_col=0)
    if pmat is None:
        pmat = np.load("dsi_proj_mat.npy")
    if ovals is None:
        ovals = pd.read_csv("dsi_pr_mean.csv",index_col=0)

    # optional transform info: each file exists only if the corresponding
    # transform was used when the emulator was built.  Catch only
    # FileNotFoundError so real load errors are not silently swallowed.
    try:
        offset = np.load("dsi_obs_offset.npy")
    except FileNotFoundError:
        # no offset file found, assume no offset
        offset = np.zeros(ovals.shape[0])
    try:
        log_trans = np.load("dsi_obs_log.npy")
    except FileNotFoundError:
        # no log-transform file found, assume no log-transform
        log_trans = np.zeros(ovals.shape[0])

    try:
        backtransformvals = np.load("dsi_obs_backtransformvals.npy")
        backtransformobsnmes = np.load("dsi_obs_backtransformobsnmes.npy",allow_pickle=True)
        backtransform = True
    except FileNotFoundError:
        # no back-transform file found, assume no back-transform
        backtransform = False

    # the emulator itself: project the parameter vector and add the prior mean
    sim_vals = ovals + np.dot(pmat,pvals.values)

    if backtransform:
        # undo the normal-score transform for each flagged observation
        obsnmes = np.unique(backtransformobsnmes)
        back_vals = [
            inverse_normal_score_transform(
                backtransformvals[np.where(backtransformobsnmes==o)][:,1],
                backtransformvals[np.where(backtransformobsnmes==o)][:,0],
                sim_vals.loc[o].mn,
                extrap=None
            )[0]
            for o in obsnmes
        ]
        sim_vals.loc[obsnmes,'mn'] = back_vals

    # reverse the offset and log-transform
    assert log_trans.shape[0] == sim_vals.mn.values.shape[0], f"log transform shape mismatch: {log_trans.shape[0]},{sim_vals.mn.values.shape[0]}"
    assert offset.shape[0] == sim_vals.mn.values.shape[0], f"offset transform shape mismatch: {offset.shape[0]},{sim_vals.mn.values.shape[0]}"
    vals = sim_vals.mn.values
    vals[np.where(log_trans==1)] = 10**vals[np.where(log_trans==1)]
    vals -= offset
    sim_vals.loc[:,'mn'] = vals
    if write_csv:
        sim_vals.to_csv("dsi_sim_vals.csv")
    return sim_vals


def dsi_pyworker(pst,host,port,pmat=None,ovals=None,pvals=None):
    """A PyPestWorker function that runs the DSI emulator in memory.

    Connects a `PyPestWorker` to a PEST++ master and, for each parameter
    set received, evaluates `dsi_forward_run()` directly (no filesystem
    round trip) and sends the emulated observation values back.  Intended
    to be passed as `ppw_function` to `pyemu.os_utils.start_workers()`.

    Parameters
    ----------
    pst : pyemu.Pst or str
        the control file (or its path) for the worker.
    host : str
        hostname of the PEST++ master.
    port : int
        port of the PEST++ master.
    pmat : numpy.ndarray, optional
        DSI projection matrix.  If None, loaded from "dsi_proj_mat.npy".
    ovals : pandas.DataFrame, optional
        prior-mean observation values.  If None, loaded from
        "dsi_pr_mean.csv".
    pvals : pandas.DataFrame, optional
        DSI parameter values.  If None, loaded from "dsi_pars.csv".

    Returns
    -------
    None
        returns when the master stops sending parameter sets.
    """

    import pandas as pd
    import numpy as np

    # if explicit args werent passed, get the default ones...
    if pvals is None:
        pvals = pd.read_csv("dsi_pars.csv",index_col=0)
    if pmat is None:
        pmat = np.load("dsi_proj_mat.npy")
    if ovals is None:
        ovals = pd.read_csv("dsi_pr_mean.csv",index_col=0)

    ppw = PyPestWorker(pst,host,port,verbose=False)

    # we can only get parameters once the worker has initialized and
    # is ready to run, so getting the first set of pars here
    # essentially blocks until the worker is ready
    parameters = ppw.get_parameters()
    # if its None, the master already quit...
    if parameters is None:
        return

    obs = ppw._pst.observation_data.copy()
    # align the obsval series with the order the master expects
    obs = obs.loc[ppw.obs_names,"obsval"]

    while True:
        # map the current par values in parameters into the
        # df needed to run the emulator
        pvals.parval1 = parameters.loc[pvals.index]
        # do the emulation (in memory - no csv written)
        simdf = dsi_forward_run(pmat=pmat,ovals=ovals,pvals=pvals,write_csv=False)

        # replace the emulated quantities in the obs series
        obs.loc[simdf.index] = simdf.mn.values

        # send the obs series to the master
        ppw.send_observations(obs.values)

        # try to get more pars
        parameters = ppw.get_parameters()
        # if None, we are done
        if parameters is None:
            break


def randrealgen_optimized(nreal, tol=1e-7, max_samples=1000000):
"""
Generate a set of random realizations with a normal distribution.
Expand Down

0 comments on commit 83a6f04

Please sign in to comment.