diff --git a/examples/HIT_24_DOF/prm.yaml b/examples/HIT_24_DOF/prm.yaml index 0124dbd..bffbfeb 100644 --- a/examples/HIT_24_DOF/prm.yaml +++ b/examples/HIT_24_DOF/prm.yaml @@ -14,8 +14,6 @@ environment: executable_path: ../../../flexi-extensions/build/bin/flexi parameter_file: ./simulation_files/parameter_flexi.ini mesh_file: ./simulation_files/CART_HEX_PERIODIC_004_mesh.h5 - #local_dir: /var/tmp - mpi_launch_mpmd: False env_launcher: local # All parameters for setting up the reward @@ -64,6 +62,5 @@ performance: # SmartSim smartsim: smartsim_port: 6780 - smartsim_num_dbs: 1 - smartsim_launcher: pbs - smartsim_orchestrator: pbs + smartsim_network_interface: local + smartsim_orchestrator: auto diff --git a/examples/HIT_32_DOF/prm.yaml b/examples/HIT_32_DOF/prm.yaml index 70c3368..ee16652 100644 --- a/examples/HIT_32_DOF/prm.yaml +++ b/examples/HIT_32_DOF/prm.yaml @@ -14,8 +14,6 @@ environment: executable_path: ../../../flexi-extensions/build/bin/flexi parameter_file: ./simulation_files/parameter_flexi.ini mesh_file: ./simulation_files/CART_HEX_PERIODIC_004_mesh.h5 - #local_dir: /var/tmp - mpi_launch_mpmd: False env_launcher: local # All parameters for setting up the reward @@ -64,6 +62,5 @@ performance: # SmartSim smartsim: smartsim_port: 6780 - smartsim_num_dbs: 1 - smartsim_launcher: pbs - smartsim_orchestrator: pbs + smartsim_network_interface: local + smartsim_orchestrator: auto diff --git a/requirements.txt b/requirements.txt index 4c6712e..8363819 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ smartsim>=0.4,<0.7 smartredis -tensorflow>=2.9,<2.16 +tensorflow>=2.15,<2.16 tf-agents cmake pyyaml @@ -8,3 +8,4 @@ matplotlib pdoc pytest pytest-cov +setuptools!=70.* diff --git a/src/relexi/env/flexiEnvSmartSim.py b/src/relexi/env/flexiEnvSmartSim.py index 7b0e1df..351dbaf 100644 --- a/src/relexi/env/flexiEnvSmartSim.py +++ b/src/relexi/env/flexiEnvSmartSim.py @@ -38,7 +38,7 @@ class flexiEnv(py_environment.PyEnvironment): """ def __init__(self, - exp, + runtime, flexi_path, prm_file, spectra_file, @@ -47,16 +47,10 @@ def __init__(self, reward_scale, n_procs=1, n_envs=1, - n_procs_per_node=1, restart_files=None, random_restart_file=True, - entry_db="127.0.0.1", - port=6780, - is_db_cluster=False, debug=0, tag=None, - hosts=None, - rankfiles=None, mpi_launch_mpmd=False, env_launcher='mpirun' ): @@ -64,11 +58,8 @@ def __init__(self, # Path to FLEXI executable self.n_envs = n_envs self.n_procs = n_procs - self.n_procs_per_node = n_procs_per_node self.prm_file = prm_file self.flexi_path = flexi_path - self.hosts = hosts - self.rankfiles = rankfiles # Save values for reward function self.reward_kmin = reward_kmin @@ -80,9 +71,8 @@ def __init__(self, # Sanity Check Launcher self.env_launcher = env_launcher if ((self.env_launcher == 'local') and (n_procs != 1)): - rlxout.warning("For env_launcher 'local', only single execution is allowed! Setting 'n_procs=1'!") - rlxout.warning("To run evironments in parallel with MPI, use env_launcher='mpi'!") - n_procs = 1 + rlxout.warning("For env_launcher 'local', only single execution is allowed! 
Setting 'n_procs=1'") + self.n_procs = 1 if self.env_launcher == 'mpirun': self.mpi_launch_mpmd = mpi_launch_mpmd @@ -95,7 +85,7 @@ def __init__(self, # Read target DNS spectra from file if spectra_file: - with open(spectra_file, 'r', encoding='ascii') as csvfile: + with open(spectra_file, 'r', encoding='utf-8') as csvfile: reader = csv.reader(csvfile, delimiter=',') col_e = next(reader).index('E') e = [] @@ -103,30 +93,22 @@ def __init__(self, e.append(float(rows[col_e])) self.e_dns = e - # Get experiment handle and port of db - self.exp = exp - self.port = port - # Should be IP address not hostname, since "-.,'" in hostname will cause a crash - self.entry_db = entry_db - self.is_db_cluster = is_db_cluster + # Get runtime environment + self.runtime = runtime # Connect python redis client to an orchestrator database - self.client = Client(address=f"{self.entry_db}:{str(self.port)}", cluster=self.is_db_cluster) + self.client = Client(address=self.runtime.db_entry, cluster=False) # Build tag from tag plus env number if tag: - self.tag = [tag+str(i)+'_' for i in range(self.n_envs)] + self.tag = [f'{tag}{i:03d}_' for i in range(self.n_envs)] else: self.tag = None - # Startup FLEXI instances inside experiment to get state size - self.flexi = self._start_flexi(self.exp, self.n_procs, self.n_envs) - - # Get current state from FLEXI environment + # Startup FLEXI instances to get state size + self.flexi = self._start_flexi() self._state = self._get_current_state() - - # End FLEXI again. Otherwise it will compute the entire run... - self._end_flexi() + self._stop_flexi() # Specify action and observation dimensions (neglect first batch dimension) self._action_spec = array_spec.ArraySpec( @@ -141,6 +123,16 @@ def __del__(self): """Finalize launched FLEXI instances if deleted.""" self.stop() + def stop(self): + """Stops all flexi instances inside launched in this environment.""" + if self.flexi: + self._stop_flexi() + + def start(self): + """Starts all flexi instances with configuration specified in initialization.""" + self.flexi = self._start_flexi() + self._state = self._get_current_state() + @property def batched(self): """Override batched property to indicate that environment is batched.""" @@ -151,128 +143,45 @@ def batch_size(self): """Override batch size property according to chosen batch size.""" return self.n_envs - def stop(self): - """Stops all flexi instances inside launched in this environment.""" - if self.exp: - self._stop_flexi_instances(self.exp) - - def start(self): - """Starts all flexi instances with configuration specified in initialization.""" - # Start new FLEXI instance and get initial state - self.flexi = self._start_flexi(self.exp, self.n_procs, self.n_envs) - self._state = self._get_current_state() - - def _stop_flexi_instances(self, exp): - """Stop all FLEXI instances. - - Uses the current SmartSim experiment to loop over all FLEXI instances - and stop them if they have not finished yet. - - Args: - exp (smartsim.Experiment): Experiment in which the Orchestrator and - the FLEXI instances were launched. - - Returns: - None - - TODO: - * `exp` should be a property of the class. - """ - if self.flexi: - for flexi in self.flexi: - if not exp.finished(flexi): - exp.stop(flexi) - - def _start_flexi(self, exp, n_procs, n_envs): - """Start FLEXI instances within SmartSim experiment. + def _start_flexi(self): + """Start FLEXI instances within runtime environment. - Args: - exp (smartsim.Experiment): Experiment in which the Orchestrator - was launched. 
- n_procs (int): Number of processors used to run each simulation - environment. - n_envs (int): Number of environments to be launched. - Returns: List of `smartsim` handles for each started FLEXI environment. """ - - # Build hostlist to specify on which hosts each flexi is started - # First check: Are there enough free ranks? - ranks_avail = self.n_procs_per_node*len(self.hosts) - ranks_needed = n_envs*n_procs - if ranks_needed > ranks_avail: - rlxout.warning(f'Only {ranks_avail} ranks are available, but {ranks_needed} would be required '+ - 'to start {n_envs} on {n_procs} each.') - - # Distribute ranks to instances in a round robin fashion - # TODO: Get ranks directly from hostfile for PBS Orchestrator - hosts_per_flexi = np.zeros((n_envs, 2), dtype=np.int8) - n_cores_used = 0 - for i in range(n_envs): - # 1. Get first node the instance has ranks on - hosts_per_flexi[i, 0] = n_cores_used // self.n_procs_per_node - # 2. Increase amount of used cores accordingly - n_cores_used = n_cores_used + n_procs - # 3. Get last node the instance has ranks on - hosts_per_flexi[i,1] = (n_cores_used-1) // self.n_procs_per_node # last node - - flexi = [] - # Build list of individual FLEXI instances - for i in range(n_envs): - + exe_args = [] + exe_name = [] + # Build list of arguments for each executable + for i in range(self.n_envs): + # First argument is parameter file + exe_args.append([self.prm_file]) # Select (possibly random drawn) restart file for the run if self.random_restart_file: - restart_file = random.choice(self.restart_files) + exe_args[i].append(random.choice(self.restart_files)) else: - restart_file = self.restart_files[0] - - args = [self.prm_file, restart_file] + exe_args[i].append(self.restart_files[0]) + # Tags are given to FLEXI with the Syntax "--tag [value]" if self.tag[i]: - # Tags are given to FLEXI with the Syntax "--tag [value]" - args.append('--tag') - args.append(self.tag[i]) - - if self.env_launcher == 'mpirun': - run_args = { - "rankfile": self.rankfiles[i], - "report-bindings": "" - } - run = MpirunSettings( - exe=self.flexi_path, - exe_args=args, - run_args=run_args) - run.set_tasks(n_procs) - - # Create MPMD Settings and start later in single command - if self.mpi_launch_mpmd: - if i == 0: - f_mpmd = run - else: - f_mpmd.make_mpmd(run) - - else: # Otherwise do not use launcher - run = RunSettings(exe=self.flexi_path, exe_args=args) - - # Create and directly start FLEXI instances - if not self.mpi_launch_mpmd: - flexi_instance = exp.create_model(self.tag[i]+"flexi", run) - exp.start(flexi_instance, block=False, summary=False) - flexi.append(flexi_instance) - - # Create MPMD Model from settings and start - if self.mpi_launch_mpmd: - flexi = exp.create_model(self.tag[0]+"flexi", f_mpmd) - exp.start(flexi, block=False, summary=False) - flexi = [flexi] - - return flexi - - def _end_flexi(self): - """Stop FLEXI experiment with SmartSim.""" + exe_args[i].append('--tag') + exe_args[i].append(self.tag[i]) + # And create name of executable + exe_name.append(self.tag[i]+'flexi') + + # Launch executables in runtime + return self.runtime.launch_models( + self.flexi_path, + exe_args, + exe_name, + self.n_procs, + self.n_envs, + launcher=self.env_launcher + ) + + def _stop_flexi(self): + """Stop all FLEXI instances currently running.""" for flexi in self.flexi: - if not self.exp.finished(flexi): - self.exp.stop(flexi) + if not self.runtime.exp.finished(flexi): + self.runtime.exp.stop(flexi) def _reset(self): """Resets the FLEXI environment. 
@@ -287,15 +196,7 @@ def _reset(self): functions "start()" and "stop()" manually. This function is thus deprecated. """ - - # Close FLEXI instance - # self._end_flexi() self._episode_ended = False - - # Start new FLEXI instance and get initial state - # self.flexi = self._start_flexi(self.exp,self.n_procs,self.n_envs) - # self._state = self._get_current_state() - return ts.restart(self._state, batch_size=self.n_envs) def _step(self, action): @@ -311,9 +212,9 @@ def _step(self, action): Returns: Transition containing (state, reward, discount) - .""" + """ if self._episode_ended: - # The last action ended the episode. Ignore the current action and start a new episode. + # The last action ended the episode. Ignore the current action and start new one. return self.reset() # Update Prediction @@ -370,7 +271,7 @@ def _get_current_state(self): do_init = True key = "state" for tag in self.tag: - self.client.poll_tensor(tag+key, 10, 10000) + self.client.poll_tensor(tag+key, 10, 1000) try: data = self.client.get_tensor(tag+key) except Exception: diff --git a/src/relexi/io/readin.py b/src/relexi/io/readin.py index 6446327..efaf90c 100644 --- a/src/relexi/io/readin.py +++ b/src/relexi/io/readin.py @@ -20,7 +20,7 @@ def read_config(file_in, flatten=True): Returns: dict: Dictionary containing the contents of the file """ - with open(file_in, 'r', encoding='ascii') as stream: + with open(file_in, 'r', encoding='utf-8') as stream: config = yaml.safe_load(stream) if flatten: @@ -83,7 +83,7 @@ def read_file(filename, newline=None): Returns: str: Returns single string with the content of the file. """ - with open(filename, 'r', encoding='ascii') as myfile: + with open(filename, 'r', encoding='utf-8') as myfile: data = myfile.read() if newline: return data.replace('\n', newline) diff --git a/src/relexi/rl/ppo/train.py b/src/relexi/rl/ppo/train.py index d47be33..f2bacc7 100644 --- a/src/relexi/rl/ppo/train.py +++ b/src/relexi/rl/ppo/train.py @@ -21,10 +21,10 @@ import relexi.rl.models import relexi.rl.tf_helpers import relexi.env.flexiEnvSmartSim -import relexi.smartsim.init_smartsim +import relexi.runtime import relexi.io.readin as rlxin import relexi.io.output as rlxout -from relexi.smartsim.helpers import generate_rankfile_ompi, copy_to_nodes, parser_flexi_parameters +from relexi.runtime.helpers import copy_to_nodes, parser_flexi_parameters def train( config_file @@ -62,9 +62,9 @@ def train( config_file ,do_profile = False ,smartsim_port = 6780 ,smartsim_num_dbs = 1 - ,smartsim_launcher = "local" - ,smartsim_orchestrator = "local" - ,env_launcher = "mpirun" + ,smartsim_orchestrator = 'local' + ,smartsim_network_interface = 'local' + ,env_launcher = 'mpirun' ,mpi_launch_mpmd = False ,local_dir = None ,n_procs_per_node=128 # Hawk @@ -107,81 +107,55 @@ def train( config_file tf.config.optimizer.set_jit(True) # Initialize SmartSim - exp, worker_nodes, db, entry_db, is_db_cluster = relexi.smartsim.init_smartsim.init_smartsim(port = smartsim_port - ,num_dbs = smartsim_num_dbs - ,launcher_type = smartsim_launcher - ,orchestrator_type = smartsim_orchestrator - ) - - # generating rankfiles for OpenMPI - if mpi_launch_mpmd: - # If all MPI jobs are run with single mpirun command, all jobs are allocated based on single rankfile - rank_files = generate_rankfile_ompi(worker_nodes - ,n_procs_per_node - ,n_par_env=1 - ,ranks_per_env=num_parallel_environments*num_procs_per_environment - ) - - else: - # Otherwise every MPI job gets its own rankfile - rank_files = generate_rankfile_ompi(worker_nodes - ,n_procs_per_node - 
,num_parallel_environments - ,num_procs_per_environment - ) - - # Copy all local files into local directory, possibly fast RAM-Disk or similar - # for performance and to reduce Filesystem access - if local_dir: - # Prefix with PBS Job ID if PBS job - if smartsim_launcher.casefold() == 'pbs': - pbs_job_id = os.environ['PBS_JOBID'] - local_dir = os.path.join(local_dir, pbs_job_id) - - rlxout.info(f"Moving local files to {local_dir} ..." ) - - # Get list of all nodes - nodes = copy.deepcopy(worker_nodes) - ai_node = os.environ['HOSTNAME'] - nodes.insert(0, ai_node) - - # Move all files to local dir - # TODO: control which files are copied by 'local_files' variable! - train_files = copy_to_nodes(train_files, local_dir,nodes,subfolder='train_files') - eval_files = copy_to_nodes(eval_files, local_dir,nodes,subfolder='eval_files') - reward_spectrum_file = copy_to_nodes(reward_spectrum_file,local_dir,nodes,subfolder='reward_files') - rank_files = copy_to_nodes(rank_files, local_dir,nodes,subfolder='ompi_rank_files') - mesh_file = copy_to_nodes(mesh_file, local_dir,nodes,subfolder='ompi_rank_files') - - # We have to update the meshfile in the parameter file before copying - parameter_file = parser_flexi_parameters(parameter_file, 'MeshFile', mesh_file) - parameter_file = copy_to_nodes(parameter_file,local_dir,nodes,subfolder='parameter_files') - - rlxout.info(" DONE! ",newline=False) - - if mpi_launch_mpmd: - rank_files = [rank_files[0] for _ in range(num_parallel_environments)] + runtime = relexi.runtime.Runtime( + type_=smartsim_orchestrator, + db_port=smartsim_port, + db_network_interface=smartsim_network_interface, + ) + runtime.info() + + ## Copy all local files into local directory, possibly fast RAM-Disk or similar + ## for performance and to reduce Filesystem access + #if local_dir: + # # Prefix with PBS Job ID if PBS job + # if smartsim_launcher.casefold() == 'pbs': + # pbs_job_id = os.environ['PBS_JOBID'] + # local_dir = os.path.join(local_dir, pbs_job_id) + + # rlxout.info(f"Moving local files to {local_dir} ..." ) + + # # Get list of all nodes + # nodes = copy.deepcopy(runtime.workers) + # ai_node = os.environ['HOSTNAME'] + # nodes.insert(0, ai_node) + + # # Move all files to local dir + # # TODO: control which files are copied by 'local_files' variable! + # train_files = copy_to_nodes(train_files, local_dir,nodes,subfolder='train_files') + # eval_files = copy_to_nodes(eval_files, local_dir,nodes,subfolder='eval_files') + # reward_spectrum_file = copy_to_nodes(reward_spectrum_file,local_dir,nodes,subfolder='reward_files') + # mesh_file = copy_to_nodes(mesh_file, local_dir,nodes,subfolder='meshf_file') + + # # We have to update the meshfile in the parameter file before copying + # parameter_file = parser_flexi_parameters(parameter_file, 'MeshFile', mesh_file) + # parameter_file = copy_to_nodes(parameter_file,local_dir,nodes,subfolder='parameter_files') + + # rlxout.info(" DONE! 
",newline=False) # Instantiate parallel collection environment my_env = tf_py_environment.TFPyEnvironment( - relexi.env.flexiEnvSmartSim.flexiEnv(exp + relexi.env.flexiEnvSmartSim.flexiEnv(runtime ,executable_path ,parameter_file ,tag = 'train' - ,port = smartsim_port - ,entry_db = entry_db - ,is_db_cluster = is_db_cluster - ,hosts = worker_nodes ,n_envs = num_parallel_environments ,n_procs = num_procs_per_environment - ,n_procs_per_node = n_procs_per_node ,spectra_file = reward_spectrum_file ,reward_kmin = reward_kmin ,reward_kmax = reward_kmax ,reward_scale = reward_scale ,restart_files = train_files - ,rankfiles = rank_files ,env_launcher = env_launcher ,mpi_launch_mpmd = mpi_launch_mpmd ,debug = debug @@ -193,23 +167,17 @@ def train( config_file eval_files = train_files my_eval_env = tf_py_environment.TFPyEnvironment( - relexi.env.flexiEnvSmartSim.flexiEnv(exp + relexi.env.flexiEnvSmartSim.flexiEnv(runtime ,executable_path ,parameter_file ,tag = 'eval' - ,port = smartsim_port - ,entry_db = entry_db - ,is_db_cluster = is_db_cluster - ,hosts = worker_nodes ,n_procs = num_procs_per_environment - ,n_procs_per_node = n_procs_per_node ,spectra_file = reward_spectrum_file ,reward_kmin = reward_kmin ,reward_kmax = reward_kmax ,reward_scale = reward_scale ,restart_files = eval_files ,random_restart_file = False - ,rankfiles = rank_files ,env_launcher = env_launcher ,debug = debug )) @@ -391,5 +359,5 @@ def train( config_file del my_env del my_eval_env - exp.stop(db) + del runtime time.sleep(2.) # Wait for orchestrator to be properly closed diff --git a/src/relexi/rl/tf_helpers.py b/src/relexi/rl/tf_helpers.py index 89ee787..18e811b 100644 --- a/src/relexi/rl/tf_helpers.py +++ b/src/relexi/rl/tf_helpers.py @@ -4,7 +4,7 @@ import tensorflow as tf import relexi.io.output as rlxout -import relexi.smartsim.helpers +import relexi.runtime.helpers def write_metrics(metrics, step, category_name): @@ -101,7 +101,7 @@ def collect_trajectories(driver, env): # Stop FLEXI instances env.stop() # Cleanup OMP files - relexi.smartsim.helpers.clean_ompi_tmpfiles() + relexi.runtime.helpers.clean_ompi_tmpfiles() @tf.function diff --git a/src/relexi/runtime/__init__.py b/src/relexi/runtime/__init__.py new file mode 100644 index 0000000..c19c6fa --- /dev/null +++ b/src/relexi/runtime/__init__.py @@ -0,0 +1,23 @@ +"""Provides functionalities to create and manage an HPC runtime environment. + +This module provides the necessary functionalities to create and manage a +runtime environment on distributed HPC systems for distributed Reinforcement +Learning (RL) algorithms. The main class is `Runtime`, which is used to +identify the resources available on the system, create the necessary +environment variables, and run the given program. The `LaunchConfig` class +provides a configuration for launching a batch of executables in the runtime. +This include most importantly the distribution of the executables across the +available resources. The `helpers` module provides some helper functions to +facilitate the process of creating and managing the runtime environment. + +The public classes and functions are: + - `Runtime`: The main class to create and manage a runtime environment. + - `LaunchConfig`: A class to define the launch configuration for a batch of + executables in a runtime. + - `helpers`: A module with helper functions to facilitate the process. +""" +from .launch_configuration import LaunchConfig +from .runtime import Runtime +from . 
import helpers + +__all__ = ['Runtime', 'LaunchConfig', 'helpers'] diff --git a/src/relexi/smartsim/helpers.py b/src/relexi/runtime/helpers.py similarity index 65% rename from src/relexi/smartsim/helpers.py rename to src/relexi/runtime/helpers.py index 840b2ae..f6b2acb 100644 --- a/src/relexi/smartsim/helpers.py +++ b/src/relexi/runtime/helpers.py @@ -14,46 +14,6 @@ import shutil -def generate_rankfile_ompi(hosts, cores_per_node, n_par_env, ranks_per_env, base_path=None): - """Generate rank file for OpenMPI process binding. - - Args: - hosts (list): List of hostnames - cores_per_node (int): Number of cores per node - n_par_env (int): Number of parallel environments to be launched - ranks_per_env (int): Number of ranks per environments - base_path (str): (Optional.) Path to the directory of the rank files - - Returns: - list: List of filenames of the rankfiles - """ - - # If no base_path given, use CWD - if base_path: - rankfile_dir = os.path.join(base_path, "ompi-rankfiles") - else: - rankfile_dir = "ompi-rankfiles" - - if os.path.exists(rankfile_dir): - shutil.rmtree(rankfile_dir) - os.makedirs(rankfile_dir, exist_ok=True) - - rankfiles = [] - next_free_slot = 0 - n_cores_used = 0 - for env_idx in range(n_par_env): - filename = os.path.join(rankfile_dir, f"par_env_{env_idx:05d}") - rankfiles.append(filename) - with open(filename, 'w', encoding='ascii') as rankfile: - for i in range(ranks_per_env): - rankfile.write(f"rank {i}={hosts[n_cores_used//cores_per_node]} slot={next_free_slot}\n") - next_free_slot = next_free_slot + 1 - n_cores_used = n_cores_used + 1 - if next_free_slot > (cores_per_node - 1): - next_free_slot = 0 - return rankfiles - - def parser_flexi_parameters(parameter_file, keyword, value): """Changes the value for a keyword in a FLEXI parameter file. @@ -70,20 +30,20 @@ def parser_flexi_parameters(parameter_file, keyword, value): Returns: str: Path to new (modified) parameter file """ - pattern = re.compile(fr"({keyword})\s*=.*", re.IGNORECASE) - subst = keyword + "=" + value + pattern = re.compile(fr'({keyword})\s*=.*', re.IGNORECASE) + subst = keyword + '=' + value parameter_file_in = parameter_file pbs_job_id = os.environ['PBS_JOBID'] - parameter_file_out = f"parameter_flexi-{pbs_job_id[0:7]}.ini" + parameter_file_out = f'parameter_flexi-{pbs_job_id[0:7]}.ini' - with open(parameter_file_out, 'w', encoding='ascii') as new_file: - with open(parameter_file_in, 'r', encoding='ascii') as old_file: + with open(parameter_file_out, 'w', encoding='utf-8') as new_file: + with open(parameter_file_in, 'r', encoding='utf-8') as old_file: for line in old_file: new_file.write(pattern.sub(subst, line)) return parameter_file_out -def clean_ompi_tmpfiles(env_variable="TMPDIR"): +def clean_ompi_tmpfiles(env_variable='TMPDIR'): """Cleans up temporary files created by OpenMPI. OpenMPI creates temporary files with each invocation, which might cause the @@ -96,13 +56,12 @@ def clean_ompi_tmpfiles(env_variable="TMPDIR"): folder for termporary files is stored. Returns: - int: - * 1 if operation was successfull, - * -1 otherwise. + int: Returns + - `1` if operation was successfull, + - `-1` otherwise. 
""" - try: - tmpdir = os.environ[env_variable] - except Exception: + tmpdir = os.getenv(env_variable) + if tmpdir is None: return -1 path = os.path.join(tmpdir, 'ompi.*') @@ -160,7 +119,7 @@ def copy_to_nodes(my_files, base_path, hosts, subfolder=None): os.system(f'ssh {host} mkdir -p {target}') # Copy files for my_file in my_files: - os.system(f'scp -q "{my_file}" "{host}:{target}"') + os.system(f'scp -q {my_file} {host}:{target}') # Get new path of files my_files_new = [] diff --git a/src/relexi/runtime/launch_configuration.py b/src/relexi/runtime/launch_configuration.py new file mode 100644 index 0000000..78ee0e2 --- /dev/null +++ b/src/relexi/runtime/launch_configuration.py @@ -0,0 +1,291 @@ +#/usr/bin/env python3 + +"""Launch configuration for a batch of executables in a runtime.""" + +from __future__ import annotations + +from typing import List + +import numpy as np + +class LaunchConfig(): + """Launch configuration for a batch of executables in a runtime. + + This class provides a launch configuration for a batch of executables in a + runtime. It contains the specific configuration to distribute the + executables to the available resources. The configuration can be of three + types: 'local', 'mpirun', and 'srun'. The 'local' configuration is used + for local execution, 'mpirun' for OpenMPI, and 'srun' for SLURM. + + Attributes: + type (str): Type of the launch configuration. + n_exe (int): Number of executables to launch. + n_procs (List[int]): Number of processes to launch per executable. Must + be of length `n_exe`. + workers (List[str]): List of worker nodes available. + n_worker_slots (int): Number of available worker slots. + config (dict): Configuration dictionary. + rankfiles (List[str]): List of rankfiles if `type=='mpirun'`, is `None` + otherwise. + hosts_per_exe (List[List[str]]): List of lists containing the hostnames + for each executable if `type=='srun'`, is `None` otherwise. + + Methods: + from_dict(cls, config: dict, runtime: Runtime) -> LaunchConfiguration: + Instantiate a launch configuration from a configuration dictionary. + as_dict() -> dict: + Return the launch configuration as a dictionary. + is_compatible(config: dict) -> bool: + Check if other launch configuration is compatible based on dict. + config_is_valid(config: dict) -> bool: + Check if the given configuration is valid. + + Raises: + ValueError: If the requested configuration is invalid. + RuntimeError: If the configuration cannot be generated. + """ + + TYPES = ['local', 'mpirun', 'srun'] + """Supported types of launch configurations.""" + + CONFIG_KEYS = ['type', 'n_exe', 'n_procs', 'workers'] + """Keys for the configuration dictionary.""" + + def __init__(self, type_: str, runtime, n_exe: int, n_procs: List[int]): + """Initialize the launch configuration. + + Args: + type_ (str): Type of the launch configuration. + runtime (Runtime): Runtime instance for which launch configuration + should be generated. + n_exe (int): Number of executables to launch. + n_procs (List[int]): Number of processes to launch per executable. 
+ """ + self.type = type_ + self.n_exe = n_exe + self.n_procs = n_procs + self.workers = runtime.workers + self.n_worker_slots = runtime.n_worker_slots + # Set with property setter to check for validity + self.config = {'type': self.type, + 'n_exe': self.n_exe, + 'n_procs': self.n_procs, + 'workers': self.workers} + + # Generate rankfiles for OpenMPI + self._rankfiles = None + if self.type == 'mpirun': + slots_per_node = runtime.n_worker_slots//len(self.workers) + self._rankfiles = self._generate_rankfile_ompi(self.workers, + slots_per_node, + n_exe, + n_procs) + # Distribute workers for SLURM + self._hosts_per_exe = None + if self.type == 'srun': + self._hosts_per_exe = self._distribute_workers_slurm(n_procs, + n_exe, + runtime.workers, + runtime.n_worker_slots) + + @property + def config(self) -> dict: + """Return the current launch configuration as dict.""" + return self._config + + @config.setter + def config(self, config: dict): + """Set a launch configuration.""" + if not self.config_is_valid(config): + raise ValueError('Invalid configuration dictionary!') + if sum(config['n_procs']) > self.n_worker_slots: + raise ValueError('Not enough processes available!') + self._config = config + + @property + def type(self) -> str: + """Return the type of the launch configuration.""" + return self._type + + @type.setter + def type(self, value): + """Set the type of the launch configuration.""" + if value not in self.TYPES: + raise ValueError('Invalid launch configuration type!') + self._type = value + + @property + def rankfiles(self) -> List[str]: + """Return paths to rankfiles for `mpirun` launcher.""" + if self._rankfiles is None: + raise ValueError('Rankfiles not yet generated!') + return self._rankfiles + + @property + def hosts_per_exe(self) -> List[List[str]]: + """Return the hosts for each executable for `srun`.""" + if self._hosts_per_exe is None: + raise ValueError('Hosts not yet generated!') + return self._hosts_per_exe + + def as_dict(self) -> dict: + """Return the launch configuration as a dictionary.""" + return self._config + + def is_compatible(self, config: dict) -> bool: + """Check if other launch configuration is compatible based on dict. + + Another launch configuration is compatible if the first `n_exe` + executables can be launched on the same resources as the first `n_exe` + executables of the existing launch configuration. + + Args: + config (dict): Dictionary of the other launch configuration. + + Returns: + bool: `True` if the configurations are compatible, `False` otherwise. + """ + if self.type != config['type']: + return False + if self.n_exe != config['n_exe']: + return False + if self.n_procs != config['n_procs']: + return False + return True + + @classmethod + def from_dict(cls, config: dict, runtime) -> LaunchConfig: + """Instantiate a launch configuration from a configuration dictionary. + + The dictionary has to take the form of: + ``` + { + 'type': str, + 'n_exe': int, + 'n_procs': List[int] + 'workers': List[str] + } + ``` + + Args: + config (dict): Configuration dictionary. + runtime (Runtime): Runtime object. + + Returns: + LaunchConfig: Launch configuration instance. + """ + if not cls.config_is_valid(config): + raise ValueError('Invalid configuration dictionary!') + return cls(config['type'], runtime, config['n_exe'], config['n_procs']) + + @classmethod + def config_is_valid(cls, config: dict) -> bool: + """Check if the given configuration is valid. + + The configuration is valid if it contains all necessary keys and the + values are valid. 
However, the availability of the resources is not
+        checked!
+
+        Args:
+            config (dict): Configuration dictionary.
+
+        Returns:
+            bool: `True` if the configuration is valid, `False` otherwise.
+        """
+        if not all(key in config for key in cls.CONFIG_KEYS):
+            raise ValueError('Configuration dictionary does not contain all necessary keys!')
+        if config['type'] not in cls.TYPES:
+            return False
+        if len(config['n_procs']) != config['n_exe']:
+            return False
+        if len(config['workers']) < 1:
+            return False
+        if min(config['n_procs']) < 1:
+            return False
+        if config['n_exe'] < 1:
+            return False
+        return True
+
+    @staticmethod
+    def _distribute_workers_slurm(
+            n_procs: List[int],
+            n_exe: int,
+            workers: List[str],
+            procs_avail: int
+    ) -> List[List[str]]:
+        """Distribute the executables to the available nodes for SLURM.
+
+        Uses two different strategies to distribute the executables to the
+        available nodes: either multiple executables per node or multiple nodes
+        per executable. However, a single executable cannot be placed on parts
+        of multiple nodes, since this causes problems with SLURM. Each
+        executable thus either spans multiple whole nodes or fits on a single
+        node.
+
+        Args:
+            n_procs (List[int]): Number of processes to launch per executable.
+            n_exe (int): Number of executables to launch.
+            workers (List[str]): List of worker nodes available.
+            procs_avail (int): Number of available processes.
+
+        Returns:
+            List[List[str]]: List of lists containing the hostnames for each
+                executable.
+        """
+        if sum(n_procs) > procs_avail:
+            raise RuntimeError('Failed to distribute models to resources!')
+        procs_per_worker = procs_avail//len(workers)
+        nodes_avail = workers
+        slurm_hosts_per_exe = []
+        # Either multiple executables per node or multiple nodes per executable
+        if max(n_procs) > procs_per_worker:
+            # Use whole nodes per executable
+            for i in range(n_exe):
+                n_nodes_req = int(np.ceil(n_procs[i]/procs_per_worker))
+                current_hosts = []
+                for _ in range(n_nodes_req):
+                    current_hosts.append(nodes_avail.pop(0))
+                slurm_hosts_per_exe.append(current_hosts)
+        else:
+            # Use multiple executables per node
+            cores_avail = procs_per_worker
+            for i in range(n_exe):
+                # Does not fit on remaining slots on node
+                if n_procs[i] > cores_avail:
+                    if len(nodes_avail) <= 1:
+                        raise RuntimeError('Failed to distribute models to resources!')
+                    # Take next node
+                    nodes_avail.pop(0)
+                    cores_avail = procs_per_worker
+                cores_avail -= n_procs[i]
+                slurm_hosts_per_exe.append([nodes_avail[0]])
+        return slurm_hosts_per_exe
+
+    @staticmethod
+    def _generate_rankfile_ompi(workers: List[str],
+                                n_slots_per_worker: int,
+                                n_exe: int,
+                                n_procs: List[int],) -> List[str]:
+        """Generate rank file for OpenMPI process binding.
+
+        Args:
+            workers (list): List of hostnames
+            n_exe (int): Number of executables to be launched
+            n_procs (int): Number of ranks per environment
+
+        Returns:
+            list: List of filenames of the rankfiles
+        """
+        rankfiles = []
+        next_free_slot = 0
+        n_cores_used = 0
+        for i_exe in range(n_exe):
+            filename = f'.env_{i_exe:05d}.txt'
+            rankfiles.append(filename)
+            with open(filename, 'w', encoding='utf-8') as rankfile:
+                for i in range(n_procs[i_exe]):
+                    rankfile.write(f'rank {i}={workers[n_cores_used//n_slots_per_worker]} slot={next_free_slot}\n')
+                    next_free_slot += 1
+                    n_cores_used += 1
+                    if next_free_slot >= n_slots_per_worker:
+                        next_free_slot = 0
+        return rankfiles
diff --git a/src/relexi/runtime/runtime.py b/src/relexi/runtime/runtime.py
new file mode 100644
index 0000000..3e91da4
--- /dev/null
+++ b/src/relexi/runtime/runtime.py
@@ -0,0 +1,477 @@
+#!/usr/bin/env python3
+
+"""The Runtime class for managing the HPC runtime environment."""
+
+import os
+import socket
+import subprocess
+from typing import List, Optional, Union
+
+import numpy as np
+
+import smartsim
+from smartsim import Experiment
+from smartsim.database.orchestrator import Orchestrator
+
+import relexi.io.output as rlxout
+from relexi.runtime import LaunchConfig
+
+
+class Runtime():
+    """Class containing information about and handling the HPC environment.
+
+    This class defines the interface for an HPC runtime, which contains all
+    information of the HPC environment used for the training environments and
+    methods to manage it. This includes in particular identifying the scheduler
+    environment and the hostnames of the available nodes, as well as launching
+    and managing the SmartSim `Experiment` including the `Orchestrator`.
+
+    Two modes are available for using the available compute resources:
+
+        - **Distributed**: The `localhost` running the main training script
+            becomes the dedicated **Head** node that hosts the database,
+            evaluates the model and runs the training loop. All training
+            environments are distributed to the available **Worker** nodes.
+
+        - **Local**: The training script, the database and the training
+            environments are all placed on the `localhost`.
+
+    More than 1 node has to be available in order to initiate a
+    **Distributed** runtime. Otherwise, if only a single node is available,
+    **Local** mode will be used. The mode of the `Runtime` can be retrieved
+    via the `is_distributed` attribute.
+
+    Attributes:
+        type (str): Type of runtime. Must be `'local'`, `'pbs'`, or `'slurm'`.
+        is_distributed (bool): Indicates whether a **Distributed** or **Local**
+            runtime is used.
+        hosts (list): List of hostnames of available nodes.
+        head (str): Hostname of Head node (is name of `localhost` if in
+            **Local Mode**).
+        workers (list): List of worker nodes (contains only `localhost` if in
+            **Local Mode**).
+        n_worker_slots (int): Total number of slots available on workers.
+        db (Orchestrator): The launched `Orchestrator` database from the
+            `smartsim` package.
+        db_entry (str): IP address and port of the host of the database.
+            Takes the form `IP_ADDRESS:PORT`.
+        exp (Experiment): The `Experiment` object the `Orchestrator` is
+            launched with.
+        launch_config (LaunchConfig): Current configuration for launching a
+            batch of executables in the runtime.
+
+    Raises:
+        ValueError: If the scheduler type is not supported.
+        RuntimeError: If one of the following occurs:
+            - The scheduler environment cannot be identified, or
+            - Launching the `Orchestrator` fails.
+ NotImplementedError: If the methods are not implemented for the + provided scheduler type. + """ + + TYPES = ['local', 'pbs', 'slurm'] + """Supported types of runtime environments.""" + + def __init__( + self, + type_: Optional[str] = 'auto', + db_network_interface: Optional[str] = 'lo', + db_port: Optional[int] = 6790, + do_launch_orchestrator: Optional[bool] = True + ): + """Initialize the Runtime. + + Args: + type_ (str, optional): Type of runtime. Must be `'local'`, `'pbs'`, + `'slurm'` or `'auto'`. Defaults to `'auto'`, for which the type + of runtime environment is identified automatically. + db_network_interface (str, optional): Network interface to use for + the Orchestrator. Defaults to `'lo'`. + db_port (int, optional): Port to start the Orchestrator on. + Defaults to `6790`. + do_launch_orchestrator (bool, optional): Whether to launch the + `Orchestrator` immediately. Defaults to `True`. + """ + try: + # Using SmartSim utility to identify type automatically + if type_ == 'auto': + rlxout.info('Identifying environment...') + scheduler = smartsim.wlm.detect_launcher() + rlxout.info(f'Found "{scheduler}" environment!', newline=False) + self.type = scheduler.casefold().strip() + else: + self.type = type_.casefold().strip() + + rlxout.info(f'Setting up "{self.type}" runtime...') + self._hosts = self._get_hostlist() + # Check that actually sufficient hosts found + if self.type != 'local' and len(self._hosts) < 2: + raise ValueError('Less than 2 hosts found in environment!') + except Exception as e: + rlxout.warning(f'Failed: {e}') + if type_ != 'local': + rlxout.info('Trying to setup LOCAL runtime instead...', newline=False) + try: + self.type = 'local' + self._hosts = self._get_hostlist() + except Exception as f: + raise RuntimeError('Also failed to setup LOCAL environment!') from f + else: + raise RuntimeError('Failed to setup LOCAL training environment!') from e + rlxout.info('Success!', newline=False) + + self._exp = None + self._db = None + self._db_entry = None + if do_launch_orchestrator: + try: + self._exp, self._db, self._db_entry = self._launch_orchestrator( + port=db_port, + network_interface=db_network_interface, + ) + except Exception as e: + raise RuntimeError('Failed to launch the Orchestrator!') from e + self.launch_config = None + self.n_worker_slots = self._get_total_worker_slots() + + def __del__(self): + if self.db: + try: + self.exp.stop(self.db) + except Exception as e: + raise RuntimeError('Failed to stop the Orchestrator!') from e + + def info(self): + """Prints information about the current runtime environment.""" + rlxout.info('Configuration of runtime environment:') + rlxout.info(f' Scheduler: {self.type}', newline=False) + rlxout.info(f' Hosts: {self.hosts}', newline=False) + if self.is_distributed: + rlxout.info('Running in DISTRIBUTED mode:') + rlxout.info(f' Head: {self.head}', newline=False) + rlxout.info(f' Workers: {self.workers}', newline=False) + else: + rlxout.info(f'Running in LOCAL mode on: {self.head}') + + def launch_models( + self, + exe: Union[str, List[str]], + exe_args: Union[str, List[str]], + exe_name: Union[str, List[str]], + n_procs: Union[int, List[int]], + n_exe: Optional[int] = 1, + launcher: Optional[str] = 'local' + ) -> List[smartsim.entity.model.Model]: + """Launch the models on the available nodes. + + Args: + exe (str, List(str)): Path to the executable to launch. Can either + be a single path or a list of length `n_exe`. If only a single + path is provided, it is used for all executables. 
+ exe_args (str, List(str)): Arguments to pass to the executable. Can + either be a single string or a list of length `n_exe`. If only + a single string is provided, it is used for all executables. + exe_name (str, List(str)): Name of the executable used to identify + launched model in the SmartSim context. Can either be a single + string or a list of length `n_exe`. If only a single string is + provided, it is used for all executables. + n_procs (int, List(int)): Number of processes to launch. Can either + be a single integer or a list of length `n_exe`. If only a + single integer is provided, it is used for all executables. + n_exe (int): Number of executables to launch. Defaults to `1`. + launcher (str): Launcher to use for the executable. Must be one of + `'mpirun'`, `'srun'`, or `'local'`. + """ + def _validate_args(arg, n): + """Validate the length of the arguments.""" + if isinstance(arg, list) and not len(arg) == n: + raise ValueError(f'Expected {n} entries, but got {len(arg)}!') + if not isinstance(arg, list): + return [arg] * n + return arg + + # Validate that arguments are of correct length + exe = _validate_args(exe, n_exe) + exe_args = _validate_args(exe_args, n_exe) + exe_name = _validate_args(exe_name, n_exe) + n_procs = _validate_args(n_procs, n_exe) + + # Check compatibility of launcher and scheduler type + if (launcher == 'local') and (max(n_procs) > 1): + raise ValueError('Local launcher only supports single process execution!') + if (launcher == 'srun') and (self.type != 'slurm'): + raise ValueError('srun launcher only supported for SLURM scheduler!') + + # Check if launch config is up-to-date and create or update if required + config_dict = {'type': launcher, + 'n_exe': n_exe, + 'n_procs': n_procs, + 'workers': self.workers} + if self.launch_config is None: + self.launch_config = LaunchConfig.from_dict(config_dict, self) + else: + if not self.launch_config.is_compatible(config_dict): + self.launch_config.config = config_dict + + models = [] + for i in range(n_exe): + if launcher == 'local': + run = self.exp.create_run_settings( + exe=exe[i], + exe_args=exe_args[i], + ) + else: + if launcher == 'mpirun': + run_args = { + 'rankfile': self.launch_config.rankfiles[i], + 'report-bindings': None + } + elif launcher == 'srun': + run_args = { + 'mpi': 'pmix', + 'nodelist': ','.join(self.launch_config.hosts_per_exe[i]), + 'distribution': 'block:block:block,Pack', + 'cpu-bind': 'verbose', + 'exclusive': None, + } + run = self.exp.create_run_settings( + exe=exe[i], + exe_args=exe_args[i], + run_command=launcher, + run_args=run_args + ) + run.set_tasks(n_procs[i]) + + model = self.exp.create_model(exe_name[i], run) + self.exp.start(model, block=False, summary=False) + models.append(model) + + return models + + @property + def type(self) -> str: + """Get the type of the runtime environment. + + Returns: + str: Type of the runtime environment. + """ + return self._type + + @type.setter + def type(self, value: str): + """Set the type of environment used for the runtime. + + Validates that the type is actually supported. + + Args: + value (str): Type of the runtime environment. + """ + if value not in self.TYPES: + raise ValueError(f'Runtime of type {value} not supported.') + self._type = value + + @property + def hosts(self) -> List[str]: + """Get the list of hosts within the runtime environment. + + Returns: + list: List containing the hostnames as strings. 
+ """ + return self._hosts + + @property + def is_distributed(self) -> bool: + """Whether runtime is **Distributed** or **Local**. + + Checks for the number of hosts available. If more than one host is + found in runtime, it runs in **Distributed** mode, otherwise it runs + in **Local** mode. + + Returns: + bool: `True` if **Distributed**, `False` otherwise. + """ + return len(self._hosts) > 1 + + @property + def head(self) -> str: + """Return name of Head node, which is where this instance is located. + + Returns: + str: Hostname of the Head node. + """ + return self._get_local_hostname() + + @property + def workers(self) -> List[str]: + """Returns list of Workers found in the current runtime environment. + + Obtains Workers by removing the Head node from the list of hosts. + + Returns: + list: List containing the hostnames of Workers as strings. + """ + if self.is_distributed: + workers = self.hosts.copy() + if self.head in workers: + workers.remove(self.head) + else: + rlxout.warning(f'Localhost "{self.head}" not found in hosts list:') + rlxout.warning(f' {workers}') + return workers + return self.hosts + + @property + def db(self) -> Orchestrator: + """Get the Orchestrator database instance. + + Returns: + Orchestrator: The `Orchestrator` database instance. + """ + return self._db + + @property + def db_entry(self) -> str: + """Get IP address of database. + + Returns: + str: Address of the database. Takes the form `IP_ADDRESS:PORT`. + """ + return self._db_entry + + @property + def exp(self) -> Experiment: + """Get the `Experiment` instance the `Orchestrator` is launched in. + + Returns: + Experiment: The `Experiment` instance. + """ + return self._exp + + def _launch_orchestrator( + self, + port: int, + network_interface: str + ) -> tuple[Experiment, Orchestrator, str]: + """Launches a SmartSim `Orchestratori` in the current runtime. + + Args: + port (int): Port to start the `Orchestrator` on. + network_interface (str): Network interface to use for the + `Orchestrator`. + + Returns: + tuple: The `Experiment` instance, the `Orchestrator` instance and + the IP address of the host of the database. + """ + # Generate relexi experiment + exp = Experiment('relexi', launcher=self.type) + + # Initialize the orchestrator based on the orchestrator_type + db = exp.create_database( + port=port, + interface='lo' if self.type == 'local' else network_interface, + hosts=self.head if self.type in {'pbs', 'slurm'} else None, + ) + + rlxout.info('Starting the Orchestrator...', newline=False) + try: + exp.start(db) + except Exception as e: + raise RuntimeError(f'Failed to start the Orchestrator: {e}') from e + rlxout.info('Success!', newline=False) + + db_entry = socket.gethostbyname(db.hosts[0]) + rlxout.info('Use this command to shutdown database if not terminated correctly:') + rlxout.info(f'$(smart dbcli) -h {db.hosts[0]} -p {port} shutdown', newline=False) + + return exp, db, f'{db_entry}:{port}' + + + def _get_hostlist(self) -> List[str]: + """Get the list of hosts the script is executed on. + + Returns: + list: List containing the hostnames as strings. + + Raises: + NotImplementedError: If the method is not implemented for the + scheduler type. 
+ """ + if self.type == 'local': + return [self._get_local_hostname()] + if self.type == 'pbs': + return smartsim.wlm.pbs.get_hosts() + if self.type == 'slurm': + return smartsim.wlm.slurm.get_hosts() + raise NotImplementedError( + f'Method `get_hostlist` not implemented for runtime "{self.type}"!') + + def _get_slots_per_node_slurm(self) -> List[int]: + """Get the number of slots per node for the SLURM scheduler. + + Returns: + list(int): List containing the number of slots per node. + """ + if self.type != 'slurm': + raise ValueError('Method only available for SLURM scheduler!') + # 1. Get the nodelist + slots = os.getenv('SLURM_JOB_CPUS_PER_NODE') + if slots is None: + raise ValueError("SLURM_JOB_CPUS_PER_NODE is not set!") + # 2. split all entries at comma + nodelist = slots.split(',') + # 3. expand all compressed entries + expanded_list = [] + for entry in nodelist: + if '(' in entry: + num_cpus, count = entry.split('(x') + num_cpus = int(num_cpus) + count = int(count[:-1]) # remove trailing ')' + expanded_list.extend([num_cpus] * count) + else: + expanded_list.append(int(entry)) + return expanded_list + + def _get_slots_per_node_pbs(self) -> List[int]: + """Get the number of slots per node for the PBS scheduler. + + Returns: + list(int): List containing the number of slots per node. + """ + if self.type != 'pbs': + raise ValueError('Method only available for PBS scheduler!') + # 1. Get the nodelist + node_file = os.getenv('PBS_NODEFILE') + if node_file is None: + raise KeyError('Environment variable "PBS_NODEFILE" not found!') + # 2. Read the nodelist + with open(node_file, 'r', encoding='utf-8') as f: + nodes = [line.strip().split('.')[0] for line in f.readlines()] + # 3. Count the number of slots (i.e. lines) per node + return [nodes.count(host) for host in self.hosts] + + def _get_total_worker_slots(self) -> int: + """Get the total number of worker slots available in the runtime. + + Returns: + int: Number of slots per worker node. + """ + if self.type == 'local': + # Leave one core for the head node + return os.cpu_count()-1 + if self.type == 'pbs': + slots_per_node = self._get_slots_per_node_pbs() + return np.sum(slots_per_node[1:]) + if self.type == 'slurm': + slots_per_node = self._get_slots_per_node_slurm() + return np.sum(slots_per_node[1:]) + raise NotImplementedError( + f'Method `get_slots_per_worker` not implemented for runtime "{self.type}"!') + + def _get_local_hostname(self) -> str: + """Get the hostname of the machine executing the Python script. + + Returns: + str: Hostname of the local machine executing the script. + """ + return socket.gethostname().split('.')[0] diff --git a/src/relexi/smartsim/__init__.py b/src/relexi/smartsim/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/relexi/smartsim/init_smartsim.py b/src/relexi/smartsim/init_smartsim.py deleted file mode 100644 index 337f4dd..0000000 --- a/src/relexi/smartsim/init_smartsim.py +++ /dev/null @@ -1,201 +0,0 @@ -#!/usr/bin/env python3 - -"""Helpers for launching the SmartSim Orchestrator.""" - -import os -import json -import socket -import subprocess - -from smartsim import Experiment -from smartsim.database import Orchestrator - -import relexi.io.output as rlxout - - -def get_host(): - """Get the host the script is executed on from the env variable. - - Returns: - Hostname as string - """ - return socket.gethostname() - - -def get_pbs_hosts(): - """Get the host list from the PBS Nodefile. 
- - Returns: - List containing the hostnames as strings - """ - nodefile_path = os.environ["PBS_NODEFILE"] - with open(nodefile_path, "r", encoding='ascii') as f: - hostlist = [] - for line in f: - # only take the name not the entire ip-address otherwise there will be an error - # it will set the command line flag "mpirun ... -host " - # This only works with the hostname shorthand - full_host_ip = line.strip() # e.g. abc.ib0...de - hostname = full_host_ip.split(".")[0] # e.g. abc - if not hostname in hostlist: - hostlist.append(hostname) - return hostlist - - -def get_pbs_walltime(): - """Get the walltime of the current PBS job. - - Returns: - Walltime of current PBS job. - """ - job_id = os.environ["PBS_JOBID"] - cmd = f"qstat -xfF json {job_id}" - stat_json_str = subprocess.check_output(cmd, shell=True, text=True) - stat_json = json.loads(stat_json_str) - return stat_json["Jobs"][job_id]["Resource_List"]["walltime"] - - -def init_smartsim( - port=6790, - num_dbs=1, - network_interface="ib0", - launcher_type="local", - orchestrator_type="local" -): - """Starts the orchestrator, launches an experiment and gets list of hosts. - - Args: - port (int): (Optional.) Port number on which Orchestrator will be - launched. - num_dbs (int): (Optional.) Number of databases should be launched. - `num_dbs>1` imply that the database is clustered , i.e. distributed - across multiple instances. - network_interface (string) = (Optional.) Name of network interface to - be used to establish communication to clients. - launcher_type (string): (Optional.) Launcher to be used to start the - executable. Currently implemented are: - * local - * mpirun - orchestrator_type (string): Scheduler environment in which the - orchestrator is launched. Currently implemented are: - * local - * pbs - Returns: - smartsim.Experiment: The experiments in which the Orchestrator was - started - list: List of names of the nodes used as workers to run the simulations - smarsim.Orchestrator: The launched Orchestrator - string: The IP address and port used to access the Orchestrator - bool: Flag to indicate whether Orchestrator is clustered. - - Note: - Admissable combinations of Experiment launcher and orchestrator type: - * laun.: local, orch.: pbs = incompatible. - * laun.: local, orch.: local = only 1 in-memory database possible. - `mpirun` will still distribute the flexi instances to other - nodes. - * laun.: pbs, orch.: pbs = does not support clusters of size 2 - otherwise works flawlessly (warning: orchestrator doesn't find - the cluster configuration). - * laun.: pbs, orch.: local = not supported error: not supported by - PBSPro. - - TODO: - * Add support for SLURM. - * Clean implementation and nesting. - * Make object out of this. - * Allow to reconnect to already started Orchestrator - * Or closue Orchestrator still open from previous run - """ - - rlxout.small_banner('Starting SmartSim...') - - # Check whether launcher and orchestrator are identical (case-insensitive) - if not launcher_type.casefold() == orchestrator_type.casefold(): - rlxout.warning(f'Chosen Launcher {launcher_type} and orchestrator {orchestrator_type} are incompatible! Please choose identical types for both!') - - # Is database clustered, i.e. hosted on different nodes? - db_is_clustered = num_dbs > 1 - - # First try PBS if necessary. 
Use local configuration as backup - pbs_failed = False - if launcher_type.casefold() == 'pbs': - try: - # try to load the batch settings from the batch job environment - # variables like PBS_JOBID and PBS_NODEFILE - walltime = get_pbs_walltime() - hosts = get_pbs_hosts() - num_hosts = len(hosts) - rlxout.info(f"Identified available nodes: {hosts}") - - # Maximum of 1 DB per node allowed for PBS Orchestrator - if num_hosts < num_dbs: - rlxout.warning(f"You selected {num_dbs} databases and {num_hosts} nodes, but maximum is 1 database per node. Setting number of databases to {num_hosts}") - num_dbs = num_hosts - - # Clustered DB with PBS orchestrator requires at least 3 nodes for reasons - if db_is_clustered: - if num_dbs < 3: - rlxout.warning(f"Only {num_dbs} databases requested, but clustered orchestrator requires 3 or more databases. Non-clustered orchestrator is launched instead!") - db_is_clustered = False - else: - rlxout.info(f"Using a clustered database with {num_dbs} instances.") - else: - rlxout.info("Using an UNclustered database on root node.") - - except Exception: - # If no env. variables for batchjob, use the local launcher - rlxout.warning("Didn't find pbs batch environment. Switching to local setup.") - pbs_failed = True - - # If local configuration is required or if scheduler-based launcher failed. - if (launcher_type.casefold() == 'local') or pbs_failed: - launcher_type = "local" - orchestrator_type = "local" - db_is_clustered = False - hosts = [get_host()] - - # Generate flexi experiment - exp = Experiment("flexi", launcher=launcher_type) - - # Initialize the orchestrator based on the orchestrator_type - if orchestrator_type.casefold() == "local": - db = Orchestrator( - port=port, - interface='lo' - ) - - elif orchestrator_type.casefold() == "pbs": - db = Orchestrator( - launcher='pbs', - port=port, - db_nodes=num_dbs, - batch=False, # false if it is launched in an interactive batch job - time=walltime, # this is necessary, otherwise the orchestrator wont run properly - interface=network_interface, - hosts=hosts, # this must be the hostnames of the nodes, it mustn't be the ip-addresses - run_command="mpirun" - ) - else: - rlxout.warning(f"Orchester type {orchestrator_type} not implemented!") - raise NotImplementedError - - # startup Orchestrator - rlxout.info("Starting the Database...", newline=False) - exp.start(db) - - # get the database nodes and select the first one - entry_db = socket.gethostbyname(db.hosts[0]) - rlxout.info(f"Identified 1 of {len(db.hosts)} database hosts to later connect clients to: {entry_db}", newline=False) - rlxout.info("If the SmartRedis database isn't stopping properly you can use this command to stop it from the command line:") - for db_host in db.hosts: - rlxout.info(f"$(smart dbcli) -h {db_host} -p {port} shutdown", newline=False) - - # If multiple nodes are available, the first executes Relexi, while - # all worker processes are started on different nodes. 
-    if len(hosts) > 1:
-        worker_nodes = hosts[1:]
-    else: # Only single node
-        worker_nodes = hosts
-
-    return exp, worker_nodes, db, entry_db, db_is_clustered
diff --git a/tests/test_env_flexi.py b/tests/test_env_flexi.py
index 7fbf311..85d7a51 100644
--- a/tests/test_env_flexi.py
+++ b/tests/test_env_flexi.py
@@ -1,19 +1,22 @@
 #!/usr/bin/env python3

+import os.path
+import unittest
+from unittest.mock import patch
+
+from smartsim import Experiment
+
 from .context import relexi
 import relexi.env.flexiEnvSmartSim as rlxenv
-from smartsim import Experiment
-from unittest.mock import patch
-import os.path
 from relexi.env.flexiEnvSmartSim import Client

 """ Contains pytest - tests for the functionalities of the relexi.env.flexiEnv module """

 @patch.object(rlxenv.flexiEnv,'_start_flexi')
 @patch.object(rlxenv.flexiEnv,'_get_current_state')
-@patch.object(rlxenv.flexiEnv,'_end_flexi')
+@patch.object(rlxenv.flexiEnv,'stop')
 @patch('relexi.env.flexiEnvSmartSim.Client')
-def init_flexi_env(mock__start_flexi, mock__get_current_state, mock__end_flexi, mock_Client):
+def init_flexi_env(mock__start_flexi, mock__get_current_state, mock_stop, mock_Client):

     smartsim_port = 6780
     smartsim_num_dbs = 1
@@ -59,7 +62,6 @@ def init_flexi_env(mock__start_flexi, mock__get_current_state, mock__end_flexi,
             ,tag = 'eval'
             ,port = smartsim_port
             ,entry_db = entry_db
-            ,is_db_cluster = is_db_cluster
             ,hosts = worker_nodes
             ,n_procs = num_procs_per_environment
             ,n_envs = num_parallel_environments
@@ -77,6 +79,7 @@ def init_flexi_env(mock__start_flexi, mock__get_current_state, mock__end_flexi,

     return flexi_env

+@unittest.skip('Has to be adapted to new Runtime implementation.')
 @patch('os.path.isfile')
 @patch('os.access')
 @patch.object(Experiment, 'start')
diff --git a/tests/test_helpers.py b/tests/test_helpers.py
deleted file mode 100644
index e2efce3..0000000
--- a/tests/test_helpers.py
+++ /dev/null
@@ -1,29 +0,0 @@
-#!/usr/bin/env python3
-
-from .context import relexi
-import relexi.smartsim.helpers as rlxhelpers
-
-""" Contains pytest - tests for the functionalities of the relexi.smartsim.helpers module """
-
-def test_generate_rankfile_ompi(tmp_path):
-
-    hosts = ["r1n1c1n1", "r1n1c1n2"]
-    cores_per_node = 4
-    n_par_env = 4
-    ranks_per_env = 2
-    base_path = tmp_path
-
-    expected = list()
-
-    expected.append("rank 0=r1n1c1n1 slot=0\nrank 1=r1n1c1n1 slot=1")
-    expected.append("rank 0=r1n1c1n1 slot=2\nrank 1=r1n1c1n1 slot=3")
-    expected.append("rank 0=r1n1c1n2 slot=0\nrank 1=r1n1c1n2 slot=1")
-    expected.append("rank 0=r1n1c1n2 slot=2\nrank 1=r1n1c1n2 slot=3")
-
-    rankfiles_out = rlxhelpers.generate_rankfile_ompi(hosts, cores_per_node, n_par_env, ranks_per_env, base_path)
-
-    i = 0
-    for rankfile in rankfiles_out:
-        with open(rankfile, 'r') as fh:
-            assert fh.read().rstrip()==expected[i], f"Rankfile for rank {i} is wrong"
-        i = i+1
diff --git a/tests/test_launch_configuration.py b/tests/test_launch_configuration.py
new file mode 100644
index 0000000..1cfc6a5
--- /dev/null
+++ b/tests/test_launch_configuration.py
@@ -0,0 +1,99 @@
+#!/usr/bin/env python3
+
+"""Test for the launchconfig module based on `unittest` package."""
+
+import unittest
+
+from .context import relexi
+from relexi.runtime import LaunchConfig, Runtime
+
+class TestLaunchConfig(unittest.TestCase):
+    """Tests for the launchconfig module."""
+
+    #@unittest.mock.patch('runtime.Runtime.n_worker_slots', return_value=4)
+    #def test_launchconfig_init_local(self):
+    #    """Test init of launchconfig with database."""
+    #    launchconfig = LaunchConfig(type_='local')
+    #    assert launchconfig is not None
+    #    assert launchconfig.type == 'local'
+
+    def test_generate_rankfile_ompi(self):
+        """Test generate_rankfile_ompi."""
+        # Specify configuration
+        workers = ['r1n1c1n1', 'r1n1c1n2']
+        n_slots_per_worker = 4
+        n_par_env = 4
+        n_procs = [2, 2, 1, 3]
+        # Generate rankfiles
+        rankfiles = LaunchConfig._generate_rankfile_ompi(
+            workers,
+            n_slots_per_worker,
+            n_par_env,
+            n_procs
+        )
+        # Prepare expected content
+        expected = [
+            'rank 0=r1n1c1n1 slot=0\nrank 1=r1n1c1n1 slot=1',
+            'rank 0=r1n1c1n1 slot=2\nrank 1=r1n1c1n1 slot=3',
+            'rank 0=r1n1c1n2 slot=0',
+            'rank 0=r1n1c1n2 slot=1\nrank 1=r1n1c1n2 slot=2\nrank 2=r1n1c1n2 slot=3'
+        ]
+        # Check that rankfiles are correct
+        for i, rankfile in enumerate(rankfiles):
+            with open(rankfile, 'r', encoding='utf-8') as f:
+                file_lines = f.read().rstrip()
+                print(f'Rankfile {i}:\n{file_lines}')
+                assert file_lines == expected[i]
+
+    def test_distribute_workers_slurm_1(self):
+        """Test distribute_workers_slurm."""
+        # Specify configuration
+        n_procs = [2, 2, 1, 3]
+        n_exe = 4
+        workers = ['r1n1c1n1', 'r1n1c1n2']
+        procs_avail = 8
+        # Distribute workers
+        hosts_per_exe = LaunchConfig._distribute_workers_slurm(
+            n_procs,
+            n_exe,
+            workers,
+            procs_avail
+        )
+        # Check that workers are correct
+        expected = [['r1n1c1n1'], ['r1n1c1n1'], ['r1n1c1n2'], ['r1n1c1n2']]
+        assert expected == hosts_per_exe
+
+    def test_distribute_workers_slurm_2(self):
+        """Test distribute_workers_slurm."""
+        # Specify configuration
+        n_procs = [3, 3]
+        n_exe = 2
+        workers = ['r1n1c1n1', 'r1n1c1n2']
+        procs_avail = 8
+        # Distribute workers
+        hosts_per_exe = LaunchConfig._distribute_workers_slurm(
+            n_procs,
+            n_exe,
+            workers,
+            procs_avail
+        )
+        # Check that workers are correct
+        expected = [['r1n1c1n1'], ['r1n1c1n2']]
+        assert expected == hosts_per_exe
+
+    def test_distribute_workers_slurm_3(self):
+        """Test errors raised when not sufficient resources."""
+        # Specify configuration
+        n_procs = [3, 3]
+        n_exe = 2
+        workers = ['r1n1c1n1', 'r1n1c1n2']
+        procs_avail = 4
+        # Check that workers are correct
+        self.assertRaises(
+            RuntimeError,
+            LaunchConfig._distribute_workers_slurm,
+            n_procs,
+            n_exe,
+            workers,
+            procs_avail
+        )
diff --git a/tests/test_runtime.py b/tests/test_runtime.py
new file mode 100644
index 0000000..8e59da4
--- /dev/null
+++ b/tests/test_runtime.py
@@ -0,0 +1,125 @@
+#!/usr/bin/env python3
+
+"""Tests for the runtime module based on `unittest` package."""
+
+import os
+import socket
+import unittest
+from unittest import mock
+
+import smartsim
+
+from .context import relexi
+from relexi.runtime import Runtime
+
+
+class TestRuntime(unittest.TestCase):
+    """Tests for the runtime module."""
+    def test_runtime_auto(self):
+        """Test auto-detection of runtime type."""
+        runtime = Runtime(type_='auto', do_launch_orchestrator=False)
+        assert runtime is not None
+        assert runtime.type == smartsim.wlm.detect_launcher()
+
+    def test_runtime_init(self):
+        """Test init of runtime with database."""
+        runtime = Runtime(type_='local')
+        assert runtime is not None
+        assert runtime.type == 'local'
+
+    def test_runtime_info_local(self):
+        """For Info, we just want to check if it runs without errors."""
+        try:
+            runtime = Runtime(type_='local')
+            runtime.info()
+        except Exception as e:
+            self.fail(f"Runtime.info() raised an exception: {e}")
+
+    def test_runtime_env_local(self):
+        runtime = Runtime(type_='local', db_port=6780)
+        assert runtime is not None
+        assert runtime.type == 'local'
+        assert not runtime.is_distributed
+        assert runtime.hosts == [runtime.head]
+        assert runtime.hosts == runtime.workers
+        assert runtime.db_entry == '127.0.0.1:6780'
+        assert runtime.db is not None
+        assert runtime.exp is not None
+
+    @mock.patch.dict(os.environ, {'SLURM_JOB_CPUS_PER_NODE': '4(x2),8,16(x2)'})
+    def test_runtime_init_slurm(self):
+        """Test setup based on mocked SLURM_JOB_CPUS_PER_NODE."""
+        # Also mock retrieval of hostnames, since smartsim util needs
+        # `scontrol` for that, which is not installed on non-SLURM systems.
+        with mock.patch('smartsim.wlm.slurm.get_hosts',
+                        return_value=['node1', 'node2', 'node3', 'node4', 'node5']):
+            # Set localhost to correct name
+            with mock.patch('socket.gethostname', return_value='node1'):
+                runtime = Runtime(type_='slurm', do_launch_orchestrator=False)
+                assert runtime is not None
+                assert runtime.is_distributed
+                assert runtime.type == 'slurm'
+                assert runtime.hosts == ['node1', 'node2', 'node3', 'node4', 'node5']
+                assert runtime.workers == ['node2','node3', 'node4', 'node5']
+                assert runtime.head == 'node1'
+                assert runtime.n_worker_slots == 44
+                assert runtime._get_slots_per_node_slurm() == [4, 4, 8, 16, 16]
+
+    @mock.patch.dict(os.environ, {'PBS_NODEFILE': '.nodefile.mock'})
+    def test_runtime_env_pbs_1(self):
+        """Test setup based on mocked nodefile."""
+        # Prepare rank file
+        with open('.nodefile.mock', 'w', encoding='utf-8') as f:
+            f.write('node1\nnode2\nnode2\nnode2\nnode3\nnode3\n')
+        # Set localhost to correct name
+        with mock.patch('socket.gethostname', return_value='node1'):
+            runtime = Runtime(type_='pbs', do_launch_orchestrator=False)
+            assert runtime is not None
+            assert runtime.is_distributed
+            assert runtime.type == 'pbs'
+            assert runtime.hosts == ['node1', 'node2', 'node3']
+            assert runtime.workers == ['node2','node3']
+            assert runtime.head == 'node1'
+            assert runtime.n_worker_slots == 5
+            assert runtime._get_slots_per_node_pbs() == [1, 3, 2]
+
+    @mock.patch.dict(os.environ, {'PBS_NODEFILE': '.nodefile.mock'})
+    def test_runtime_env_pbs_2(self):
+        """Test long-form hostnames in nodefile."""
+        # Prepare rank file
+        with open('.nodefile.mock', 'w', encoding='utf-8') as f:
+            f.write('node1.some.thing\nnode1.some.thing\nnode2.some.thing\nnode2.some.thing\nnode2.some.thing\nnode3.some.thing\n')
+        # Set localhost to correct name
+        with mock.patch('socket.gethostname', return_value='node1'):
+            runtime = Runtime(type_='pbs', do_launch_orchestrator=False)
+            assert runtime is not None
+            assert runtime.is_distributed
+            assert runtime.type == 'pbs'
+            assert runtime.hosts == ['node1', 'node2', 'node3']
+            assert runtime.workers == ['node2','node3']
+            assert runtime.head == 'node1'
+            assert runtime.n_worker_slots == 4
+            assert runtime._get_slots_per_node_pbs() == [2, 3, 1]
+
+    @mock.patch.dict(os.environ, {'PBS_NODEFILE': '.nodefile.mock'})
+    def test_runtime_env_pbs_3(self):
+        """Test fallback to 'local' mode for empty nodefile"""
+        # Prepare rank file
+        with open ('.nodefile.mock', 'w', encoding='utf-8') as f:
+            f.write('')
+        runtime = Runtime(type_='pbs', do_launch_orchestrator=False)
+        assert runtime is not None
+        assert runtime.type == 'local'
+        assert not runtime.is_distributed
+        assert runtime.hosts == [runtime.head]
+        assert runtime.hosts == runtime.workers
+
+    def test_runtime_init_wrong(self):
+        """Test fallback for invalid runtime type"""
+        runtime = Runtime(type_='wrong')
+        assert runtime.type == 'local'
+        assert not runtime.is_distributed
+        assert runtime.hosts == [runtime.head]
+        assert runtime.hosts == runtime.workers
+        assert runtime.db is not None
+        assert runtime.exp is not None
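
Usage sketch (not part of the patch): the tests in tests/test_runtime.py above exercise the public interface of the new Runtime class that replaces the scheduler-specific setup removed from init_smartsim. The following is a minimal sketch assuming that the constructor arguments and properties used in those tests (type_, db_port, info(), head, workers, db_entry) form the intended user-facing API; combining type_='auto' with db_port is an assumption, since the tests only pass db_port together with type_='local'.

    # Minimal sketch, based only on the API exercised in tests/test_runtime.py.
    from relexi.runtime import Runtime

    runtime = Runtime(type_='auto', db_port=6780)  # 'auto' resolves via smartsim.wlm.detect_launcher()
    runtime.info()                  # print a summary of the detected runtime
    print(runtime.head)             # head node that runs Relexi and the orchestrator
    print(runtime.workers)          # worker nodes available for FLEXI instances
    print(runtime.db_entry)         # database address for clients, e.g. '127.0.0.1:6780'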