From 1f94020e82e54d6296b932c8b65d6bc771ad9845 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 17 Mar 2024 20:16:18 -0500 Subject: [PATCH] Add type hints --- pysqa/cmd.py | 2 +- pysqa/executor/backend.py | 6 +- pysqa/executor/executor.py | 6 +- pysqa/executor/helper.py | 12 ++-- pysqa/ext/modular.py | 30 +++++----- pysqa/ext/remote.py | 79 +++++++++++++++----------- pysqa/queueadapter.py | 56 +++++++++---------- pysqa/utils/basic.py | 110 ++++++++++++++++++------------------- pysqa/utils/config.py | 2 +- pysqa/utils/execute.py | 12 ++-- pysqa/wrapper/flux.py | 10 ++-- pysqa/wrapper/generic.py | 12 ++-- pysqa/wrapper/gent.py | 8 +-- pysqa/wrapper/lsf.py | 10 ++-- pysqa/wrapper/moab.py | 6 +- pysqa/wrapper/sge.py | 10 ++-- pysqa/wrapper/slurm.py | 12 ++-- pysqa/wrapper/torque.py | 10 ++-- 18 files changed, 204 insertions(+), 189 deletions(-) diff --git a/pysqa/cmd.py b/pysqa/cmd.py index 747bb8e2..7aa72682 100644 --- a/pysqa/cmd.py +++ b/pysqa/cmd.py @@ -7,7 +7,7 @@ from pysqa.utils.execute import execute_command -def command_line(arguments_lst=None, execute_command=execute_command): +def command_line(arguments_lst: list=None, execute_command: callable = execute_command): """ Parse the command line arguments. diff --git a/pysqa/executor/backend.py b/pysqa/executor/backend.py index 776e7643..599dc8e3 100644 --- a/pysqa/executor/backend.py +++ b/pysqa/executor/backend.py @@ -10,7 +10,7 @@ ) -def execute_files_from_list(tasks_in_progress_dict, cache_directory, executor): +def execute_files_from_list(tasks_in_progress_dict: dict, cache_directory: str, executor): file_lst = os.listdir(cache_directory) for file_name_in in file_lst: key = file_name_in.split(".in.pl")[0] @@ -37,7 +37,7 @@ def execute_files_from_list(tasks_in_progress_dict, cache_directory, executor): ) -def execute_tasks(cores, cache_directory): +def execute_tasks(cores: int, cache_directory: str): tasks_in_progress_dict = {} with PyMPIExecutor( max_workers=cores, @@ -58,7 +58,7 @@ def execute_tasks(cores, cache_directory): ) -def command_line(arguments_lst=None): +def command_line(arguments_lst: list=None): if arguments_lst is None: arguments_lst = sys.argv[1:] cores_arg = arguments_lst[arguments_lst.index("--cores") + 1] diff --git a/pysqa/executor/executor.py b/pysqa/executor/executor.py index 1f6e9087..a5a1245d 100644 --- a/pysqa/executor/executor.py +++ b/pysqa/executor/executor.py @@ -12,7 +12,7 @@ class Executor(FutureExecutor): - def __init__(self, cwd=None, queue_adapter=None, queue_adapter_kwargs=None): + def __init__(self, cwd: str=None, queue_adapter=None, queue_adapter_kwargs: dict=None): self._task_queue = queue.Queue() self._memory_dict = {} self._cache_directory = os.path.abspath(os.path.expanduser(cwd)) @@ -42,7 +42,7 @@ def __init__(self, cwd=None, queue_adapter=None, queue_adapter_kwargs=None): ) self._process.start() - def submit(self, fn, *args, **kwargs): + def submit(self, fn: callable, *args, **kwargs): funct_dict = serialize_funct(fn, *args, **kwargs) key = list(funct_dict.keys())[0] if key not in self._memory_dict.keys(): @@ -53,7 +53,7 @@ def submit(self, fn, *args, **kwargs): self._task_queue.put({key: self._memory_dict[key]}) return self._memory_dict[key] - def shutdown(self, wait=True, *, cancel_futures=False): + def shutdown(self, wait: bool = True, *, cancel_futures: bool = False): if cancel_futures: cancel_items_in_queue(que=self._task_queue) self._task_queue.put({"shutdown": True, "wait": wait}) diff --git a/pysqa/executor/helper.py b/pysqa/executor/helper.py index 0dd250f9..d99980da 100644 --- a/pysqa/executor/helper.py +++ b/pysqa/executor/helper.py @@ -14,7 +14,7 @@ def deserialize(funct_dict): return {} -def find_executed_tasks(future_queue, cache_directory): +def find_executed_tasks(future_queue: queue.Queue, cache_directory: str): task_memory_dict = {} while True: task_dict = {} @@ -32,13 +32,13 @@ def find_executed_tasks(future_queue, cache_directory): ) -def read_from_file(file_name): +def read_from_file(file_name: str) -> dict: name = file_name.split("/")[-1].split(".")[0] with open(file_name, "rb") as f: return {name: f.read()} -def reload_previous_futures(future_queue, future_dict, cache_directory): +def reload_previous_futures(future_queue: queue.Queue, future_dict: dict, cache_directory: str): file_lst = os.listdir(cache_directory) for f in file_lst: if f.endswith(".in.pl"): @@ -54,16 +54,16 @@ def reload_previous_futures(future_queue, future_dict, cache_directory): future_queue.put({key: future_dict[key]}) -def serialize_result(result_dict): +def serialize_result(result_dict: dict): return {k: cloudpickle.dumps(v) for k, v in result_dict.items()} -def serialize_funct(fn, *args, **kwargs): +def serialize_funct(fn: callable, *args, **kwargs): binary = cloudpickle.dumps({"fn": fn, "args": args, "kwargs": kwargs}) return {fn.__name__ + _get_hash(binary=binary): binary} -def write_to_file(funct_dict, state, cache_directory): +def write_to_file(funct_dict: dict, state, cache_directory: str): file_name_lst = [] for k, v in funct_dict.items(): file_name = _get_file_name(name=k, state=state) diff --git a/pysqa/ext/modular.py b/pysqa/ext/modular.py index 3036bc70..ce3a3f25 100644 --- a/pysqa/ext/modular.py +++ b/pysqa/ext/modular.py @@ -8,7 +8,7 @@ class ModularQueueAdapter(BasisQueueAdapter): - def __init__(self, config, directory="~/.queues", execute_command=execute_command): + def __init__(self, config: dict, directory: str = "~/.queues", execute_command: callable = execute_command): super(ModularQueueAdapter, self).__init__( config=config, directory=directory, execute_command=execute_command ) @@ -26,16 +26,16 @@ def __init__(self, config, directory="~/.queues", execute_command=execute_comman def submit_job( self, - queue=None, - job_name=None, - working_directory=None, - cores=None, - memory_max=None, - run_time_max=None, - dependency_list=None, - command=None, + queue: str = None, + job_name: str = None, + working_directory: str = None, + cores: int = None, + memory_max: str = None, + run_time_max: int = None, + dependency_list: list[str] = None, + command: str = None, **kwargs, - ): + ) -> int: """ Args: @@ -79,7 +79,7 @@ def submit_job( else: return None - def enable_reservation(self, process_id): + def enable_reservation(self, process_id: int): """ Args: @@ -103,7 +103,7 @@ def enable_reservation(self, process_id): else: return None - def delete_job(self, process_id): + def delete_job(self, process_id: int): """ Args: @@ -127,7 +127,7 @@ def delete_job(self, process_id): else: return None - def get_queue_status(self, user=None): + def get_queue_status(self, user: str = None) -> pandas.DataFrame: """ Args: @@ -155,11 +155,11 @@ def get_queue_status(self, user=None): return df[df["user"] == user] @staticmethod - def _resolve_queue_id(process_id, cluster_dict): + def _resolve_queue_id(process_id: int, cluster_dict: dict): cluster_queue_id = int(process_id / 10) cluster_module = cluster_dict[process_id - cluster_queue_id * 10] return cluster_module, cluster_queue_id @staticmethod - def _switch_cluster_command(cluster_module): + def _switch_cluster_command(cluster_module: str): return ["module", "--quiet", "swap", "cluster/{};".format(cluster_module)] diff --git a/pysqa/ext/remote.py b/pysqa/ext/remote.py index d36f4f1c..8219dd17 100644 --- a/pysqa/ext/remote.py +++ b/pysqa/ext/remote.py @@ -15,7 +15,7 @@ class RemoteQueueAdapter(BasisQueueAdapter): - def __init__(self, config, directory="~/.queues", execute_command=execute_command): + def __init__(self, config: dict, directory: str = "~/.queues", execute_command: callable = execute_command): super(RemoteQueueAdapter, self).__init__( config=config, directory=directory, execute_command=execute_command ) @@ -78,22 +78,37 @@ def __init__(self, config, directory="~/.queues", execute_command=execute_comman self._ssh_proxy_connection = None self._remote_flag = True - def convert_path_to_remote(self, path): + def convert_path_to_remote(self, path: str): working_directory = os.path.abspath(os.path.expanduser(path)) return self._get_remote_working_dir(working_directory=working_directory) def submit_job( self, - queue=None, - job_name=None, - working_directory=None, - cores=None, - memory_max=None, - run_time_max=None, - dependency_list=None, - command=None, + queue: str = None, + job_name: str = None, + working_directory: str = None, + cores: int = None, + memory_max: int = None, + run_time_max: int = None, + dependency_list: list[str] = None, + command: str = None, **kwargs, - ): + ) -> int: + """ + + Args: + queue (str/None): + job_name (str/None): + working_directory (str/None): + cores (int/None): + memory_max (int/None): + run_time_max (int/None): + dependency_list (list/None): + command (str/None): + + Returns: + int: + """ if dependency_list is not None: raise NotImplementedError( "Submitting jobs with dependencies to a remote cluster is not yet supported." @@ -102,7 +117,7 @@ def submit_job( output = self._execute_remote_command(command=command) return int(output.split()[-1]) - def enable_reservation(self, process_id): + def enable_reservation(self, process_id: int) -> str: """ Args: @@ -115,7 +130,7 @@ def enable_reservation(self, process_id): command=self._reservation_command(job_id=process_id) ) - def delete_job(self, process_id): + def delete_job(self, process_id: int) -> str: """ Args: @@ -128,7 +143,7 @@ def delete_job(self, process_id): command=self._delete_command(job_id=process_id) ) - def get_queue_status(self, user=None): + def get_queue_status(self, user: str = None) -> pandas.DataFrame: """ Args: @@ -147,7 +162,7 @@ def get_queue_status(self, user=None): else: return df[df["user"] == user] - def get_job_from_remote(self, working_directory): + def get_job_from_remote(self, working_directory: str): """ Get the results of the calculation - this is necessary when the calculation was executed on a remote host. """ @@ -176,7 +191,7 @@ def get_job_from_remote(self, working_directory): if self._ssh_delete_file_on_remote: self._execute_remote_command(command="rm -r " + remote_working_directory) - def transfer_file(self, file, transfer_back=False, delete_file_on_remote=False): + def transfer_file(self, file: str, transfer_back: bool = False, delete_file_on_remote: bool = False): working_directory = os.path.abspath(os.path.expanduser(file)) remote_working_directory = self._get_remote_working_dir( working_directory=working_directory @@ -200,7 +215,7 @@ def _check_ssh_connection(self): if self._ssh_connection is None: self._ssh_connection = self._open_ssh_connection() - def _transfer_files(self, file_dict, sftp=None, transfer_back=False): + def _transfer_files(self, file_dict: dict, sftp=None, transfer_back: bool = False): if sftp is None: if self._ssh_continous_connection: self._check_ssh_connection() @@ -346,13 +361,13 @@ def _get_queue_status_command(self): def _submit_command( self, - queue=None, - job_name=None, - working_directory=None, - cores=None, - memory_max=None, - run_time_max=None, - command_str=None, + queue: str = None, + job_name: str = None, + working_directory: str = None, + cores: int = None, + memory_max: int = None, + run_time_max: int = None, + command_str: str = None, ): command = self._remote_command() + "--submit " if queue is not None: @@ -371,13 +386,13 @@ def _submit_command( command += '--command "' + command_str + '" ' return command - def _delete_command(self, job_id): + def _delete_command(self, job_id: int) -> str: return self._remote_command() + "--delete --id " + str(job_id) - def _reservation_command(self, job_id): + def _reservation_command(self, job_id: int) -> str: return self._remote_command() + "--reservation --id " + str(job_id) - def _execute_remote_command(self, command): + def _execute_remote_command(self, command: str): if self._ssh_continous_connection: self._check_ssh_connection() ssh = self._ssh_connection @@ -390,13 +405,13 @@ def _execute_remote_command(self, command): ssh.close() return output - def _get_remote_working_dir(self, working_directory): + def _get_remote_working_dir(self, working_directory: str): return os.path.join( self._ssh_remote_path, os.path.relpath(working_directory, self._ssh_local_path), ) - def _create_remote_dir(self, directory): + def _create_remote_dir(self, directory: str): if isinstance(directory, str): self._execute_remote_command(command="mkdir -p " + directory) elif isinstance(directory, list): @@ -407,7 +422,7 @@ def _create_remote_dir(self, directory): else: raise TypeError() - def _transfer_data_to_remote(self, working_directory): + def _transfer_data_to_remote(self, working_directory: str): working_directory = os.path.abspath(os.path.expanduser(working_directory)) remote_working_directory = self._get_remote_working_dir( working_directory=working_directory @@ -432,7 +447,7 @@ def _transfer_data_to_remote(self, working_directory): self._create_remote_dir(directory=new_dir_list) self._transfer_files(file_dict=file_dict, sftp=None, transfer_back=False) - def _get_user(self): + def _get_user(self) -> str: """ Returns: @@ -441,7 +456,7 @@ def _get_user(self): return self._ssh_username @staticmethod - def _get_file_transfer(file, local_dir, remote_dir): + def _get_file_transfer(file: str, local_dir: str, remote_dir: str) -> str: return os.path.abspath( os.path.join(remote_dir, os.path.relpath(file, local_dir)) ) diff --git a/pysqa/queueadapter.py b/pysqa/queueadapter.py index 812e5472..e052c28f 100644 --- a/pysqa/queueadapter.py +++ b/pysqa/queueadapter.py @@ -45,7 +45,7 @@ class QueueAdapter(object): Queues available for auto completion QueueAdapter().queues. returns the queue name. """ - def __init__(self, directory="~/.queues", execute_command=execute_command): + def __init__(self, directory: str = "~/.queues", execute_command: callable = execute_command): queue_yaml = os.path.join(directory, "queue.yaml") clusters_yaml = os.path.join(directory, "clusters.yaml") self._adapter = None @@ -76,7 +76,7 @@ def __init__(self, directory="~/.queues", execute_command=execute_command): ) self._adapter = self._queue_dict[primary_queue] - def list_clusters(self): + def list_clusters(self) -> list[str]: """ List available computing clusters for remote submission @@ -85,7 +85,7 @@ def list_clusters(self): """ return list(self._queue_dict.keys()) - def switch_cluster(self, cluster_name): + def switch_cluster(self, cluster_name: str): """ Switch to a different computing cluster @@ -104,11 +104,11 @@ def config(self): return self._adapter.config @property - def ssh_delete_file_on_remote(self): + def ssh_delete_file_on_remote(self) -> bool: return self._adapter.ssh_delete_file_on_remote @property - def remote_flag(self): + def remote_flag(self) -> bool: """ Returns: @@ -117,7 +117,7 @@ def remote_flag(self): return self._adapter.remote_flag @property - def queue_list(self): + def queue_list(self) -> list[str]: """ Returns: @@ -126,7 +126,7 @@ def queue_list(self): return self._adapter.queue_list @property - def queue_view(self): + def queue_view(self) -> pandas.DataFrame: """ Returns: @@ -135,21 +135,21 @@ def queue_view(self): return self._adapter.queue_view @property - def queues(self): + def queues(self) -> list[str]: return self._adapter.queues def submit_job( self, - queue=None, - job_name=None, - working_directory=None, - cores=None, - memory_max=None, - run_time_max=None, - dependency_list=None, - command=None, + queue: str = None, + job_name: str = None, + working_directory: str = None, + cores: int = None, + memory_max: int = None, + run_time_max: int = None, + dependency_list: list[str] = None, + command: str = None, **kwargs, - ): + ) -> int: """ Submits command to the given queue. @@ -181,7 +181,7 @@ def submit_job( **kwargs, ) - def enable_reservation(self, process_id): + def enable_reservation(self, process_id: int) -> str: """ Args: @@ -192,7 +192,7 @@ def enable_reservation(self, process_id): """ return self._adapter.enable_reservation(process_id=process_id) - def get_job_from_remote(self, working_directory): + def get_job_from_remote(self, working_directory: str): """ Get the results of the calculation - this is necessary when the calculation was executed on a remote host. @@ -202,7 +202,7 @@ def get_job_from_remote(self, working_directory): self._adapter.get_job_from_remote(working_directory=working_directory) def transfer_file_to_remote( - self, file, transfer_back=False, delete_file_on_remote=False + self, file: str, transfer_back: bool = False, delete_file_on_remote: bool = False ): """ Transfer file from remote host to local host @@ -218,7 +218,7 @@ def transfer_file_to_remote( delete_file_on_remote=delete_file_on_remote, ) - def convert_path_to_remote(self, path): + def convert_path_to_remote(self, path: str) -> str: """ Args: @@ -229,7 +229,7 @@ def convert_path_to_remote(self, path): """ return self._adapter.convert_path_to_remote(path=path) - def delete_job(self, process_id): + def delete_job(self, process_id: int) -> str: """ Args: @@ -240,7 +240,7 @@ def delete_job(self, process_id): """ return self._adapter.delete_job(process_id=process_id) - def get_queue_status(self, user=None): + def get_queue_status(self, user: str=None) -> pandas.DataFrame: """ Args: @@ -251,7 +251,7 @@ def get_queue_status(self, user=None): """ return self._adapter.get_queue_status(user=user) - def get_status_of_my_jobs(self): + def get_status_of_my_jobs(self) -> pandas.DataFrame: """ Returns: @@ -259,7 +259,7 @@ def get_status_of_my_jobs(self): """ return self._adapter.get_status_of_my_jobs() - def get_status_of_job(self, process_id): + def get_status_of_job(self, process_id: int) -> str: """ Args: @@ -270,7 +270,7 @@ def get_status_of_job(self, process_id): """ return self._adapter.get_status_of_job(process_id=process_id) - def get_status_of_jobs(self, process_id_lst): + def get_status_of_jobs(self, process_id_lst: list[int]) -> list[str]: """ Args: @@ -282,7 +282,7 @@ def get_status_of_jobs(self, process_id_lst): return self._adapter.get_status_of_jobs(process_id_lst=process_id_lst) def check_queue_parameters( - self, queue, cores=1, run_time_max=None, memory_max=None, active_queue=None + self, queue: str, cores: int = 1, run_time_max: int = None, memory_max: int = None, active_queue: dict = None ): """ @@ -305,7 +305,7 @@ def check_queue_parameters( ) -def set_queue_adapter(config, directory, execute_command=execute_command): +def set_queue_adapter(config: dict, directory: str, execute_command: callable=execute_command): """ Initialize the queue adapter diff --git a/pysqa/utils/basic.py b/pysqa/utils/basic.py index 29d8ab5b..5db323d0 100644 --- a/pysqa/utils/basic.py +++ b/pysqa/utils/basic.py @@ -44,7 +44,7 @@ class BasisQueueAdapter(object): Queues available for auto completion QueueAdapter().queues. returns the queue name. """ - def __init__(self, config, directory="~/.queues", execute_command=execute_command): + def __init__(self, config: dict, directory: str = "~/.queues", execute_command: callable = execute_command): self._config = config self._fill_queue_dict(queue_lst_dict=self._config["queues"]) self._load_templates(queue_lst_dict=self._config["queues"], directory=directory) @@ -89,11 +89,11 @@ def __init__(self, config, directory="~/.queues", execute_command=execute_comman self._execute_command_function = execute_command @property - def ssh_delete_file_on_remote(self): + def ssh_delete_file_on_remote(self) -> bool: return self._ssh_delete_file_on_remote @property - def remote_flag(self): + def remote_flag(self) -> bool: return self._remote_flag @property @@ -106,7 +106,7 @@ def config(self): return self._config @property - def queue_list(self): + def queue_list(self) -> list: """ Returns: @@ -115,7 +115,7 @@ def queue_list(self): return list(self._config["queues"].keys()) @property - def queue_view(self): + def queue_view(self) -> pandas.DataFrame: """ Returns: @@ -131,16 +131,16 @@ def queues(self): def submit_job( self, - queue=None, - job_name=None, - working_directory=None, - cores=None, - memory_max=None, - run_time_max=None, - dependency_list=None, - command=None, + queue: str = None, + job_name: str = None, + working_directory: str = None, + cores: int = None, + memory_max: int = None, + run_time_max: int = None, + dependency_list: list[str] = None, + command: str = None, **kwargs, - ): + ) -> int: """ Args: @@ -182,14 +182,14 @@ def submit_job( else: return None - def _list_command_to_be_executed(self, dependency_list, queue_script_path) -> list: + def _list_command_to_be_executed(self, dependency_list: list[str], queue_script_path: str) -> list: return ( self._commands.submit_job_command + self._commands.dependencies(dependency_list) + [queue_script_path] ) - def enable_reservation(self, process_id): + def enable_reservation(self, process_id: int): """ Args: @@ -207,7 +207,7 @@ def enable_reservation(self, process_id): else: return None - def delete_job(self, process_id): + def delete_job(self, process_id: int) -> str: """ Args: @@ -225,7 +225,7 @@ def delete_job(self, process_id): else: return None - def get_queue_status(self, user=None): + def get_queue_status(self, user: str = None) -> pandas.DataFrame: """ Args: @@ -243,7 +243,7 @@ def get_queue_status(self, user=None): else: return df[df["user"] == user] - def get_status_of_my_jobs(self): + def get_status_of_my_jobs(self) -> pandas.DataFrame: """ Returns: @@ -251,7 +251,7 @@ def get_status_of_my_jobs(self): """ return self.get_queue_status(user=self._get_user()) - def get_status_of_job(self, process_id): + def get_status_of_job(self, process_id: int) -> str: """ Args: @@ -267,14 +267,14 @@ def get_status_of_job(self, process_id): else: return None - def get_status_of_jobs(self, process_id_lst): + def get_status_of_jobs(self, process_id_lst: list[int]) -> list[str]: """ Args: - process_id_lst: + process_id_lst list[int]: Returns: - list: ['running', 'pending', 'error', ...] + list[str]: ['running', 'pending', 'error', ...] """ df = self.get_queue_status() results_lst = [] @@ -286,21 +286,21 @@ def get_status_of_jobs(self, process_id_lst): results_lst.append("finished") return results_lst - def get_job_from_remote(self, working_directory): + def get_job_from_remote(self, working_directory: str): """ Get the results of the calculation - this is necessary when the calculation was executed on a remote host. """ raise NotImplementedError - def convert_path_to_remote(self, path): + def convert_path_to_remote(self, path: str): raise NotImplementedError - def transfer_file(self, file, transfer_back=False, delete_file_on_remote=False): + def transfer_file(self, file: str, transfer_back: bool = False, delete_file_on_remote: bool = False): raise NotImplementedError def check_queue_parameters( - self, queue, cores=1, run_time_max=None, memory_max=None, active_queue=None - ): + self, queue: str, cores: int = 1, run_time_max: int = None, memory_max: int = None, active_queue: dict = None + ) -> list: """ Args: @@ -330,13 +330,13 @@ def check_queue_parameters( def _write_queue_script( self, - queue=None, - job_name=None, - working_directory=None, - cores=None, - memory_max=None, - run_time_max=None, - command=None, + queue: str = None, + job_name: str = None, + working_directory: str = None, + cores: int = None, + memory_max: int = None, + run_time_max: int = None, + command: str = None, **kwargs, ): """ @@ -374,15 +374,15 @@ def _write_queue_script( def _job_submission_template( self, - queue=None, - job_name="job.py", - working_directory=".", - cores=None, - memory_max=None, - run_time_max=None, - command=None, + queue: str = None, + job_name: str = "job.py", + working_directory: str = ".", + cores: int = None, + memory_max: int = None, + run_time_max: int = None, + command: str = None, **kwargs, - ): + ) -> str: """ Args: @@ -428,12 +428,12 @@ def _job_submission_template( def _execute_command( self, - commands, - working_directory=None, - split_output=True, - shell=False, - error_filename="pysqa.err", - ): + commands: str, + working_directory: str = None, + split_output: bool = True, + shell: bool = False, + error_filename: str = "pysqa.err", + ) -> str: """ Args: @@ -455,7 +455,7 @@ def _execute_command( ) @staticmethod - def _get_user(): + def _get_user() -> str: """ Returns: @@ -464,7 +464,7 @@ def _get_user(): return getpass.getuser() @staticmethod - def _fill_queue_dict(queue_lst_dict): + def _fill_queue_dict(queue_lst_dict: dict): """ Args: @@ -476,7 +476,7 @@ def _fill_queue_dict(queue_lst_dict): queue_dict[key] = None @staticmethod - def _load_templates(queue_lst_dict, directory="."): + def _load_templates(queue_lst_dict: dict, directory: str = "."): """ Args: @@ -498,7 +498,7 @@ def _load_templates(queue_lst_dict, directory="."): ) @staticmethod - def _value_error_if_none(value): + def _value_error_if_none(value: str): """ Args: @@ -548,7 +548,7 @@ def _value_in_range(cls, value, value_min=None, value_max=None): return value @staticmethod - def _is_memory_string(value): + def _is_memory_string(value: str) -> bool: """ Tests a string if it specifies a certain amount of memory e.g.: '20G', '60b'. Also pure integer strings are also valid. @@ -564,7 +564,7 @@ def _is_memory_string(value): @classmethod def _memory_spec_string_to_value( - cls, value, default_magnitude="m", target_magnitude="b" + cls, value: str, default_magnitude: str = "m", target_magnitude: str = "b" ): """ Converts a valid memory string (tested by _is_memory_string) into an integer/float value of desired diff --git a/pysqa/utils/config.py b/pysqa/utils/config.py index a831ea72..8b80dc1d 100644 --- a/pysqa/utils/config.py +++ b/pysqa/utils/config.py @@ -1,7 +1,7 @@ import yaml -def read_config(file_name="queue.yaml"): +def read_config(file_name: str = "queue.yaml") -> dict: """ Args: diff --git a/pysqa/utils/execute.py b/pysqa/utils/execute.py index 9bd560c1..6c663db8 100644 --- a/pysqa/utils/execute.py +++ b/pysqa/utils/execute.py @@ -3,12 +3,12 @@ def execute_command( - commands, - working_directory=None, - split_output=True, - shell=False, - error_filename="pysqa.err", -): + commands: str, + working_directory: str = None, + split_output: bool = True, + shell: bool = False, + error_filename: str = "pysqa.err", +) -> str: """ A wrapper around the subprocess.check_output function. diff --git a/pysqa/wrapper/flux.py b/pysqa/wrapper/flux.py index 1c84171e..efa52099 100644 --- a/pysqa/wrapper/flux.py +++ b/pysqa/wrapper/flux.py @@ -7,25 +7,25 @@ class FluxCommands(SchedulerCommands): @property - def submit_job_command(self): + def submit_job_command(self) -> list[str]: return ["flux", "batch"] @property - def delete_job_command(self): + def delete_job_command(self) -> list[str]: return ["flux", "cancel"] @property - def get_queue_status_command(self): + def get_queue_status_command(self) -> list[str]: return ["flux", "jobs", "-a", "--no-header"] @staticmethod - def get_job_id_from_output(queue_submit_output): + def get_job_id_from_output(queue_submit_output: str) -> int: return int( JobID(queue_submit_output.splitlines()[-1].rstrip().lstrip().split()[-1]) ) @staticmethod - def convert_queue_status(queue_status_output): + def convert_queue_status(queue_status_output: str) -> pandas.DataFrame: line_split_lst = [line.split() for line in queue_status_output.splitlines()] job_id_lst, user_lst, job_name_lst, status_lst = [], [], [], [] for entry in line_split_lst: diff --git a/pysqa/wrapper/generic.py b/pysqa/wrapper/generic.py index 327b6e9a..05715b27 100644 --- a/pysqa/wrapper/generic.py +++ b/pysqa/wrapper/generic.py @@ -19,12 +19,12 @@ class SchedulerCommands(ABC): @property @abstractmethod - def submit_job_command(self): + def submit_job_command(self) -> list[str]: pass @property @abstractmethod - def delete_job_command(self): + def delete_job_command(self) -> list[str]: pass @property @@ -33,20 +33,20 @@ def enable_reservation_command(self): @property @abstractmethod - def get_queue_status_command(self): + def get_queue_status_command(self) -> list[str]: pass @staticmethod - def dependencies(dependency_list) -> list: + def dependencies(dependency_list: list[str]) -> list: if dependency_list is not None: raise NotImplementedError() else: return [] @staticmethod - def get_job_id_from_output(queue_submit_output): + def get_job_id_from_output(queue_submit_output: str) -> int: raise NotImplementedError() @staticmethod - def convert_queue_status(queue_status_output): + def convert_queue_status(queue_status_output: str) -> pandas.DataFrame: raise NotImplementedError() diff --git a/pysqa/wrapper/gent.py b/pysqa/wrapper/gent.py index fe5a643d..3d6e8fd7 100644 --- a/pysqa/wrapper/gent.py +++ b/pysqa/wrapper/gent.py @@ -20,15 +20,15 @@ class GentCommands(SlurmCommands): @staticmethod - def get_job_id_from_output(queue_submit_output): + def get_job_id_from_output(queue_submit_output: str) -> int: return int(queue_submit_output.splitlines()[-1].rstrip().lstrip().split(";")[0]) @staticmethod - def get_queue_from_output(queue_submit_output): + def get_queue_from_output(queue_submit_output: str) -> str: return str(queue_submit_output.splitlines()[-1].rstrip().lstrip().split(";")[1]) @staticmethod - def convert_queue_status(queue_status_output): + def convert_queue_status(queue_status_output: str) -> pandas.DataFrame: qstat = queue_status_output.splitlines() queue = qstat[0].split(":")[1].strip() if len(qstat) <= 1: # first row contains cluster name, check if there are jobs @@ -52,7 +52,7 @@ def convert_queue_status(queue_status_output): ) @staticmethod - def dependencies(dependency_list) -> list: + def dependencies(dependency_list: list[str]) -> list: if dependency_list is not None: raise NotImplementedError() else: diff --git a/pysqa/wrapper/lsf.py b/pysqa/wrapper/lsf.py index 0a22a630..cf04fabc 100644 --- a/pysqa/wrapper/lsf.py +++ b/pysqa/wrapper/lsf.py @@ -20,23 +20,23 @@ class LsfCommands(SchedulerCommands): @property - def submit_job_command(self): + def submit_job_command(self) -> list[str]: return ["bsub"] @property - def delete_job_command(self): + def delete_job_command(self) -> list[str]: return ["bkill"] @property - def get_queue_status_command(self): + def get_queue_status_command(self) -> list[str]: return ["bjobs"] @staticmethod - def get_job_id_from_output(queue_submit_output): + def get_job_id_from_output(queue_submit_output: str) -> int: return int(queue_submit_output.split("<")[1].split(">")[0]) @staticmethod - def convert_queue_status(queue_status_output): + def convert_queue_status(queue_status_output: str) -> pandas.DataFrame: job_id_lst, user_lst, status_lst, job_name_lst = [], [], [], [] line_split_lst = queue_status_output.split("\n") if len(line_split_lst) > 1: diff --git a/pysqa/wrapper/moab.py b/pysqa/wrapper/moab.py index 19a6d0e6..e9300265 100644 --- a/pysqa/wrapper/moab.py +++ b/pysqa/wrapper/moab.py @@ -18,13 +18,13 @@ class MoabCommands(SchedulerCommands): @property - def submit_job_command(self): + def submit_job_command(self) -> list[str]: return ["msub"] @property - def delete_job_command(self): + def delete_job_command(self) -> list[str]: return ["mjobctl", "-c"] @property - def get_queue_status_command(self): + def get_queue_status_command(self) -> list[str]: return ["mdiag", "-x"] diff --git a/pysqa/wrapper/sge.py b/pysqa/wrapper/sge.py index c5dbc66f..5025bb2a 100644 --- a/pysqa/wrapper/sge.py +++ b/pysqa/wrapper/sge.py @@ -21,23 +21,23 @@ class SunGridEngineCommands(SchedulerCommands): @property - def submit_job_command(self): + def submit_job_command(self) -> list[str]: return ["qsub", "-terse"] @property - def delete_job_command(self): + def delete_job_command(self) -> list[str]: return ["qdel"] @property - def enable_reservation_command(self): + def enable_reservation_command(self) -> list[str]: return ["qalter", "-R", "y"] @property - def get_queue_status_command(self): + def get_queue_status_command(self) -> list[str]: return ["qstat", "-xml"] @staticmethod - def convert_queue_status(queue_status_output): + def convert_queue_status(queue_status_output: str) -> pandas.DataFrame: def leaf_to_dict(leaf): return [ {sub_child.tag: sub_child.text for sub_child in child} for child in leaf diff --git a/pysqa/wrapper/slurm.py b/pysqa/wrapper/slurm.py index 6be70969..4cfd5424 100644 --- a/pysqa/wrapper/slurm.py +++ b/pysqa/wrapper/slurm.py @@ -20,23 +20,23 @@ class SlurmCommands(SchedulerCommands): @property - def submit_job_command(self): + def submit_job_command(self) -> list[str]: return ["sbatch", "--parsable"] @property - def delete_job_command(self): + def delete_job_command(self) -> list[str]: return ["scancel"] @property - def get_queue_status_command(self): + def get_queue_status_command(self) -> list[str]: return ["squeue", "--format", "%A|%u|%t|%.15j|%Z", "--noheader"] @staticmethod - def get_job_id_from_output(queue_submit_output): + def get_job_id_from_output(queue_submit_output: str) -> int: return int(queue_submit_output.splitlines()[-1].rstrip().lstrip().split()[-1]) @staticmethod - def convert_queue_status(queue_status_output): + def convert_queue_status(queue_status_output: str) -> pandas.DataFrame: line_split_lst = [line.split("|") for line in queue_status_output.splitlines()] if len(line_split_lst) != 0: job_id_lst, user_lst, status_lst, job_name_lst, working_directory_lst = zip( @@ -67,7 +67,7 @@ def convert_queue_status(queue_status_output): return df @staticmethod - def dependencies(dependency_list) -> list: + def dependencies(dependency_list: list[str]) -> list: if dependency_list is not None: return ["--dependency=afterok:" + ",".join(dependency_list)] else: diff --git a/pysqa/wrapper/torque.py b/pysqa/wrapper/torque.py index b1da1e96..060e6b27 100644 --- a/pysqa/wrapper/torque.py +++ b/pysqa/wrapper/torque.py @@ -22,19 +22,19 @@ class TorqueCommands(SchedulerCommands): @property - def submit_job_command(self): + def submit_job_command(self) -> list[str]: return ["qsub"] @property - def delete_job_command(self): + def delete_job_command(self) -> list[str]: return ["qdel"] @property - def get_queue_status_command(self): + def get_queue_status_command(self) -> list[str]: return ["qstat", "-f"] @staticmethod - def get_job_id_from_output(queue_submit_output): + def get_job_id_from_output(queue_submit_output: str) -> int: # strip last line from output, leading and trailing whitespaces, # and separates the queue id from the stuff after "." # Adjust if your system doesn't have output like below! @@ -45,7 +45,7 @@ def get_job_id_from_output(queue_submit_output): ) @staticmethod - def convert_queue_status(queue_status_output): + def convert_queue_status(queue_status_output: str) -> pandas.DataFrame: # # Run the qstat -f command and capture its output # output = subprocess.check_output(["qstat", "-f"]) -> output is the # queue_status_output that goes into this function