Merge branch 'improvement.runtime.class' into 'main'
Introduce Runtime Environments

See merge request flexi/codes/relexi!25
m-kurz committed Jul 24, 2024
2 parents de4ca88 + 03f574a commit 46cba2e
Showing 17 changed files with 1,142 additions and 531 deletions.
7 changes: 2 additions & 5 deletions examples/HIT_24_DOF/prm.yaml
@@ -14,8 +14,6 @@ environment:
executable_path: ../../../flexi-extensions/build/bin/flexi
parameter_file: ./simulation_files/parameter_flexi.ini
mesh_file: ./simulation_files/CART_HEX_PERIODIC_004_mesh.h5
#local_dir: /var/tmp
mpi_launch_mpmd: False
env_launcher: local

# All parameters for setting up the reward
@@ -64,6 +62,5 @@ performance:
# SmartSim
smartsim:
smartsim_port: 6780
smartsim_num_dbs: 1
smartsim_launcher: pbs
smartsim_orchestrator: pbs
smartsim_network_interface: local
smartsim_orchestrator: auto
7 changes: 2 additions & 5 deletions examples/HIT_32_DOF/prm.yaml
@@ -14,8 +14,6 @@ environment:
executable_path: ../../../flexi-extensions/build/bin/flexi
parameter_file: ./simulation_files/parameter_flexi.ini
mesh_file: ./simulation_files/CART_HEX_PERIODIC_004_mesh.h5
#local_dir: /var/tmp
mpi_launch_mpmd: False
env_launcher: local

# All parameters for setting up the reward
@@ -64,6 +62,5 @@ performance:
# SmartSim
smartsim:
smartsim_port: 6780
smartsim_num_dbs: 1
smartsim_launcher: pbs
smartsim_orchestrator: pbs
smartsim_network_interface: local
smartsim_orchestrator: auto
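
Both example configurations drop the MPMD and PBS-specific settings; the orchestrator type is now set to auto, presumably letting SmartSim detect the scheduler itself. A minimal sketch of how the updated smartsim block parses (illustrative only, using the pyyaml package already listed in requirements.txt):

import yaml

cfg = yaml.safe_load("""
smartsim:
  smartsim_port: 6780
  smartsim_network_interface: local
  smartsim_orchestrator: auto
""")

# 'auto' replaces the previously hard-coded 'pbs' launcher/orchestrator settings.
assert cfg['smartsim']['smartsim_orchestrator'] == 'auto'
assert cfg['smartsim']['smartsim_network_interface'] == 'local'
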
3 changes: 2 additions & 1 deletion requirements.txt
@@ -1,10 +1,11 @@
smartsim>=0.4,<0.7
smartredis
tensorflow>=2.9,<2.16
tensorflow>=2.15,<2.16
tf-agents
cmake
pyyaml
matplotlib
pdoc
pytest
pytest-cov
setuptools!=70.*
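
The TensorFlow pin tightens from >=2.9,<2.16 to >=2.15,<2.16 (effectively 2.15.x only), and setuptools 70.* is excluded. A quick post-install sanity check (illustrative only):

import tensorflow as tf

# The updated pin above allows only TensorFlow 2.15.x.
assert tf.__version__.startswith('2.15'), f'unexpected TensorFlow version: {tf.__version__}'
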
207 changes: 54 additions & 153 deletions src/relexi/env/flexiEnvSmartSim.py
@@ -38,7 +38,7 @@ class flexiEnv(py_environment.PyEnvironment):
"""

def __init__(self,
exp,
runtime,
flexi_path,
prm_file,
spectra_file,
@@ -47,28 +47,19 @@ def __init__(self,
reward_scale,
n_procs=1,
n_envs=1,
n_procs_per_node=1,
restart_files=None,
random_restart_file=True,
entry_db="127.0.0.1",
port=6780,
is_db_cluster=False,
debug=0,
tag=None,
hosts=None,
rankfiles=None,
mpi_launch_mpmd=False,
env_launcher='mpirun'
):
"""Initialize TF and FLEXI specific properties."""
# Path to FLEXI executable
self.n_envs = n_envs
self.n_procs = n_procs
self.n_procs_per_node = n_procs_per_node
self.prm_file = prm_file
self.flexi_path = flexi_path
self.hosts = hosts
self.rankfiles = rankfiles

# Save values for reward function
self.reward_kmin = reward_kmin
@@ -80,9 +71,8 @@ def __init__(self,
# Sanity Check Launcher
self.env_launcher = env_launcher
if ((self.env_launcher == 'local') and (n_procs != 1)):
rlxout.warning("For env_launcher 'local', only single execution is allowed! Setting 'n_procs=1'!")
rlxout.warning("To run evironments in parallel with MPI, use env_launcher='mpi'!")
n_procs = 1
rlxout.warning("For env_launcher 'local', only single execution is allowed! Setting 'n_procs=1'")
self.n_procs = 1

if self.env_launcher == 'mpirun':
self.mpi_launch_mpmd = mpi_launch_mpmd
@@ -95,38 +85,30 @@ def __init__(self,

# Read target DNS spectra from file
if spectra_file:
with open(spectra_file, 'r', encoding='ascii') as csvfile:
with open(spectra_file, 'r', encoding='utf-8') as csvfile:
reader = csv.reader(csvfile, delimiter=',')
col_e = next(reader).index('E')
e = []
for rows in reader:
e.append(float(rows[col_e]))
self.e_dns = e

# Get experiment handle and port of db
self.exp = exp
self.port = port
# Should be IP address not hostname, since "-.,'" in hostname will cause a crash
self.entry_db = entry_db
self.is_db_cluster = is_db_cluster
# Get runtime environment
self.runtime = runtime

# Connect python redis client to an orchestrator database
self.client = Client(address=f"{self.entry_db}:{str(self.port)}", cluster=self.is_db_cluster)
self.client = Client(address=self.runtime.db_entry, cluster=False)

# Build tag from tag plus env number
if tag:
self.tag = [tag+str(i)+'_' for i in range(self.n_envs)]
self.tag = [f'{tag}{i:03d}_' for i in range(self.n_envs)]
else:
self.tag = None

# Startup FLEXI instances inside experiment to get state size
self.flexi = self._start_flexi(self.exp, self.n_procs, self.n_envs)

# Get current state from FLEXI environment
# Startup FLEXI instances to get state size
self.flexi = self._start_flexi()
self._state = self._get_current_state()

# End FLEXI again. Otherwise it will compute the entire run...
self._end_flexi()
self._stop_flexi()

# Specify action and observation dimensions (neglect first batch dimension)
self._action_spec = array_spec.ArraySpec(
@@ -141,6 +123,16 @@ def __del__(self):
"""Finalize launched FLEXI instances if deleted."""
self.stop()

def stop(self):
"""Stops all flexi instances inside launched in this environment."""
if self.flexi:
self._stop_flexi()

def start(self):
"""Starts all flexi instances with configuration specified in initialization."""
self.flexi = self._start_flexi()
self._state = self._get_current_state()

@property
def batched(self):
"""Override batched property to indicate that environment is batched."""
@@ -151,128 +143,45 @@ def batch_size(self):
"""Override batch size property according to chosen batch size."""
return self.n_envs

def stop(self):
"""Stops all flexi instances inside launched in this environment."""
if self.exp:
self._stop_flexi_instances(self.exp)

def start(self):
"""Starts all flexi instances with configuration specified in initialization."""
# Start new FLEXI instance and get initial state
self.flexi = self._start_flexi(self.exp, self.n_procs, self.n_envs)
self._state = self._get_current_state()

def _stop_flexi_instances(self, exp):
"""Stop all FLEXI instances.
Uses the current SmartSim experiment to loop over all FLEXI instances
and stop them if they have not finished yet.
Args:
exp (smartsim.Experiment): Experiment in which the Orchestrator and
the FLEXI instances were launched.
Returns:
None
TODO:
* `exp` should be a property of the class.
"""
if self.flexi:
for flexi in self.flexi:
if not exp.finished(flexi):
exp.stop(flexi)

def _start_flexi(self, exp, n_procs, n_envs):
"""Start FLEXI instances within SmartSim experiment.
def _start_flexi(self):
"""Start FLEXI instances within runtime environment.
Args:
exp (smartsim.Experiment): Experiment in which the Orchestrator
was launched.
n_procs (int): Number of processors used to run each simulation
environment.
n_envs (int): Number of environments to be launched.
Returns:
List of `smartsim` handles for each started FLEXI environment.
"""

# Build hostlist to specify on which hosts each flexi is started
# First check: Are there enough free ranks?
ranks_avail = self.n_procs_per_node*len(self.hosts)
ranks_needed = n_envs*n_procs
if ranks_needed > ranks_avail:
rlxout.warning(f'Only {ranks_avail} ranks are available, but {ranks_needed} would be required '+
'to start {n_envs} on {n_procs} each.')

# Distribute ranks to instances in a round robin fashion
# TODO: Get ranks directly from hostfile for PBS Orchestrator
hosts_per_flexi = np.zeros((n_envs, 2), dtype=np.int8)
n_cores_used = 0
for i in range(n_envs):
# 1. Get first node the instance has ranks on
hosts_per_flexi[i, 0] = n_cores_used // self.n_procs_per_node
# 2. Increase amount of used cores accordingly
n_cores_used = n_cores_used + n_procs
# 3. Get last node the instance has ranks on
hosts_per_flexi[i,1] = (n_cores_used-1) // self.n_procs_per_node # last node

flexi = []
# Build list of individual FLEXI instances
for i in range(n_envs):

exe_args = []
exe_name = []
# Build list of arguments for each executable
for i in range(self.n_envs):
# First argument is parameter file
exe_args.append([self.prm_file])
# Select (possibly random drawn) restart file for the run
if self.random_restart_file:
restart_file = random.choice(self.restart_files)
exe_args[i].append(random.choice(self.restart_files))
else:
restart_file = self.restart_files[0]

args = [self.prm_file, restart_file]
exe_args[i].append(self.restart_files[0])
# Tags are given to FLEXI with the Syntax "--tag [value]"
if self.tag[i]:
# Tags are given to FLEXI with the Syntax "--tag [value]"
args.append('--tag')
args.append(self.tag[i])

if self.env_launcher == 'mpirun':
run_args = {
"rankfile": self.rankfiles[i],
"report-bindings": ""
}
run = MpirunSettings(
exe=self.flexi_path,
exe_args=args,
run_args=run_args)
run.set_tasks(n_procs)

# Create MPMD Settings and start later in single command
if self.mpi_launch_mpmd:
if i == 0:
f_mpmd = run
else:
f_mpmd.make_mpmd(run)

else: # Otherwise do not use launcher
run = RunSettings(exe=self.flexi_path, exe_args=args)

# Create and directly start FLEXI instances
if not self.mpi_launch_mpmd:
flexi_instance = exp.create_model(self.tag[i]+"flexi", run)
exp.start(flexi_instance, block=False, summary=False)
flexi.append(flexi_instance)

# Create MPMD Model from settings and start
if self.mpi_launch_mpmd:
flexi = exp.create_model(self.tag[0]+"flexi", f_mpmd)
exp.start(flexi, block=False, summary=False)
flexi = [flexi]

return flexi

def _end_flexi(self):
"""Stop FLEXI experiment with SmartSim."""
exe_args[i].append('--tag')
exe_args[i].append(self.tag[i])
# And create name of executable
exe_name.append(self.tag[i]+'flexi')

# Launch executables in runtime
return self.runtime.launch_models(
self.flexi_path,
exe_args,
exe_name,
self.n_procs,
self.n_envs,
launcher=self.env_launcher
)

def _stop_flexi(self):
"""Stop all FLEXI instances currently running."""
for flexi in self.flexi:
if not self.exp.finished(flexi):
self.exp.stop(flexi)
if not self.runtime.exp.finished(flexi):
self.runtime.exp.stop(flexi)
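
The hostlist, rankfile and MPMD bookkeeping removed above now lives in the new runtime class introduced by this merge request, which is not shown in this excerpt. The environment only relies on runtime.db_entry, runtime.exp and runtime.launch_models(...). A rough sketch of that interface, with names and behaviour inferred from the call sites here — an assumption, not the actual implementation:

from smartsim.settings import MpirunSettings, RunSettings

class Runtime:
    """Assumed shape of the new runtime class (sketch only)."""

    def __init__(self, exp, db_entry):
        self.exp = exp            # smartsim.Experiment used to create and start models
        self.db_entry = db_entry  # 'host:port' address of the orchestrator database

    def launch_models(self, exe, exe_args, exe_names, n_procs, n_envs, launcher='mpirun'):
        """Launch n_envs instances of exe and return their SmartSim handles."""
        models = []
        for i in range(n_envs):
            if launcher == 'mpirun':
                run = MpirunSettings(exe=exe, exe_args=exe_args[i])
                run.set_tasks(n_procs)
            else:  # 'local': run the executable directly without an MPI launcher
                run = RunSettings(exe=exe, exe_args=exe_args[i])
            model = self.exp.create_model(exe_names[i], run)
            self.exp.start(model, block=False, summary=False)
            models.append(model)
        return models
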

def _reset(self):
"""Resets the FLEXI environment.
Expand All @@ -287,15 +196,7 @@ def _reset(self):
functions "start()" and "stop()" manually. This function is thus
deprecated.
"""

# Close FLEXI instance
# self._end_flexi()
self._episode_ended = False

# Start new FLEXI instance and get initial state
# self.flexi = self._start_flexi(self.exp,self.n_procs,self.n_envs)
# self._state = self._get_current_state()

return ts.restart(self._state, batch_size=self.n_envs)
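
Since _reset() no longer restarts the FLEXI instances itself, a training driver is expected to bracket each set of episodes with the public start() and stop() methods shown further up. A minimal usage sketch, assuming env is an already constructed flexiEnv:

env.start()              # launch fresh FLEXI instances and pull the initial state
time_step = env.reset()  # only returns ts.restart(...); the running instances are untouched
# ... collect one iteration of training episodes with a tf-agents driver here ...
env.stop()               # terminate the FLEXI instances of this iteration
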

def _step(self, action):
@@ -311,9 +212,9 @@ def _step(self, action):
Returns:
Transition containing (state, reward, discount)
."""
"""
if self._episode_ended:
# The last action ended the episode. Ignore the current action and start a new episode.
# The last action ended the episode. Ignore the current action and start new one.
return self.reset()

# Update Prediction
@@ -370,7 +271,7 @@ def _get_current_state(self):
do_init = True
key = "state"
for tag in self.tag:
self.client.poll_tensor(tag+key, 10, 10000)
self.client.poll_tensor(tag+key, 10, 1000)
try:
data = self.client.get_tensor(tag+key)
except Exception:
4 changes: 2 additions & 2 deletions src/relexi/io/readin.py
@@ -20,7 +20,7 @@ def read_config(file_in, flatten=True):
Returns:
dict: Dictionary containing the contents of the file
"""
with open(file_in, 'r', encoding='ascii') as stream:
with open(file_in, 'r', encoding='utf-8') as stream:
config = yaml.safe_load(stream)

if flatten:
@@ -83,7 +83,7 @@ def read_file(filename, newline=None):
Returns:
str: Returns single string with the content of the file.
"""
with open(filename, 'r', encoding='ascii') as myfile:
with open(filename, 'r', encoding='utf-8') as myfile:
data = myfile.read()
if newline:
return data.replace('\n', newline)
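
With the encoding relaxed from ASCII to UTF-8, configuration and parameter files may now contain non-ASCII characters (e.g. in comments). A short usage sketch, assuming the module is importable as relexi.io.readin and using placeholder file names:

from relexi.io.readin import read_config, read_file

config = read_config('prm.yaml')                            # parsed YAML as a dict (flattened by default)
prm_text = read_file('parameter_flexi.ini')                 # whole file as a single string
prm_line = read_file('parameter_flexi.ini', newline='; ')   # newlines replaced by '; '
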