diff --git a/pysqa/cmd.py b/pysqa/cmd.py index 197a308..8c07dc9 100644 --- a/pysqa/cmd.py +++ b/pysqa/cmd.py @@ -10,14 +10,16 @@ def command_line( arguments_lst: Optional[list] = None, execute_command: callable = execute_command -): +) -> None: """ Parse the command line arguments. Args: - argv: Command line arguments - execute_command: function to comunicate with shell process + arguments_lst (Optional[list]): Command line arguments + execute_command (callable): Function to communicate with shell process + Returns: + None """ directory = "~/.queues" queue = None diff --git a/pysqa/executor/backend.py b/pysqa/executor/backend.py index 06c2962..29bd25e 100644 --- a/pysqa/executor/backend.py +++ b/pysqa/executor/backend.py @@ -15,6 +15,17 @@ def execute_files_from_list( tasks_in_progress_dict: dict, cache_directory: str, executor ): + """ + Execute files from the given list. + + Args: + tasks_in_progress_dict (dict): Dictionary of tasks in progress. + cache_directory (str): Directory where the files are stored. + executor: Executor object for executing tasks. + + Returns: + None + """ file_lst = os.listdir(cache_directory) for file_name_in in file_lst: key = file_name_in.split(".in.pl")[0] @@ -42,6 +53,16 @@ def execute_files_from_list( def execute_tasks(cores: int, cache_directory: str): + """ + Execute tasks from the given cache directory using the specified number of cores. + + Args: + cores (int): Number of cores to use for execution. + cache_directory (str): Directory where the files are stored. + + Returns: + None + """ tasks_in_progress_dict = {} with Executor( max_cores=cores, @@ -61,9 +82,18 @@ def execute_tasks(cores: int, cache_directory: str): ) -def command_line(arguments_lst: Optional[list] = None): +def command_line(arguments_lst: Optional[list] = None) -> None: + """ + Execute tasks from the command line. + + Args: + arguments_lst (Optional[list]): List of command line arguments. + + Returns: + None + """ if arguments_lst is None: arguments_lst = sys.argv[1:] cores_arg = arguments_lst[arguments_lst.index("--cores") + 1] path_arg = arguments_lst[arguments_lst.index("--path") + 1] - execute_tasks(cores=cores_arg, cache_directory=path_arg) + execute_tasks(cores=int(cores_arg), cache_directory=path_arg) diff --git a/pysqa/executor/executor.py b/pysqa/executor/executor.py index 9070a8c..b2c8bc8 100644 --- a/pysqa/executor/executor.py +++ b/pysqa/executor/executor.py @@ -21,6 +21,14 @@ def __init__( queue_adapter=None, queue_adapter_kwargs: Optional[dict] = None, ): + """ + Initialize the Executor. + + Args: + cwd (Optional[str]): The current working directory. Defaults to None. + queue_adapter: The queue adapter. Defaults to None. + queue_adapter_kwargs (Optional[dict]): Additional keyword arguments for the queue adapter. Defaults to None. + """ self._task_queue = queue.Queue() self._memory_dict = {} self._cache_directory = os.path.abspath(os.path.expanduser(cwd)) @@ -50,7 +58,18 @@ def __init__( ) self._process.start() - def submit(self, fn: callable, *args, **kwargs): + def submit(self, fn: callable, *args, **kwargs) -> Future: + """ + Submit a function for execution. + + Args: + fn (callable): The function to be executed. + *args: Positional arguments to be passed to the function. + **kwargs: Keyword arguments to be passed to the function. + + Returns: + Future: A Future object representing the execution of the function. 
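A quick usage sketch for the `Executor` documented above; the cache path, the queue configuration in `~/.queues`, and the top-level `QueueAdapter` import are placeholders for illustration, not part of this patch:

```python
# Hypothetical usage of pysqa.executor.executor.Executor; assumes a valid
# queue configuration exists under ~/.queues.
from pysqa import QueueAdapter
from pysqa.executor.executor import Executor


def add(a, b):
    return a + b


qa = QueueAdapter(directory="~/.queues")
exe = Executor(cwd="~/executor_cache", queue_adapter=qa)
future = exe.submit(add, 1, 2)  # returns a concurrent.futures.Future
print(future.result())         # blocks until the task has run -> 3
exe.shutdown(wait=True)
```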
+ """ funct_dict = serialize_funct(fn, *args, **kwargs) key = list(funct_dict.keys())[0] if key not in self._memory_dict.keys(): @@ -62,6 +81,13 @@ def submit(self, fn: callable, *args, **kwargs): return self._memory_dict[key] def shutdown(self, wait: bool = True, *, cancel_futures: bool = False): + """ + Shutdown the Executor. + + Args: + wait (bool): Whether to wait for all tasks to complete before shutting down. Defaults to True. + cancel_futures (bool): Whether to cancel pending futures. Defaults to False. + """ if cancel_futures: cancel_items_in_queue(que=self._task_queue) self._task_queue.put({"shutdown": True, "wait": wait}) diff --git a/pysqa/executor/helper.py b/pysqa/executor/helper.py index 9828089..df0fd98 100644 --- a/pysqa/executor/helper.py +++ b/pysqa/executor/helper.py @@ -3,18 +3,40 @@ import queue import re from concurrent.futures import Future +from typing import List import cloudpickle def deserialize(funct_dict: dict) -> dict: + """ + Deserialize a dictionary of serialized functions. + + Args: + funct_dict (dict): A dictionary containing serialized functions. + + Returns: + dict: A dictionary with deserialized functions. + + """ try: return {k: cloudpickle.loads(v) for k, v in funct_dict.items()} except EOFError: return {} -def find_executed_tasks(future_queue: queue.Queue, cache_directory: str): +def find_executed_tasks(future_queue: queue.Queue, cache_directory: str) -> None: + """ + Find executed tasks from the future queue and update the task memory dictionary. + + Args: + future_queue (queue.Queue): The queue containing the futures of executed tasks. + cache_directory (str): The directory where the task cache is stored. + + Returns: + None + + """ task_memory_dict = {} while True: task_dict = {} @@ -33,6 +55,16 @@ def find_executed_tasks(future_queue: queue.Queue, cache_directory: str): def read_from_file(file_name: str) -> dict: + """ + Read the contents of a file and return it as a dictionary. + + Args: + file_name (str): The name of the file to read. + + Returns: + dict: A dictionary containing the contents of the file, with the file name as the key. + + """ name = file_name.split("/")[-1].split(".")[0] with open(file_name, "rb") as f: return {name: f.read()} @@ -40,7 +72,19 @@ def read_from_file(file_name: str) -> dict: def reload_previous_futures( future_queue: queue.Queue, future_dict: dict, cache_directory: str -): +) -> None: + """ + Reload previous futures from the cache directory and update the future dictionary. + + Args: + future_queue (queue.Queue): The queue containing the futures of executed tasks. + future_dict (dict): A dictionary containing the current futures. + cache_directory (str): The directory where the task cache is stored. + + Returns: + None + + """ file_lst = os.listdir(cache_directory) for f in file_lst: if f.endswith(".in.pl"): @@ -56,16 +100,50 @@ def reload_previous_futures( future_queue.put({key: future_dict[key]}) -def serialize_result(result_dict: dict): +def serialize_result(result_dict: dict) -> dict: + """ + Serialize the values in a dictionary using cloudpickle. + + Args: + result_dict (dict): A dictionary containing the values to be serialized. + + Returns: + dict: A dictionary with serialized values. + + """ return {k: cloudpickle.dumps(v) for k, v in result_dict.items()} -def serialize_funct(fn: callable, *args, **kwargs): +def serialize_funct(fn: callable, *args, **kwargs) -> dict: + """ + Serialize a function along with its arguments and keyword arguments. + + Args: + fn (callable): The function to be serialized. 
+ *args: The arguments to be passed to the function. + **kwargs: The keyword arguments to be passed to the function. + + Returns: + dict: A dictionary containing the serialized function. + + """ binary = cloudpickle.dumps({"fn": fn, "args": args, "kwargs": kwargs}) return {fn.__name__ + _get_hash(binary=binary): binary} -def write_to_file(funct_dict: dict, state, cache_directory: str): +def write_to_file(funct_dict: dict, state: str, cache_directory: str) -> List[str]: + """ + Write the contents of a dictionary to files in the cache directory. + + Args: + funct_dict (dict): A dictionary containing the contents to be written. + state (str): The state of the files to be written. + cache_directory (str): The directory where the files will be written. + + Returns: + List[str]: A list of file names that were written. + + """ file_name_lst = [] for k, v in funct_dict.items(): file_name = _get_file_name(name=k, state=state) @@ -75,23 +153,69 @@ def write_to_file(funct_dict: dict, state, cache_directory: str): return file_name_lst -def _get_file_name(name, state): +def _get_file_name(name: str, state: str) -> str: + """ + Generate a file name based on the given name and state. + + Args: + name (str): The name to be included in the file name. + state (str): The state of the file. + + Returns: + str: The generated file name. + + """ return name + "." + state + ".pl" -def _get_hash(binary): +def _get_hash(binary: bytes) -> str: + """ + Get the hash of a binary using MD5 algorithm. + + Args: + binary (bytes): The binary data to be hashed. + + Returns: + str: The hexadecimal representation of the hash. + + """ # Remove specification of jupyter kernel from hash to be deterministic binary_no_ipykernel = re.sub(b"(?<=/ipykernel_)(.*)(?=/)", b"", binary) return str(hashlib.md5(binary_no_ipykernel).hexdigest()) -def _set_future(file_name, future): +def _set_future(file_name: str, future: Future) -> None: + """ + Set the result of a future based on the contents of a file. + + Args: + file_name (str): The name of the file containing the result. + future (Future): The future to set the result for. + + Returns: + None + + """ values = deserialize(funct_dict=read_from_file(file_name=file_name)).values() if len(values) == 1: future.set_result(list(values)[0]) -def _update_task_dict(task_dict, task_memory_dict, cache_directory): +def _update_task_dict( + task_dict: dict, task_memory_dict: dict, cache_directory: str +) -> None: + """ + Update the task memory dictionary with the futures from the task dictionary. + + Args: + task_dict (dict): A dictionary containing the futures of tasks. + task_memory_dict (dict): The dictionary to store the task memory. + cache_directory (str): The directory where the task cache is stored. + + Returns: + None + + """ file_lst = os.listdir(cache_directory) for key, future in task_dict.items(): task_memory_dict[key] = future diff --git a/pysqa/ext/modular.py b/pysqa/ext/modular.py index b2cf01a..5318d7b 100644 --- a/pysqa/ext/modular.py +++ b/pysqa/ext/modular.py @@ -9,6 +9,22 @@ class ModularQueueAdapter(BasisQueueAdapter): + """ + A class representing a modular queue adapter. + + Args: + config (dict): The configuration dictionary. + directory (str, optional): The directory path. Defaults to "~/.queues". + execute_command (callable, optional): The execute command function. Defaults to execute_command. + + Attributes: + _queue_to_cluster_dict (dict): A dictionary mapping queues to clusters. + + Raises: + ValueError: If a cluster is not found in the list of clusters. 
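A round-trip sketch for the serialization helpers documented above (`serialize_funct`, `write_to_file`, `read_from_file`, `deserialize`); the temporary directory and the example function are illustrative, and it assumes `write_to_file` places its `<key>.<state>.pl` files in `cache_directory`, as its docstring states:

```python
import os
import tempfile

from pysqa.executor.helper import (
    deserialize,
    read_from_file,
    serialize_funct,
    write_to_file,
)


def add(a, b):
    return a + b


funct_dict = serialize_funct(add, 1, 2)  # {"add<md5 hash>": <cloudpickle bytes>}
with tempfile.TemporaryDirectory() as cache:
    # writes "<key>.in.pl" into the cache directory and returns the file names
    file_lst = write_to_file(funct_dict, state="in", cache_directory=cache)
    raw = read_from_file(file_name=os.path.join(cache, file_lst[0]))
    task = list(deserialize(raw).values())[0]
    print(task["fn"](*task["args"], **task["kwargs"]))  # -> 3
```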
+ + """ + def __init__( self, config: dict, @@ -43,19 +59,21 @@ def submit_job( **kwargs, ) -> int: """ + Submit a job to the queue. Args: - queue (str/None): - job_name (str/None): - working_directory (str/None): - cores (int/None): - memory_max (int/None): - run_time_max (int/None): - dependency_list (list/None): - command (str/None): + queue (str, optional): The queue name. Defaults to None. + job_name (str, optional): The job name. Defaults to None. + working_directory (str, optional): The working directory. Defaults to None. + cores (int, optional): The number of cores. Defaults to None. + memory_max (int, optional): The maximum memory. Defaults to None. + run_time_max (int, optional): The maximum run time. Defaults to None. + dependency_list (list[str], optional): The list of dependencies. Defaults to None. + command (str, optional): The command to execute. Defaults to None. Returns: - int: + int: The cluster queue ID. + """ working_directory, queue_script_path = self._write_queue_script( queue=queue, @@ -88,12 +106,14 @@ def submit_job( def enable_reservation(self, process_id: int): """ + Enable reservation for a process. Args: - process_id (int): + process_id (int): The process ID. Returns: - str: + str: The output of the enable reservation command. + """ cluster_module, cluster_queue_id = self._resolve_queue_id( process_id=process_id, cluster_dict=self._config["cluster"] @@ -112,12 +132,14 @@ def enable_reservation(self, process_id: int): def delete_job(self, process_id: int): """ + Delete a job. Args: - process_id (int): + process_id (int): The process ID. Returns: - str: + str: The output of the delete job command. + """ cluster_module, cluster_queue_id = self._resolve_queue_id( process_id=process_id, cluster_dict=self._config["cluster"] @@ -136,12 +158,14 @@ def delete_job(self, process_id: int): def get_queue_status(self, user: Optional[str] = None) -> pandas.DataFrame: """ + Get the queue status. Args: - user (str): + user (str, optional): The user name. Defaults to None. Returns: - pandas.DataFrame: + pandas.DataFrame: The queue status. + """ df_lst = [] for cluster_module in self._config["cluster"]: @@ -163,10 +187,31 @@ def get_queue_status(self, user: Optional[str] = None) -> pandas.DataFrame: @staticmethod def _resolve_queue_id(process_id: int, cluster_dict: dict): + """ + Resolve the queue ID. + + Args: + process_id (int): The process ID. + cluster_dict (dict): The cluster dictionary. + + Returns: + tuple: The cluster module and cluster queue ID. + + """ cluster_queue_id = int(process_id / 10) cluster_module = cluster_dict[process_id - cluster_queue_id * 10] return cluster_module, cluster_queue_id @staticmethod def _switch_cluster_command(cluster_module: str): + """ + Generate the switch cluster command. + + Args: + cluster_module (str): The cluster module. + + Returns: + list: The switch cluster command. + + """ return ["module", "--quiet", "swap", "cluster/{};".format(cluster_module)] diff --git a/pysqa/ext/remote.py b/pysqa/ext/remote.py index 1e9dd15..90c778d 100644 --- a/pysqa/ext/remote.py +++ b/pysqa/ext/remote.py @@ -5,7 +5,7 @@ import json import os import warnings -from typing import Optional +from typing import List, Optional, Union import pandas import paramiko @@ -16,6 +16,72 @@ class RemoteQueueAdapter(BasisQueueAdapter): + """ + A class representing a remote queue adapter. + + Args: + config (dict): The configuration dictionary. + directory (str, optional): The directory path. Defaults to "~/.queues". 
+ execute_command (callable, optional): The execute command function. Defaults to execute_command. + + Attributes: + _ssh_host (str): The SSH host. + _ssh_username (str): The SSH username. + _ssh_known_hosts (str): The path to the known hosts file. + _ssh_key (str): The path to the SSH key file. + _ssh_ask_for_password (bool): Flag indicating whether to ask for SSH password. + _ssh_password (str): The SSH password. + _ssh_key_passphrase (str): The SSH key passphrase. + _ssh_two_factor_authentication (bool): Flag indicating whether to use two-factor authentication. + _ssh_authenticator_service (str): The SSH authenticator service. + _ssh_proxy_host (str): The SSH proxy host. + _ssh_remote_config_dir (str): The remote configuration directory. + _ssh_remote_path (str): The remote path. + _ssh_local_path (str): The local path. + _ssh_delete_file_on_remote (bool): Flag indicating whether to delete files on the remote host. + _ssh_port (int): The SSH port. + _ssh_continous_connection (bool): Flag indicating whether to use continuous SSH connection. + _ssh_connection (None or paramiko.SSHClient): The SSH connection object. + _ssh_proxy_connection (None or paramiko.SSHClient): The SSH proxy connection object. + _remote_flag (bool): Flag indicating whether the adapter is for remote queue. + + Methods: + convert_path_to_remote(path: str) -> str: + Converts a local path to a remote path. + + submit_job(queue: Optional[str] = None, job_name: Optional[str] = None, working_directory: Optional[str] = None, + cores: Optional[int] = None, memory_max: Optional[int] = None, run_time_max: Optional[int] = None, + dependency_list: Optional[list[str]] = None, command: Optional[str] = None, **kwargs) -> int: + Submits a job to the remote queue. + + enable_reservation(process_id: int) -> str: + Enables a reservation for a job. + + delete_job(process_id: int) -> str: + Deletes a job from the remote queue. + + get_queue_status(user: Optional[str] = None) -> pandas.DataFrame: + Retrieves the queue status. + + get_job_from_remote(working_directory: str): + Retrieves the results of a calculation executed on a remote host. + + transfer_file(file: str, transfer_back: bool = False, delete_file_on_remote: bool = False): + Transfers a file to/from the remote host. + + __del__(): + Closes the SSH connections. + + _check_ssh_connection(): + Checks if an SSH connection is open. + + _transfer_files(file_dict: dict, sftp=None, transfer_back: bool = False): + Transfers files to/from the remote host. + + _open_ssh_connection() -> paramiko.SSHClient: + Opens an SSH connection. + """ + def __init__( self, config: dict, @@ -84,7 +150,16 @@ def __init__( self._ssh_proxy_connection = None self._remote_flag = True - def convert_path_to_remote(self, path: str): + def convert_path_to_remote(self, path: str) -> str: + """ + Converts a local path to a remote path. + + Args: + path (str): The local path. + + Returns: + str: The remote path. + """ working_directory = os.path.abspath(os.path.expanduser(path)) return self._get_remote_working_dir(working_directory=working_directory) @@ -101,19 +176,20 @@ def submit_job( **kwargs, ) -> int: """ + Submits a job to the remote queue. Args: - queue (str/None): - job_name (str/None): - working_directory (str/None): - cores (int/None): - memory_max (int/None): - run_time_max (int/None): - dependency_list (list/None): - command (str/None): + queue (str, optional): The queue name. + job_name (str, optional): The job name. + working_directory (str, optional): The working directory. 
+ cores (int, optional): The number of cores. + memory_max (int, optional): The maximum memory. + run_time_max (int, optional): The maximum run time. + dependency_list (list[str], optional): The list of job dependencies. + command (str, optional): The command to execute. Returns: - int: + int: The process ID of the submitted job. """ if dependency_list is not None: raise NotImplementedError( @@ -125,12 +201,13 @@ def submit_job( def enable_reservation(self, process_id: int) -> str: """ + Enables a reservation for a job. Args: - process_id (int): + process_id (int): The process ID. Returns: - str: + str: The output of the reservation command. """ return self._execute_remote_command( command=self._reservation_command(job_id=process_id) @@ -138,12 +215,13 @@ def enable_reservation(self, process_id: int) -> str: def delete_job(self, process_id: int) -> str: """ + Deletes a job from the remote queue. Args: - process_id (int): + process_id (int): The process ID. Returns: - str: + str: The output of the delete command. """ return self._execute_remote_command( command=self._delete_command(job_id=process_id) @@ -151,12 +229,13 @@ def delete_job(self, process_id: int) -> str: def get_queue_status(self, user: Optional[str] = None) -> pandas.DataFrame: """ + Retrieves the queue status. Args: - user (str): + user (str, optional): The username. Returns: - pandas.DataFrame: + pandas.DataFrame: The queue status. """ df = pandas.DataFrame( json.loads( @@ -170,7 +249,10 @@ def get_queue_status(self, user: Optional[str] = None) -> pandas.DataFrame: def get_job_from_remote(self, working_directory: str): """ - Get the results of the calculation - this is necessary when the calculation was executed on a remote host. + Retrieves the results of a calculation executed on a remote host. + + Args: + working_directory (str): The local working directory. """ working_directory = os.path.abspath(os.path.expanduser(working_directory)) remote_working_directory = self._get_remote_working_dir( @@ -203,6 +285,14 @@ def transfer_file( transfer_back: bool = False, delete_file_on_remote: bool = False, ): + """ + Transfers a file to/from the remote host. + + Args: + file (str): The file path. + transfer_back (bool, optional): Flag indicating whether to transfer the file back to the local host. + delete_file_on_remote (bool, optional): Flag indicating whether to delete the file on the remote host. + """ working_directory = os.path.abspath(os.path.expanduser(file)) remote_working_directory = self._get_remote_working_dir( working_directory=working_directory @@ -217,16 +307,30 @@ def transfer_file( self._execute_remote_command(command="rm " + remote_working_directory) def __del__(self): + """ + Closes the SSH connections. + """ if self._ssh_connection is not None: self._ssh_connection.close() if self._ssh_proxy_connection is not None: self._ssh_proxy_connection.close() def _check_ssh_connection(self): + """ + Checks if an SSH connection is open. + """ if self._ssh_connection is None: self._ssh_connection = self._open_ssh_connection() def _transfer_files(self, file_dict: dict, sftp=None, transfer_back: bool = False): + """ + Transfers files to/from the remote host. + + Args: + file_dict (dict): The dictionary containing the file paths. + sftp (None or paramiko.SFTPClient, optional): The SFTP client object. + transfer_back (bool, optional): Flag indicating whether to transfer the files back to the local host. 
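The remote workflow documented in this class roughly chains the calls below; a sketch with a hypothetical queue name and paths, since the actual values come from the user's queue.yaml:

```python
# Hypothetical remote-submission flow via the top-level QueueAdapter, which
# forwards to RemoteQueueAdapter when the configuration selects remote mode.
from pysqa import QueueAdapter

qa = QueueAdapter(directory="~/.queues")
job_id = qa.submit_job(
    queue="remote",                        # illustrative queue name
    job_name="test",
    working_directory="~/projects/run_0",  # mirrored below ssh_remote_path
    cores=4,
    command="python job.py",
)
# ... later, copy the results back from the remote host:
qa.get_job_from_remote(working_directory="~/projects/run_0")
```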
+ """ if sftp is None: if self._ssh_continous_connection: self._check_ssh_connection() @@ -251,7 +355,13 @@ def _transfer_files(self, file_dict: dict, sftp=None, transfer_back: bool = Fals if sftp is None: sftp_client.close() - def _open_ssh_connection(self): + def _open_ssh_connection(self) -> paramiko.SSHClient: + """ + Opens an SSH connection. + + Returns: + paramiko.SSHClient: The SSH connection object. + """ ssh = paramiko.SSHClient() ssh.load_host_keys(self._ssh_known_hosts) if ( @@ -364,10 +474,22 @@ def authentication(title, instructions, prompt_list): else: return ssh - def _remote_command(self): + def _remote_command(self) -> str: + """ + Returns the command to execute on the remote host. + + Returns: + str: The remote command. + """ return "python -m pysqa --config_directory " + self._ssh_remote_config_dir + " " - def _get_queue_status_command(self): + def _get_queue_status_command(self) -> str: + """ + Returns the command to get the queue status on the remote host. + + Returns: + str: The queue status command. + """ return self._remote_command() + "--status" def _submit_command( @@ -379,7 +501,22 @@ def _submit_command( memory_max: Optional[int] = None, run_time_max: Optional[int] = None, command_str: Optional[str] = None, - ): + ) -> str: + """ + Generates the command to submit a job to the remote host. + + Args: + queue (str, optional): The queue to submit the job to. + job_name (str, optional): The name of the job. + working_directory (str, optional): The working directory for the job. + cores (int, optional): The number of cores required for the job. + memory_max (int, optional): The maximum memory required for the job. + run_time_max (int, optional): The maximum run time for the job. + command_str (str, optional): The command to be executed by the job. + + Returns: + str: The submit command. + """ command = self._remote_command() + "--submit " if queue is not None: command += "--queue " + queue + " " @@ -388,22 +525,49 @@ def _submit_command( if working_directory is not None: command += "--working_directory " + working_directory + " " if cores is not None: - command += "--cores " + cores + " " + command += "--cores " + str(cores) + " " if memory_max is not None: - command += "--memory " + memory_max + " " + command += "--memory " + str(memory_max) + " " if run_time_max is not None: - command += "--run_time " + run_time_max + " " + command += "--run_time " + str(run_time_max) + " " if command_str is not None: command += '--command "' + command_str + '" ' return command def _delete_command(self, job_id: int) -> str: + """ + Generates the command to delete a job on the remote host. + + Args: + job_id (int): The ID of the job to delete. + + Returns: + str: The delete command. + """ return self._remote_command() + "--delete --id " + str(job_id) def _reservation_command(self, job_id: int) -> str: + """ + Generates the command to reserve a job on the remote host. + + Args: + job_id (int): The ID of the job to reserve. + + Returns: + str: The reservation command. + """ return self._remote_command() + "--reservation --id " + str(job_id) - def _execute_remote_command(self, command: str): + def _execute_remote_command(self, command: str) -> str: + """ + Executes a remote command on the SSH connection. + + Args: + command (str): The command to execute. + + Returns: + str: The output of the command. 
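The `str(cores)`, `str(memory_max)` and `str(run_time_max)` casts introduced in `_submit_command` above fix a real bug: these values arrive as integers, and concatenating them to a string raises. A two-line demonstration:

```python
cores = 4
# "--cores " + cores              # TypeError: can only concatenate str (not "int") to str
print("--cores " + str(cores) + " ")  # the patched form -> "--cores 4 "
```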
+ """ if self._ssh_continous_connection: self._check_ssh_connection() ssh = self._ssh_connection @@ -416,13 +580,31 @@ def _execute_remote_command(self, command: str): ssh.close() return output - def _get_remote_working_dir(self, working_directory: str): + def _get_remote_working_dir(self, working_directory: str) -> str: + """ + Get the remote working directory path. + + Args: + working_directory (str): The local working directory path. + + Returns: + str: The remote working directory path. + """ return os.path.join( self._ssh_remote_path, os.path.relpath(working_directory, self._ssh_local_path), ) - def _create_remote_dir(self, directory: str): + def _create_remote_dir(self, directory: Union[str, List[str]]) -> None: + """ + Creates a remote directory on the SSH server. + + Args: + directory (Union[str, List[str]]): The directory or list of directories to create. + + Raises: + TypeError: If the directory argument is not a string or a list. + """ if isinstance(directory, str): self._execute_remote_command(command="mkdir -p " + directory) elif isinstance(directory, list): @@ -431,9 +613,18 @@ def _create_remote_dir(self, directory: str): command += d + " " self._execute_remote_command(command=command) else: - raise TypeError() + raise TypeError("Invalid directory argument type.") + + def _transfer_data_to_remote(self, working_directory: str) -> None: + """ + Transfers data from the local machine to the remote host. - def _transfer_data_to_remote(self, working_directory: str): + Args: + working_directory (str): The local working directory path. + + Returns: + None + """ working_directory = os.path.abspath(os.path.expanduser(working_directory)) remote_working_directory = self._get_remote_working_dir( working_directory=working_directory @@ -460,14 +651,26 @@ def _transfer_data_to_remote(self, working_directory: str): def _get_user(self) -> str: """ + Get the username used for SSH connection. Returns: - str: + str: The username. """ return self._ssh_username @staticmethod def _get_file_transfer(file: str, local_dir: str, remote_dir: str) -> str: + """ + Get the absolute path of the file on the remote host. + + Args: + file (str): The file path. + local_dir (str): The local working directory path. + remote_dir (str): The remote working directory path. + + Returns: + str: The absolute path of the file on the remote host. + """ return os.path.abspath( os.path.join(remote_dir, os.path.relpath(file, local_dir)) ) diff --git a/pysqa/queueadapter.py b/pysqa/queueadapter.py index e8e2c74..2282328 100644 --- a/pysqa/queueadapter.py +++ b/pysqa/queueadapter.py @@ -2,7 +2,7 @@ # Copyright (c) Jan Janssen import os -from typing import Optional +from typing import List, Optional import pandas @@ -51,6 +51,13 @@ class QueueAdapter(object): def __init__( self, directory: str = "~/.queues", execute_command: callable = execute_command ): + """ + Initialize the QueueAdapter. + + Args: + directory (str): Directory containing the queue.yaml files and corresponding templates. + execute_command (callable): Function to execute commands. 
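A worked example of the local-to-remote path mapping used by `_get_remote_working_dir` and `_get_file_transfer` above; the paths are illustrative:

```python
import os

ssh_local_path = "/home/user/projects"      # _ssh_local_path
ssh_remote_path = "/scratch/user/projects"  # _ssh_remote_path
working_directory = "/home/user/projects/run_0/job"

remote = os.path.join(
    ssh_remote_path, os.path.relpath(working_directory, ssh_local_path)
)
print(remote)  # -> /scratch/user/projects/run_0/job
```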
+ """ queue_yaml = os.path.join(directory, "queue.yaml") clusters_yaml = os.path.join(directory, "clusters.yaml") self._adapter = None @@ -81,12 +88,12 @@ def __init__( ) self._adapter = self._queue_dict[primary_queue] - def list_clusters(self) -> list[str]: + def list_clusters(self) -> List[str]: """ List available computing clusters for remote submission Returns: - list: List of computing clusters + List of computing clusters """ return list(self._queue_dict.keys()) @@ -100,47 +107,63 @@ def switch_cluster(self, cluster_name: str): self._adapter = self._queue_dict[cluster_name] @property - def config(self): + def config(self) -> dict: """ + Get the QueueAdapter configuration. Returns: - dict: + dict: The QueueAdapter configuration. """ return self._adapter.config @property def ssh_delete_file_on_remote(self) -> bool: + """ + Get the value of ssh_delete_file_on_remote property. + + Returns: + bool: The value of ssh_delete_file_on_remote property. + """ return self._adapter.ssh_delete_file_on_remote @property def remote_flag(self) -> bool: """ + Get the value of remote_flag property. Returns: - bool: + bool: The value of remote_flag property. """ return self._adapter.remote_flag @property - def queue_list(self) -> list[str]: + def queue_list(self) -> List[str]: """ + Get the list of available queues. Returns: - list: + List[str]: The list of available queues. """ return self._adapter.queue_list @property def queue_view(self) -> pandas.DataFrame: """ + Get the Pandas DataFrame representation of the available queues. Returns: - pandas.DataFrame: + pandas.DataFrame: The Pandas DataFrame representation of the available queues. """ return self._adapter.queue_view @property - def queues(self) -> list[str]: + def queues(self) -> List[str]: + """ + Get the list of available queues. + + Returns: + List[str]: The list of available queues. + """ return self._adapter.queues def submit_job( @@ -151,7 +174,7 @@ def submit_job( cores: Optional[int] = None, memory_max: Optional[int] = None, run_time_max: Optional[int] = None, - dependency_list: Optional[list[str]] = None, + dependency_list: Optional[List[str]] = None, command: Optional[str] = None, **kwargs, ) -> int: @@ -172,7 +195,7 @@ def submit_job( corresponding template. Returns: - int: Job id received from the queuing system for the job which was submitted \ + int: Job id received from the queuing system for the job which was submitted """ return self._adapter.submit_job( queue=queue, @@ -188,12 +211,13 @@ def submit_job( def enable_reservation(self, process_id: int) -> str: """ + Enable reservation for a process. Args: - process_id (int): + process_id (int): The process id. Returns: - str: + str: The result of enabling reservation. """ return self._adapter.enable_reservation(process_id=process_id) @@ -202,7 +226,7 @@ def get_job_from_remote(self, working_directory: str): Get the results of the calculation - this is necessary when the calculation was executed on a remote host. Args: - working_directory (str): + working_directory (str): The working directory. """ self._adapter.get_job_from_remote(working_directory=working_directory) @@ -213,12 +237,12 @@ def transfer_file_to_remote( delete_file_on_remote: bool = False, ): """ - Transfer file from remote host to local host + Transfer file from remote host to local host. Args: - file (str): - transfer_back (bool): - delete_file_on_remote (bool): + file (str): The file to transfer. + transfer_back (bool): Whether to transfer the file back. 
+ delete_file_on_remote (bool): Whether to delete the file on the remote host. """ self._adapter.transfer_file( file=file, @@ -228,64 +252,70 @@ def transfer_file_to_remote( def convert_path_to_remote(self, path: str) -> str: """ + Convert a local path to a remote path. Args: - path (str): + path (str): The local path. Returns: - str: + str: The remote path. """ return self._adapter.convert_path_to_remote(path=path) def delete_job(self, process_id: int) -> str: """ + Delete a job. Args: - process_id (int): + process_id (int): The process id. Returns: - str: + str: The result of deleting the job. """ return self._adapter.delete_job(process_id=process_id) def get_queue_status(self, user: Optional[str] = None) -> pandas.DataFrame: """ + Get the status of the queue. Args: - user (str): + user (str/None): The user. Returns: - pandas.DataFrame: + pandas.DataFrame: The status of the queue. """ return self._adapter.get_queue_status(user=user) def get_status_of_my_jobs(self) -> pandas.DataFrame: """ + Get the status of the user's jobs. Returns: - pandas.DataFrame: + pandas.DataFrame: The status of the user's jobs. """ return self._adapter.get_status_of_my_jobs() def get_status_of_job(self, process_id: int) -> str: """ + Get the status of a job. Args: - process_id: + process_id: The process id. Returns: - str: ['running', 'pending', 'error'] + str: The status of the job. Possible values are ['running', 'pending', 'error']. """ return self._adapter.get_status_of_job(process_id=process_id) - def get_status_of_jobs(self, process_id_lst: list[int]) -> list[str]: + def get_status_of_jobs(self, process_id_lst: List[int]) -> List[str]: """ + Get the status of multiple jobs. Args: - process_id_lst: + process_id_lst: The list of process ids. Returns: - list: ['running', 'pending', 'error', ...] + List[str]: The status of the jobs. Possible values are ['running', 'pending', 'error', ...]. """ return self._adapter.get_status_of_jobs(process_id_lst=process_id_lst) @@ -296,18 +326,19 @@ def check_queue_parameters( run_time_max: Optional[int] = None, memory_max: Optional[int] = None, active_queue: Optional[dict] = None, - ): + ) -> List: """ + Check the parameters of a queue. Args: - queue (str/None): - cores (int): - run_time_max (int/None): - memory_max (int/None): - active_queue (dict): + queue (str/None): The queue name. + cores (int): The number of cores. + run_time_max (int/None): The maximum runtime. + memory_max (int/None): The maximum memory. + active_queue (dict/None): The active queue. Returns: - list: [cores, run_time_max, memory_max] + List: A list containing the checked parameters [cores, run_time_max, memory_max]. """ return self._adapter.check_queue_parameters( queue=queue, diff --git a/pysqa/utils/basic.py b/pysqa/utils/basic.py index 6da25ee..3d3ed4c 100644 --- a/pysqa/utils/basic.py +++ b/pysqa/utils/basic.py @@ -5,7 +5,7 @@ import importlib import os import re -from typing import List, Optional +from typing import List, Optional, Tuple, Union import pandas from jinja2 import Template @@ -21,28 +21,15 @@ class BasisQueueAdapter(object): locally. Args: - config (dict): - directory (str): directory containing the queue.yaml files as well as corresponding jinja2 templates for the - individual queues. - execute_command(funct): + config (dict): Configuration for the QueueAdapter. + directory (str): Directory containing the queue.yaml files as well as corresponding jinja2 templates for the individual queues. + execute_command(funct): Function to execute commands. Attributes: - - .. 
attribute:: config
-
-            QueueAdapter configuration read from the queue.yaml file.
-
-        .. attribute:: queue_list
-
-            List of available queues
-
-        .. attribute:: queue_view
-
-            Pandas DataFrame representation of the available queues, read from queue.yaml.
-
-        .. attribute:: queues
-
-            Queues available for auto completion QueueAdapter().queues.<queue name> returns the queue name.
+        config (dict): QueueAdapter configuration read from the queue.yaml file.
+        queue_list (list): List of available queues.
+        queue_view (pandas.DataFrame): Pandas DataFrame representation of the available queues, read from queue.yaml.
+        queues: Queues available for auto completion; QueueAdapter().queues.<queue name> returns the queue name.
     """
 
     def __init__(
@@ -96,36 +83,51 @@
 
     @property
     def ssh_delete_file_on_remote(self) -> bool:
+        """
+        Get the value of ssh_delete_file_on_remote.
+
+        Returns:
+            bool: The value of ssh_delete_file_on_remote.
+        """
         return self._ssh_delete_file_on_remote
 
     @property
     def remote_flag(self) -> bool:
+        """
+        Get the value of remote_flag.
+
+        Returns:
+            bool: The value of remote_flag.
+        """
         return self._remote_flag
 
     @property
-    def config(self):
+    def config(self) -> dict:
         """
+        Get the QueueAdapter configuration.
 
         Returns:
-            dict:
+            dict: The QueueAdapter configuration.
         """
         return self._config
 
     @property
     def queue_list(self) -> list:
         """
+        Get the list of available queues.
 
         Returns:
-            list:
+            list: The list of available queues.
         """
         return list(self._config["queues"].keys())
 
     @property
     def queue_view(self) -> pandas.DataFrame:
         """
+        Get the Pandas DataFrame representation of the available queues.
 
        Returns:
-            pandas.DataFrame:
+            pandas.DataFrame: The Pandas DataFrame representation of the available queues.
         """
         return pandas.DataFrame(self._config["queues"]).T.drop(
             ["script", "template"], axis=1
@@ -133,6 +135,12 @@
 
     @property
     def queues(self):
+        """
+        Get the available queues.
+
+        Returns:
+            Queues: The available queues.
+        """
         return self._queues
 
     def submit_job(
@@ -143,24 +151,25 @@
         cores: Optional[int] = None,
         memory_max: Optional[int] = None,
         run_time_max: Optional[int] = None,
-        dependency_list: Optional[list[str]] = None,
+        dependency_list: Optional[List[str]] = None,
         command: Optional[str] = None,
         **kwargs,
     ) -> int:
         """
+        Submit a job to the queue.
 
         Args:
-            queue (str/None):
-            job_name (str/None):
-            working_directory (str/None):
-            cores (int/None):
-            memory_max (int/None):
-            run_time_max (int/None):
-            dependency_list (list[str]/None:
-            command (str/None):
+            queue (str/None): The queue to submit the job to.
+            job_name (str/None): The name of the job.
+            working_directory (str/None): The working directory for the job.
+            cores (int/None): The number of cores required for the job.
+            memory_max (int/None): The maximum memory required for the job.
+            run_time_max (int/None): The maximum run time for the job.
+            dependency_list (list[str]/None): List of job dependencies.
+            command (str/None): The command to execute for the job.
 
         Returns:
-            int:
+            int: The job ID.
         """
         if working_directory is not None and " " in working_directory:
             raise ValueError(
@@ -189,16 +198,26 @@
         return None
 
     def _list_command_to_be_executed(self, queue_script_path: str) -> list:
+        """
+        Get the list of commands to be executed.
+
+        Args:
+            queue_script_path (str): The path to the queue script.
+
+        Returns:
+            list: The list of commands to be executed.
+        """
         return self._commands.submit_job_command + [queue_script_path]
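To make `_list_command_to_be_executed` concrete: it simply prepends the scheduler's submit command to the rendered script path, e.g. with the Slurm commands defined later in this patch:

```python
# Sketch using SlurmCommands.submit_job_command from pysqa/wrapper/slurm.py
submit_job_command = ["sbatch", "--parsable"]
queue_script_path = "run_queue.sh"
print(submit_job_command + [queue_script_path])
# -> ['sbatch', '--parsable', 'run_queue.sh']
```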
 
     def enable_reservation(self, process_id: int):
         """
+        Enable reservation for a process.
 
         Args:
-            process_id (int):
+            process_id (int): The process ID.
 
         Returns:
-            str:
+            str: The result of the enable reservation command.
         """
         out = self._execute_command(
             commands=self._commands.enable_reservation_command + [str(process_id)],
@@ -211,12 +230,13 @@ def delete_job(self, process_id: int) -> str:
         """
+        Delete a job.
 
         Args:
-            process_id (int):
+            process_id (int): The process ID.
 
         Returns:
-            str:
+            str: The result of the delete job command.
         """
         out = self._execute_command(
             commands=self._commands.delete_job_command + [str(process_id)],
@@ -229,12 +249,13 @@ def get_queue_status(self, user: Optional[str] = None) -> pandas.DataFrame:
         """
+        Get the status of the queue.
 
         Args:
-            user (str):
+            user (str): The user to filter the queue status for.
 
         Returns:
-            pandas.DataFrame:
+            pandas.DataFrame: The queue status.
         """
         out = self._execute_command(
             commands=self._commands.get_queue_status_command, split_output=False
@@ -247,20 +268,22 @@ def get_status_of_my_jobs(self) -> pandas.DataFrame:
 
     def get_status_of_my_jobs(self) -> pandas.DataFrame:
         """
+        Get the status of the user's jobs.
 
         Returns:
-            pandas.DataFrame:
+            pandas.DataFrame: The status of the user's jobs.
         """
         return self.get_queue_status(user=self._get_user())
 
     def get_status_of_job(self, process_id: int) -> str:
         """
+        Get the status of a job.
 
         Args:
-            process_id:
+            process_id (int): The process ID.
 
         Returns:
-            str: ['running', 'pending', 'error']
+            str: The status of the job. Possible values are ['running', 'pending', 'error'].
         """
         df = self.get_queue_status()
         df_selected = df[df["jobid"] == process_id]["status"]
@@ -269,14 +292,15 @@
         else:
             return None
 
-    def get_status_of_jobs(self, process_id_lst: list[int]) -> list[str]:
+    def get_status_of_jobs(self, process_id_lst: List[int]) -> List[str]:
         """
+        Get the status of multiple jobs.
 
         Args:
-            process_id_lst list[int]:
+            process_id_lst (list[int]): List of process IDs.
 
         Returns:
-            list[str]: ['running', 'pending', 'error', ...]
+            list[str]: List of job statuses.
         """
         df = self.get_queue_status()
         results_lst = []
@@ -288,13 +312,25 @@
             results_lst.append("finished")
         return results_lst
 
-    def get_job_from_remote(self, working_directory: str):
+    def get_job_from_remote(self, working_directory: str) -> None:
         """
         Get the results of the calculation - this is necessary when the calculation was executed on a remote host.
+
+        Args:
+            working_directory (str): The working directory where the calculation was executed.
         """
         raise NotImplementedError
 
     def convert_path_to_remote(self, path: str):
+        """
+        Converts a local file path to a remote file path.
+
+        Args:
+            path (str): The local file path to be converted.
+
+        Returns:
+            str: The converted remote file path.
+        """
         raise NotImplementedError
 
     def transfer_file(
         self,
         file: str,
         transfer_back: bool = False,
         delete_file_on_remote: bool = False,
     ):
+        """
+        Transfer a file to a remote location.
+
+        Args:
+            file (str): The path of the file to be transferred.
+            transfer_back (bool, optional): Whether to transfer the file back after processing. Defaults to False.
+            delete_file_on_remote (bool, optional): Whether to delete the file on the remote location after transfer.
Defaults to False. + """ raise NotImplementedError def check_queue_parameters( @@ -314,16 +358,17 @@ def check_queue_parameters( active_queue: Optional[dict] = None, ) -> list: """ + Check the parameters of a queue. Args: - queue (str/None): - cores (int): - run_time_max (int/None): - memory_max (int/None): - active_queue (dict): + queue (str): The queue to check. + cores (int, optional): The number of cores. Defaults to 1. + run_time_max (int, optional): The maximum run time. Defaults to None. + memory_max (int, optional): The maximum memory. Defaults to None. + active_queue (dict, optional): The active queue. Defaults to None. Returns: - list: [cores, run_time_max, memory_max] + list: A list of queue parameters [cores, run_time_max, memory_max]. """ if active_queue is None: active_queue = self._config["queues"][queue] @@ -351,18 +396,22 @@ def _write_queue_script( dependency_list: Optional[List[int]] = None, command: Optional[str] = None, **kwargs, - ): + ) -> Tuple[str, str]: """ + Write the queue script to a file. Args: - queue (str/None): - job_name (str/None): - working_directory (str/None): - cores (int/None): - memory_max (int/None): - run_time_max (int/None): - command (str/None): + queue (str/None): The queue name. + job_name (str/None): The job name. + working_directory (str/None): The working directory. + cores (int/None): The number of cores. + memory_max (int/None): The maximum memory. + run_time_max (int/None): The maximum run time. + dependency_list (list/None): The list of dependency job IDs. + command (str/None): The command to be executed. + Returns: + Tuple[str, str]: A tuple containing the working directory and the path to the queue script file. """ if isinstance(command, list): command = "".join(command) @@ -399,19 +448,20 @@ def _job_submission_template( **kwargs, ) -> str: """ + Generate the job submission template. Args: - queue (str/None): - job_name (str): - working_directory (str): - cores (int/None): - memory_max (int/None): - run_time_max (int/None): - dependency_list (list/None): - command (str/None): + queue (str, optional): The queue name. Defaults to None. + job_name (str, optional): The job name. Defaults to "job.py". + working_directory (str, optional): The working directory. Defaults to ".". + cores (int, optional): The number of cores. Defaults to None. + memory_max (int, optional): The maximum memory. Defaults to None. + run_time_max (int, optional): The maximum run time. Defaults to None. + dependency_list (list[int], optional): The list of dependency job IDs. Defaults to None. + command (str, optional): The command to be executed. Defaults to None. Returns: - str: + str: The job submission template. """ if queue is None: queue = self._config["queue_primary"] @@ -445,23 +495,24 @@ def _job_submission_template( def _execute_command( self, - commands: str, + commands: Union[str, List[str]], working_directory: Optional[str] = None, split_output: bool = True, shell: bool = False, error_filename: str = "pysqa.err", ) -> str: """ + Execute a command or a list of commands. Args: - commands (list/str): - working_directory (str): - split_output (bool): - shell (bool): - error_filename (str): + commands (Union[str, List[str]]): The command(s) to be executed. + working_directory (Optional[str], optional): The working directory. Defaults to None. + split_output (bool, optional): Whether to split the output into lines. Defaults to True. + shell (bool, optional): Whether to use the shell to execute the command. Defaults to False. 
+ error_filename (str, optional): The name of the error file. Defaults to "pysqa.err". Returns: - str: + str: The output of the command(s). """ return self._execute_command_function( commands=commands, @@ -474,18 +525,20 @@ def _execute_command( @staticmethod def _get_user() -> str: """ + Get the current user. Returns: - str: + str: The current user. """ return getpass.getuser() @staticmethod def _fill_queue_dict(queue_lst_dict: dict): """ + Fill missing keys in the queue dictionary with None values. Args: - queue_lst_dict (dict): + queue_lst_dict (dict): The queue dictionary. """ queue_keys = ["cores_min", "cores_max", "run_time_max", "memory_max"] for queue_dict in queue_lst_dict.values(): @@ -493,12 +546,13 @@ def _fill_queue_dict(queue_lst_dict: dict): queue_dict[key] = None @staticmethod - def _load_templates(queue_lst_dict: dict, directory: str = "."): + def _load_templates(queue_lst_dict: dict, directory: str = ".") -> None: """ + Load the queue templates from files and store them in the queue dictionary. Args: - queue_lst_dict (dict): - directory (str): + queue_lst_dict (dict): The queue dictionary. + directory (str, optional): The directory where the queue template files are located. Defaults to ".". """ for queue_dict in queue_lst_dict.values(): if "script" in queue_dict.keys(): @@ -515,28 +569,39 @@ def _load_templates(queue_lst_dict: dict, directory: str = "."): ) @staticmethod - def _value_error_if_none(value: str): + def _value_error_if_none(value: str) -> None: """ + Raise a ValueError if the value is None or not a string. Args: - value (str/None): + value (str/None): The value to check. + + Raises: + ValueError: If the value is None. + TypeError: If the value is not a string. """ if value is None: - raise ValueError() + raise ValueError("Value cannot be None.") if not isinstance(value, str): raise TypeError() @classmethod - def _value_in_range(cls, value, value_min=None, value_max=None): + def _value_in_range( + cls, + value: Union[int, float, None], + value_min: Union[int, float, None] = None, + value_max: Union[int, float, None] = None, + ) -> Union[int, float, None]: """ + Check if a value is within a specified range. Args: - value (int/float/None): - value_min (int/float/None): - value_max (int/float/None): + value (int/float/None): The value to check. + value_min (int/float/None): The minimum value. Defaults to None. + value_max (int/float/None): The maximum value. Defaults to None. Returns: - int/float/None: + int/float/None: The value if it is within the range, otherwise the minimum or maximum value. """ if value is not None: @@ -567,14 +632,13 @@ def _value_in_range(cls, value, value_min=None, value_max=None): @staticmethod def _is_memory_string(value: str) -> bool: """ - Tests a string if it specifies a certain amount of memory e.g.: '20G', '60b'. Also pure integer strings are - also valid. + Check if a string specifies a certain amount of memory. Args: - value (str): the string to test + value (str): The string to check. Returns: - (bool): A boolean value if the string matches a memory specification + bool: True if the string matches a memory specification, False otherwise. """ memory_spec_pattern = r"[0-9]+[bBkKmMgGtT]?" 
         return re.findall(memory_spec_pattern, value)[0] == value
@@ -582,19 +646,19 @@ def _is_memory_string(value: str) -> bool:
     @classmethod
     def _memory_spec_string_to_value(
         cls, value: str, default_magnitude: str = "m", target_magnitude: str = "b"
-    ):
+    ) -> Union[int, float]:
         """
         Converts a valid memory string (tested by _is_memory_string) into an integer/float value of desired
         magnitude `default_magnitude`. If it is a plain integer string (e.g.: '50000') it will be interpreted with
         the magnitude passed in by the `default_magnitude`. The output will rescaled to `target_magnitude`
 
         Args:
-            value (str): the string
-            default_magnitude (str): magnitude for interpreting plain integer strings [b, B, k, K, m, M, g, G, t, T]
-            target_magnitude (str): to which the output value should be converted [b, B, k, K, m, M, g, G, t, T]
+            value (str): The string to convert.
+            default_magnitude (str): The magnitude for interpreting plain integer strings [b, B, k, K, m, M, g, G, t, T]. Defaults to "m".
+            target_magnitude (str): The magnitude to which the output value should be converted [b, B, k, K, m, M, g, G, t, T]. Defaults to "b".
 
         Returns:
-            (float/int): the value of the string in `target_magnitude` units
+            Union[int, float]: The value of the string in `target_magnitude` units.
         """
         magnitude_mapping = {"b": 0, "k": 1, "m": 2, "g": 3, "t": 4}
         if cls._is_memory_string(value):
diff --git a/pysqa/utils/config.py b/pysqa/utils/config.py
index 8b80dc1..1883ea5 100644
--- a/pysqa/utils/config.py
+++ b/pysqa/utils/config.py
@@ -3,12 +3,13 @@
 
 def read_config(file_name: str = "queue.yaml") -> dict:
     """
+    Read and parse a YAML configuration file.
 
     Args:
-        file_name (str):
+        file_name (str): The name of the YAML file to read.
 
     Returns:
-        dict:
+        dict: The parsed configuration as a dictionary.
     """
     with open(file_name, "r") as f:
         return yaml.load(f, Loader=yaml.FullLoader)
diff --git a/pysqa/utils/execute.py b/pysqa/utils/execute.py
index f2b8703..9b4fdf0 100644
--- a/pysqa/utils/execute.py
+++ b/pysqa/utils/execute.py
@@ -1,6 +1,6 @@
 import os
 import subprocess
-from typing import Optional
+from typing import List, Optional, Union
 
 
 def execute_command(
@@ -9,19 +9,19 @@
     split_output: bool = True,
     shell: bool = False,
     error_filename: str = "pysqa.err",
-) -> str:
+) -> Union[str, List[str]]:
     """
     A wrapper around the subprocess.check_output function.
 
     Args:
-        commands (list/str): These commands are executed on the command line
-        working_directory (str/None): The directory where the command is executed
-        split_output (bool): Boolean flag to split newlines in the output
-        shell (bool): Additional switch to convert list of commands to one string
-        error_filename (str): In case the execution fails the output is written to this file
+        commands (Union[str, List[str]]): The command(s) to be executed on the command line
+        working_directory (str, optional): The directory where the command is executed. Defaults to None.
+        split_output (bool, optional): Boolean flag to split newlines in the output. Defaults to True.
+        shell (bool, optional): Additional switch to convert commands to a single string. Defaults to False.
+        error_filename (str, optional): In case the execution fails, the output is written to this file. Defaults to "pysqa.err".
Returns: - str/list: output of the shell command either as string or as a list of strings + Union[str, List[str]]: Output of the shell command either as a string or as a list of strings """ if shell and isinstance(commands, list): commands = " ".join(commands) diff --git a/pysqa/utils/queues.py b/pysqa/utils/queues.py index c2d28b0..3b348a2 100644 --- a/pysqa/utils/queues.py +++ b/pysqa/utils/queues.py @@ -1,5 +1,4 @@ -# coding: utf-8 -# Copyright (c) Jan Janssen +from typing import List class Queues(object): @@ -8,14 +7,41 @@ class Queues(object): interactive environments like jupyter. """ - def __init__(self, list_of_queues): + def __init__(self, list_of_queues: List[str]): + """ + Initialize the Queues object. + + Args: + list_of_queues (List[str]): A list of queue names. + + """ self._list_of_queues = list_of_queues - def __getattr__(self, item): + def __getattr__(self, item: str) -> str: + """ + Get the queue name. + + Args: + item (str): The name of the queue. + + Returns: + str: The name of the queue. + + Raises: + AttributeError: If the queue name is not in the list of queues. + + """ if item in self._list_of_queues: return item else: raise AttributeError - def __dir__(self): + def __dir__(self) -> List[str]: + """ + Get the list of queues. + + Returns: + List[str]: The list of queues. + + """ return self._list_of_queues diff --git a/pysqa/wrapper/flux.py b/pysqa/wrapper/flux.py index f7b9d2d..63abe72 100644 --- a/pysqa/wrapper/flux.py +++ b/pysqa/wrapper/flux.py @@ -8,24 +8,29 @@ class FluxCommands(SchedulerCommands): @property def submit_job_command(self) -> list[str]: + """Returns the command to submit a job.""" return ["flux", "batch"] @property def delete_job_command(self) -> list[str]: + """Returns the command to delete a job.""" return ["flux", "cancel"] @property def get_queue_status_command(self) -> list[str]: + """Returns the command to get the queue status.""" return ["flux", "jobs", "-a", "--no-header"] @staticmethod def get_job_id_from_output(queue_submit_output: str) -> int: + """Extracts the job ID from the output of the queue submit command.""" return int( JobID(queue_submit_output.splitlines()[-1].rstrip().lstrip().split()[-1]) ) @staticmethod def convert_queue_status(queue_status_output: str) -> pandas.DataFrame: + """Converts the queue status output into a pandas DataFrame.""" line_split_lst = [line.split() for line in queue_status_output.splitlines()] job_id_lst, user_lst, job_name_lst, status_lst = [], [], [], [] for entry in line_split_lst: diff --git a/pysqa/wrapper/generic.py b/pysqa/wrapper/generic.py index a7ec18c..5e72d20 100644 --- a/pysqa/wrapper/generic.py +++ b/pysqa/wrapper/generic.py @@ -22,24 +22,57 @@ class SchedulerCommands(ABC): @property @abstractmethod def submit_job_command(self) -> list[str]: + """ + Returns the command to submit a job to the scheduler. + + Returns: + list[str]: The command to submit a job. + """ pass @property @abstractmethod def delete_job_command(self) -> list[str]: + """ + Returns the command to delete a job from the scheduler. + + Returns: + list[str]: The command to delete a job. + """ pass @property def enable_reservation_command(self) -> list[str]: + """ + Returns the command to enable job reservation on the scheduler. + + Returns: + list[str]: The command to enable job reservation. + """ raise NotImplementedError() @property @abstractmethod def get_queue_status_command(self) -> list[str]: + """ + Returns the command to get the status of the job queue. + + Returns: + list[str]: The command to get the queue status. 
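A short usage sketch for the `Queues` helper documented above; attribute access simply echoes valid queue names, which is what enables tab completion in interactive sessions:

```python
from pysqa.utils.queues import Queues

queues = Queues(["express", "long"])
print(queues.express)  # -> "express"
print(dir(queues))     # -> ['express', 'long']
# queues.missing       # would raise AttributeError
```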
+ """ pass @staticmethod def dependencies(dependency_list: list[str]) -> list: + """ + Returns the list of dependencies for a job. + + Args: + dependency_list (list[str]): The list of dependencies. + + Returns: + list: The list of dependencies. + """ if dependency_list is not None: raise NotImplementedError() else: @@ -47,8 +80,26 @@ def dependencies(dependency_list: list[str]) -> list: @staticmethod def get_job_id_from_output(queue_submit_output: str) -> int: + """ + Returns the job ID from the output of the job submission command. + + Args: + queue_submit_output (str): The output of the job submission command. + + Returns: + int: The job ID. + """ raise NotImplementedError() @staticmethod def convert_queue_status(queue_status_output: str) -> pandas.DataFrame: + """ + Converts the output of the queue status command to a pandas DataFrame. + + Args: + queue_status_output (str): The output of the queue status command. + + Returns: + pandas.DataFrame: The queue status as a DataFrame. + """ raise NotImplementedError() diff --git a/pysqa/wrapper/gent.py b/pysqa/wrapper/gent.py index 3d6e8fd..20ee4ff 100644 --- a/pysqa/wrapper/gent.py +++ b/pysqa/wrapper/gent.py @@ -21,14 +21,44 @@ class GentCommands(SlurmCommands): @staticmethod def get_job_id_from_output(queue_submit_output: str) -> int: + """ + Extracts the job ID from the queue submit output. + + Args: + queue_submit_output (str): The output of the queue submit command. + + Returns: + int: The job ID. + + """ return int(queue_submit_output.splitlines()[-1].rstrip().lstrip().split(";")[0]) @staticmethod def get_queue_from_output(queue_submit_output: str) -> str: + """ + Extracts the queue name from the queue submit output. + + Args: + queue_submit_output (str): The output of the queue submit command. + + Returns: + str: The queue name. + + """ return str(queue_submit_output.splitlines()[-1].rstrip().lstrip().split(";")[1]) @staticmethod def convert_queue_status(queue_status_output: str) -> pandas.DataFrame: + """ + Converts the queue status output into a pandas DataFrame. + + Args: + queue_status_output (str): The output of the queue status command. + + Returns: + pandas.DataFrame: The converted queue status. + + """ qstat = queue_status_output.splitlines() queue = qstat[0].split(":")[1].strip() if len(qstat) <= 1: # first row contains cluster name, check if there are jobs @@ -52,7 +82,20 @@ def convert_queue_status(queue_status_output: str) -> pandas.DataFrame: ) @staticmethod - def dependencies(dependency_list: list[str]) -> list: + def dependencies(dependency_list: list[str]) -> list[str]: + """ + Returns the dependencies for the job. + + Args: + dependency_list (list[str]): The list of job dependencies. + + Returns: + list[str]: The dependencies for the job. + + Raises: + NotImplementedError: If dependency_list is not None. 
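A worked example for the Gent output parsers above; the submission output format, `<job id>;<queue name>` on the last line, is illustrative:

```python
queue_submit_output = "submitted\n2468;cluster_queue"

last = queue_submit_output.splitlines()[-1].strip()
print(int(last.split(";")[0]))  # get_job_id_from_output -> 2468
print(last.split(";")[1])       # get_queue_from_output  -> "cluster_queue"
```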
+ + """ if dependency_list is not None: raise NotImplementedError() else: diff --git a/pysqa/wrapper/lsf.py b/pysqa/wrapper/lsf.py index 541ed29..b3762bc 100644 --- a/pysqa/wrapper/lsf.py +++ b/pysqa/wrapper/lsf.py @@ -21,22 +21,27 @@ class LsfCommands(SchedulerCommands): @property def submit_job_command(self) -> list[str]: + """Return the command to submit a job.""" return ["bsub"] @property def delete_job_command(self) -> list[str]: + """Return the command to delete a job.""" return ["bkill"] @property def get_queue_status_command(self) -> list[str]: + """Return the command to get the queue status.""" return ["bjobs"] @staticmethod def get_job_id_from_output(queue_submit_output: str) -> int: + """Extract the job ID from the queue submit output.""" return int(queue_submit_output.split("<")[1].split(">")[0]) @staticmethod def convert_queue_status(queue_status_output: str) -> pandas.DataFrame: + """Convert the queue status output to a pandas DataFrame.""" job_id_lst, user_lst, status_lst, job_name_lst = [], [], [], [] line_split_lst = queue_status_output.split("\n") if len(line_split_lst) > 1: diff --git a/pysqa/wrapper/moab.py b/pysqa/wrapper/moab.py index e930026..6105f13 100644 --- a/pysqa/wrapper/moab.py +++ b/pysqa/wrapper/moab.py @@ -19,12 +19,30 @@ class MoabCommands(SchedulerCommands): @property def submit_job_command(self) -> list[str]: + """ + Get the command to submit a job. + + Returns: + list[str]: The command to submit a job. + """ return ["msub"] @property def delete_job_command(self) -> list[str]: + """ + Get the command to delete a job. + + Returns: + list[str]: The command to delete a job. + """ return ["mjobctl", "-c"] @property def get_queue_status_command(self) -> list[str]: + """ + Get the command to get the queue status. + + Returns: + list[str]: The command to get the queue status. + """ return ["mdiag", "-x"] diff --git a/pysqa/wrapper/sge.py b/pysqa/wrapper/sge.py index 5025bb2..de67bca 100644 --- a/pysqa/wrapper/sge.py +++ b/pysqa/wrapper/sge.py @@ -22,22 +22,36 @@ class SunGridEngineCommands(SchedulerCommands): @property def submit_job_command(self) -> list[str]: + """Return the command to submit a job.""" return ["qsub", "-terse"] @property def delete_job_command(self) -> list[str]: + """Return the command to delete a job.""" return ["qdel"] @property def enable_reservation_command(self) -> list[str]: + """Return the command to enable job reservation.""" return ["qalter", "-R", "y"] @property def get_queue_status_command(self) -> list[str]: + """Return the command to get the queue status.""" return ["qstat", "-xml"] @staticmethod def convert_queue_status(queue_status_output: str) -> pandas.DataFrame: + """Convert the queue status output to a pandas DataFrame. + + Args: + queue_status_output: The output of the queue status command. + + Returns: + A pandas DataFrame containing the converted queue status. 
+ + """ + def leaf_to_dict(leaf): return [ {sub_child.tag: sub_child.text for sub_child in child} for child in leaf diff --git a/pysqa/wrapper/slurm.py b/pysqa/wrapper/slurm.py index 4cfd542..9395c6b 100644 --- a/pysqa/wrapper/slurm.py +++ b/pysqa/wrapper/slurm.py @@ -21,22 +21,27 @@ class SlurmCommands(SchedulerCommands): @property def submit_job_command(self) -> list[str]: + """Returns the command to submit a job to Slurm.""" return ["sbatch", "--parsable"] @property def delete_job_command(self) -> list[str]: + """Returns the command to delete a job from Slurm.""" return ["scancel"] @property def get_queue_status_command(self) -> list[str]: + """Returns the command to get the queue status from Slurm.""" return ["squeue", "--format", "%A|%u|%t|%.15j|%Z", "--noheader"] @staticmethod def get_job_id_from_output(queue_submit_output: str) -> int: + """Extracts the job ID from the output of the job submission command.""" return int(queue_submit_output.splitlines()[-1].rstrip().lstrip().split()[-1]) @staticmethod def convert_queue_status(queue_status_output: str) -> pandas.DataFrame: + """Converts the queue status output from Slurm into a pandas DataFrame.""" line_split_lst = [line.split("|") for line in queue_status_output.splitlines()] if len(line_split_lst) != 0: job_id_lst, user_lst, status_lst, job_name_lst, working_directory_lst = zip( @@ -67,7 +72,8 @@ def convert_queue_status(queue_status_output: str) -> pandas.DataFrame: return df @staticmethod - def dependencies(dependency_list: list[str]) -> list: + def dependencies(dependency_list: list[str]) -> list[str]: + """Returns the dependency options for job submission.""" if dependency_list is not None: return ["--dependency=afterok:" + ",".join(dependency_list)] else: diff --git a/pysqa/wrapper/torque.py b/pysqa/wrapper/torque.py index 12888ba..07a0125 100644 --- a/pysqa/wrapper/torque.py +++ b/pysqa/wrapper/torque.py @@ -23,37 +23,47 @@ class TorqueCommands(SchedulerCommands): @property def submit_job_command(self) -> list[str]: + """Returns the command to submit a job.""" return ["qsub"] @property def delete_job_command(self) -> list[str]: + """Returns the command to delete a job.""" return ["qdel"] @property def get_queue_status_command(self) -> list[str]: + """Returns the command to get the queue status.""" return ["qstat", "-f"] @staticmethod def get_job_id_from_output(queue_submit_output: str) -> int: - # strip last line from output, leading and trailing whitespaces, - # and separates the queue id from the stuff after "." - # Adjust if your system doesn't have output like below! - # e.g. qsub run_queue.sh -> "12347673.gadi-pbs", the below returns 12347673 - # It must return an integer for it to not raise an exception later. + """Extracts the job ID from the queue submit output. + + Args: + queue_submit_output (str): The output of the queue submit command. + + Returns: + int: The job ID. + + Raises: + ValueError: If the job ID cannot be extracted. + """ return int( queue_submit_output.splitlines()[-1].rstrip().lstrip().split(sep=".")[0] ) @staticmethod def convert_queue_status(queue_status_output: str) -> pandas.DataFrame: - # # Run the qstat -f command and capture its output - # output = subprocess.check_output(["qstat", "-f"]) -> output is the - # queue_status_output that goes into this function + """Converts the queue status output into a pandas DataFrame. - # Split the output into lines - lines = queue_status_output.split("\n") # .decode().split("\n") + Args: + queue_status_output (str): The output of the queue status command. 
diff --git a/pysqa/wrapper/torque.py b/pysqa/wrapper/torque.py
index 12888ba..07a0125 100644
--- a/pysqa/wrapper/torque.py
+++ b/pysqa/wrapper/torque.py
@@ -23,37 +23,47 @@ class TorqueCommands(SchedulerCommands):
     @property
     def submit_job_command(self) -> list[str]:
+        """Returns the command to submit a job."""
         return ["qsub"]

     @property
     def delete_job_command(self) -> list[str]:
+        """Returns the command to delete a job."""
         return ["qdel"]

     @property
     def get_queue_status_command(self) -> list[str]:
+        """Returns the command to get the queue status."""
         return ["qstat", "-f"]

     @staticmethod
     def get_job_id_from_output(queue_submit_output: str) -> int:
-        # strip last line from output, leading and trailing whitespaces,
-        # and separates the queue id from the stuff after "."
-        # Adjust if your system doesn't have output like below!
-        # e.g. qsub run_queue.sh -> "12347673.gadi-pbs", the below returns 12347673
-        # It must return an integer for it to not raise an exception later.
+        """Extracts the job ID from the queue submit output.
+
+        Args:
+            queue_submit_output (str): The output of the queue submit command.
+
+        Returns:
+            int: The job ID.
+
+        Raises:
+            ValueError: If the job ID cannot be extracted.
+        """
         return int(
             queue_submit_output.splitlines()[-1].rstrip().lstrip().split(sep=".")[0]
         )

     @staticmethod
     def convert_queue_status(queue_status_output: str) -> pandas.DataFrame:
-        # # Run the qstat -f command and capture its output
-        # output = subprocess.check_output(["qstat", "-f"]) -> output is the
-        # queue_status_output that goes into this function
+        """Converts the queue status output into a pandas DataFrame.

-        # Split the output into lines
-        lines = queue_status_output.split("\n")  # .decode().split("\n")
+        Args:
+            queue_status_output (str): The output of the queue status command.

-        # concatenate all lines into a single string
+        Returns:
+            pandas.DataFrame: The queue status as a DataFrame.
+        """
+        lines = queue_status_output.split("\n")
         input_string = "".join(lines)
         # remove all whitespaces
         input_string = "".join(input_string.split())
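The contract stated by the removed comments still holds: the Torque parser expects qsub output of the form "<id>.<server>", e.g. "12347673.gadi-pbs" from the original comment, and returns the integer part:

    from pysqa.wrapper.torque import TorqueCommands

    assert TorqueCommands.get_job_id_from_output("12347673.gadi-pbs") == 12347673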