diff --git a/src/qtoolkit/io/__init__.py b/src/qtoolkit/io/__init__.py index cc9080d..bbfbb49 100644 --- a/src/qtoolkit/io/__init__.py +++ b/src/qtoolkit/io/__init__.py @@ -1,6 +1,7 @@ from qtoolkit.io.base import BaseSchedulerIO from qtoolkit.io.pbs import PBSIO, PBSState +from qtoolkit.io.sge import SGEIO, SGEState from qtoolkit.io.shell import ShellIO, ShellState from qtoolkit.io.slurm import SlurmIO, SlurmState -scheduler_mapping = {"slurm": SlurmIO, "pbs": PBSIO, "shell": ShellIO} +scheduler_mapping = {"slurm": SlurmIO, "pbs": PBSIO, "sge": SGEIO, "shell": ShellIO} diff --git a/src/qtoolkit/io/pbs.py b/src/qtoolkit/io/pbs.py index 22f6fce..eec2ea0 100644 --- a/src/qtoolkit/io/pbs.py +++ b/src/qtoolkit/io/pbs.py @@ -1,22 +1,10 @@ from __future__ import annotations import re -from datetime import timedelta - -from qtoolkit.core.data_objects import ( - CancelResult, - CancelStatus, - ProcessPlacement, - QJob, - QJobInfo, - QResources, - QState, - QSubState, - SubmissionResult, - SubmissionStatus, -) -from qtoolkit.core.exceptions import OutputParsingError, UnsupportedResourcesError -from qtoolkit.io.base import BaseSchedulerIO + +from qtoolkit.core.data_objects import QJob, QJobInfo, QState, QSubState +from qtoolkit.core.exceptions import OutputParsingError +from qtoolkit.io.pbs_base import PBSIOBase # States in PBS from qstat's man. # B Array job: at least one subjob has started. @@ -79,7 +67,7 @@ def qstate(self) -> QState: } -class PBSIO(BaseSchedulerIO): +class PBSIO(PBSIOBase): header_template: str = """ #PBS -q $${queue} #PBS -N $${job_name} @@ -100,65 +88,25 @@ class PBSIO(BaseSchedulerIO): SUBMIT_CMD: str | None = "qsub" CANCEL_CMD: str | None = "qdel" + system_name: str = "PBS" + default_unit: str = "mb" + power_labels: dict = {"kb": 0, "mb": 1, "gb": 2, "tb": 3} + _qresources_mapping: dict = { + "queue_name": "queue", + "job_name": "job_name", + "account": "account", + "priority": "priority", + "output_filepath": "qout_path", + "error_filepath": "qerr_path", + "project": "group_list", + } - def parse_submit_output(self, exit_code, stdout, stderr) -> SubmissionResult: - if isinstance(stdout, bytes): - stdout = stdout.decode() - if isinstance(stderr, bytes): - stderr = stderr.decode() - if exit_code != 0: - return SubmissionResult( - exit_code=exit_code, - stdout=stdout, - stderr=stderr, - status=SubmissionStatus("FAILED"), - ) - job_id = stdout.strip() - status = ( - SubmissionStatus("SUCCESSFUL") - if job_id - else SubmissionStatus("JOB_ID_UNKNOWN") - ) - return SubmissionResult( - job_id=job_id, - exit_code=exit_code, - stdout=stdout, - stderr=stderr, - status=status, - ) - - def parse_cancel_output(self, exit_code, stdout, stderr) -> CancelResult: - """Parse the output of the scancel command.""" - # Possible error messages: - # qdel: Unknown Job Id 100 - # qdel: Job has finished 1004 - # Correct execution: no output - if isinstance(stdout, bytes): - stdout = stdout.decode() - if isinstance(stderr, bytes): - stderr = stderr.decode() - if exit_code != 0: - return CancelResult( - exit_code=exit_code, - stdout=stdout, - stderr=stderr, - status=CancelStatus("FAILED"), - ) - - # PBS does not return the job id if the job is successfully deleted - status = CancelStatus("SUCCESSFUL") - return CancelResult( - job_id=None, - exit_code=exit_code, - stdout=stdout, - stderr=stderr, - status=status, - ) - - def _get_job_cmd(self, job_id: str): - cmd = f"qstat -f {job_id}" + def extract_job_id(self, stdout): + return stdout.strip() - return cmd + def extract_job_id_from_cancel(self, stderr): 
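+        # Typical qdel error messages on PBS (also listed in the removed
+        # parse_cancel_output above):
+        #   qdel: Unknown Job Id 100
+        #   qdel: Job has finished 1004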
+ # PBS doesn't return the job ID if successfully canceled, so return None + return None def parse_job_output(self, exit_code, stdout, stderr) -> QJob | None: out = self.parse_jobs_list_output(exit_code, stdout, stderr) @@ -166,24 +114,11 @@ def parse_job_output(self, exit_code, stdout, stderr) -> QJob | None: return out[0] return None - def _get_jobs_list_cmd( - self, job_ids: list[str] | None = None, user: str | None = None - ) -> str: - if user and job_ids: - raise ValueError("Cannot query by user and job(s) in PBS") - - command = [ - "qstat", - "-f", - ] + def _get_qstat_base_command(self) -> list[str]: + return ["qstat", "-f"] - if user: - command.append(f"-u {user}") - - if job_ids: - command.append(" ".join(job_ids)) - - return " ".join(command) + def _get_job_ids_flag(self, job_ids_str: str) -> str: + return job_ids_str def parse_jobs_list_output(self, exit_code, stdout, stderr) -> list[QJob]: if isinstance(stdout, bytes): @@ -316,143 +251,3 @@ def _convert_str_to_time(time_str: str | None): raise OutputParsingError() return time[3] * 86400 + time[2] * 3600 + time[1] * 60 + time[0] - - @staticmethod - def _convert_memory_str(memory: str | None) -> int | None: - if not memory: - return None - - match = re.match(r"([0-9]+)([a-zA-Z]*)", memory) - if not match: - raise OutputParsingError("No numbers and units parsed") - memory, units = match.groups() - - power_labels = {"kb": 0, "mb": 1, "gb": 2, "tb": 3} - - if not units: - units = "mb" - elif units not in power_labels: - raise OutputParsingError(f"Unknown units {units}") - try: - v = int(memory) - except ValueError: - raise OutputParsingError - - return v * (1024 ** power_labels[units]) - - # helper attribute to match the values defined in QResources and - # the dictionary that should be passed to the template - _qresources_mapping = { - "queue_name": "queue", - "job_name": "job_name", - "account": "account", - "priority": "priority", - "output_filepath": "qout_path", - "error_filepath": "qerr_path", - "project": "group_list", - } - - @staticmethod - def _convert_time_to_str(time: int | float | timedelta) -> str: - if not isinstance(time, timedelta): - time = timedelta(seconds=time) - - hours, remainder = divmod(int(time.total_seconds()), 3600) - minutes, seconds = divmod(remainder, 60) - - time_str = f"{hours}:{minutes}:{seconds}" - return time_str - - def _convert_qresources(self, resources: QResources) -> dict: - """ - Converts a QResources instance to a dict that will be used to fill in the - header of the submission script. 
- """ - - header_dict = {} - for qr_field, pbs_field in self._qresources_mapping.items(): - val = getattr(resources, qr_field) - if val is not None: - header_dict[pbs_field] = val - - if resources.njobs and resources.njobs > 1: - header_dict["array"] = f"1-{resources.njobs}" - - if resources.time_limit: - header_dict["walltime"] = self._convert_time_to_str(resources.time_limit) - - if resources.rerunnable is not None: - header_dict["rerunnable"] = "y" if resources.rerunnable else "n" - - nodes, processes, processes_per_node = resources.get_processes_distribution() - select = None - if resources.process_placement == ProcessPlacement.NO_CONSTRAINTS: - select = f"select={processes}" - if resources.threads_per_process: - select += f":ncpus={resources.threads_per_process}" - select += f":ompthreads={resources.threads_per_process}" - if resources.memory_per_thread: - threads_per_process = resources.threads_per_process or 1 - select += f":mem={threads_per_process * resources.memory_per_thread}mb" - elif resources.process_placement in ( - ProcessPlacement.EVENLY_DISTRIBUTED, - ProcessPlacement.SAME_NODE, - ProcessPlacement.SCATTERED, - ): - select = f"select={nodes}" - if resources.threads_per_process and resources.threads_per_process > 1: - cpus = resources.threads_per_process * processes_per_node - ompthreads = resources.threads_per_process - else: - cpus = processes_per_node - ompthreads = None - select += f":ncpus={cpus}" - select += f":mpiprocs={processes_per_node}" - if ompthreads: - select += f":ompthreads={ompthreads}" - if resources.memory_per_thread: - mem = cpus * resources.memory_per_thread - select += f":mem={mem}mb" - - if resources.process_placement in ( - ProcessPlacement.EVENLY_DISTRIBUTED, - ProcessPlacement.SCATTERED, - ): - header_dict["place"] = "scatter" - elif resources.process_placement == ProcessPlacement.SAME_NODE: - header_dict["place"] = "pack" - else: - msg = f"process placement {resources.process_placement} is not supported for PBS" - raise UnsupportedResourcesError(msg) - - header_dict["select"] = select - - if resources.email_address: - header_dict["mail_user"] = resources.email_address - header_dict["mail_type"] = "abe" - - if resources.scheduler_kwargs: - header_dict.update(resources.scheduler_kwargs) - - return header_dict - - @property - def supported_qresources_keys(self) -> list: - """ - List of attributes of QResources that are correctly handled by the - _convert_qresources method. It is used to validate that the user - does not pass an unsupported value, expecting to have an effect. 
- """ - supported = list(self._qresources_mapping.keys()) - supported += [ - "njobs", - "time_limit", - "processes", - "processes_per_node", - "process_placement", - "nodes", - "threads_per_process", - "email_address", - "scheduler_kwargs", - ] - return supported diff --git a/src/qtoolkit/io/pbs_base.py b/src/qtoolkit/io/pbs_base.py new file mode 100644 index 0000000..d414dfa --- /dev/null +++ b/src/qtoolkit/io/pbs_base.py @@ -0,0 +1,247 @@ +from __future__ import annotations + +import abc +import re +from abc import ABC +from datetime import timedelta + +from qtoolkit.core.data_objects import ( + CancelResult, + CancelStatus, + ProcessPlacement, + QResources, + SubmissionResult, + SubmissionStatus, +) +from qtoolkit.core.exceptions import OutputParsingError, UnsupportedResourcesError +from qtoolkit.io.base import BaseSchedulerIO + + +class PBSIOBase(BaseSchedulerIO, ABC): + """Abstract class for PBS and SGE schedulers.""" + + header_template: str + + SUBMIT_CMD: str | None = "qsub" + CANCEL_CMD: str | None = "qdel" + _qresources_mapping: dict + system_name: str + default_unit: str + power_labels: dict + + def parse_submit_output(self, exit_code, stdout, stderr) -> SubmissionResult: + if isinstance(stdout, bytes): + stdout = stdout.decode() + if isinstance(stderr, bytes): + stderr = stderr.decode() + if exit_code != 0: + return SubmissionResult( + exit_code=exit_code, + stdout=stdout, + stderr=stderr, + status=SubmissionStatus("FAILED"), + ) + job_id = self.extract_job_id(stdout) + status = ( + SubmissionStatus("SUCCESSFUL") + if job_id + else SubmissionStatus("JOB_ID_UNKNOWN") + ) + return SubmissionResult( + job_id=job_id, + exit_code=exit_code, + stdout=stdout, + stderr=stderr, + status=status, + ) + + @abc.abstractmethod + def extract_job_id(self, stdout): + pass + + def parse_cancel_output(self, exit_code, stdout, stderr) -> CancelResult: + """Parse the output of the qdel command.""" + if isinstance(stdout, bytes): + stdout = stdout.decode() + if isinstance(stderr, bytes): + stderr = stderr.decode() + if exit_code != 0: + return CancelResult( + exit_code=exit_code, + stdout=stdout, + stderr=stderr, + status=CancelStatus("FAILED"), + ) + + job_id = self.extract_job_id_from_cancel(stderr) + status = CancelStatus("SUCCESSFUL") + return CancelResult( + job_id=job_id, + exit_code=exit_code, + stdout=stdout, + stderr=stderr, + status=status, + ) + + @abc.abstractmethod + def extract_job_id_from_cancel(self, stderr): + pass + + def _get_jobs_list_cmd( + self, job_ids: list[str] | None = None, user: str | None = None + ) -> str: + if user and job_ids: + self._check_user_and_job_ids_conflict() + + command = self._get_qstat_base_command() + + if user: + command.append(f"-u {user}") + + if job_ids: + job_ids_str = ",".join(job_ids) + command.append(self._get_job_ids_flag(job_ids_str)) + + return " ".join(command) + + def _check_user_and_job_ids_conflict(self): + # Use system_name for more informative error messages + raise ValueError(f"Cannot query by user and job(s) in {self.system_name}") + + @abc.abstractmethod + def _get_qstat_base_command(self) -> list[str]: + pass + + @abc.abstractmethod + def _get_job_ids_flag(self, job_ids_str: str) -> str: + pass + + def _get_job_cmd(self, job_id: str): + cmd = f"qstat -j {job_id}" + return cmd + + def _convert_memory_str(self, memory: str | None) -> int | None: + if not memory: + return None + + match = re.match(r"([0-9]+)([a-zA-Z]*)", memory) + if not match: + raise OutputParsingError("No numbers and units parsed") + memory, units = match.groups() + + 
# Now we call the methods specific to the child class (PBSIO or SGEIO) + power_labels = self.power_labels + + if not units: + units = self.default_unit + elif units.lower() not in power_labels: + raise OutputParsingError(f"Unknown units {units}") + + try: + v = int(memory) + except ValueError: + raise OutputParsingError + + return v * (1024 ** power_labels[units.lower()]) + + @staticmethod + def _convert_time_to_str(time: int | float | timedelta) -> str: + if not isinstance(time, timedelta): + time = timedelta(seconds=time) + + hours, remainder = divmod(int(time.total_seconds()), 3600) + minutes, seconds = divmod(remainder, 60) + + time_str = f"{hours}:{minutes}:{seconds}" + return time_str + + def _convert_qresources(self, resources: QResources) -> dict: + header_dict = {} + for qr_field, system_field in self._qresources_mapping.items(): + val = getattr(resources, qr_field) + if val is not None: + header_dict[system_field] = val + + if resources.njobs and resources.njobs > 1: + header_dict["array"] = f"1-{resources.njobs}" + + if resources.time_limit: + header_dict["walltime"] = self._convert_time_to_str(resources.time_limit) + self._add_soft_walltime(header_dict, resources) + + if resources.rerunnable is not None: + header_dict["rerunnable"] = "y" if resources.rerunnable else "n" + + # Build select clause logic directly within _convert_qresources + nodes, processes, processes_per_node = resources.get_processes_distribution() + select = None + if resources.process_placement == ProcessPlacement.NO_CONSTRAINTS: + select = f"select={processes}" + if resources.threads_per_process: + select += f":ncpus={resources.threads_per_process}" + select += f":ompthreads={resources.threads_per_process}" + if resources.memory_per_thread: + threads_per_process = resources.threads_per_process or 1 + select += f":mem={threads_per_process * resources.memory_per_thread}mb" + elif resources.process_placement in ( + ProcessPlacement.EVENLY_DISTRIBUTED, + ProcessPlacement.SAME_NODE, + ProcessPlacement.SCATTERED, + ): + select = f"select={nodes}" + if resources.threads_per_process and resources.threads_per_process > 1: + cpus = resources.threads_per_process * processes_per_node + ompthreads = resources.threads_per_process + else: + cpus = processes_per_node + ompthreads = None + select += f":ncpus={cpus}" + select += f":mpiprocs={processes_per_node}" + if ompthreads: + select += f":ompthreads={ompthreads}" + if resources.memory_per_thread: + mem = cpus * resources.memory_per_thread + select += f":mem={mem}mb" + + if resources.process_placement in ( + ProcessPlacement.EVENLY_DISTRIBUTED, + ProcessPlacement.SCATTERED, + ): + header_dict["place"] = "scatter" + elif resources.process_placement == ProcessPlacement.SAME_NODE: + header_dict["place"] = "pack" + else: + raise UnsupportedResourcesError( + f"process placement {resources.process_placement} is not supported for {self.system_name}" + ) + + header_dict["select"] = select + + if resources.email_address: + header_dict["mail_user"] = resources.email_address + header_dict["mail_type"] = "abe" + + if resources.scheduler_kwargs: + header_dict.update(resources.scheduler_kwargs) + + return header_dict + + @abc.abstractmethod + def _add_soft_walltime(self, header_dict: dict, resources: QResources): + """Add soft_walltime if required by child classes (SGE).""" + + @property + def supported_qresources_keys(self) -> list: + supported = list(self._qresources_mapping.keys()) + supported += [ + "njobs", + "time_limit", + "processes", + "processes_per_node", + 
"process_placement", + "nodes", + "threads_per_process", + "email_address", + "scheduler_kwargs", + ] + return supported diff --git a/src/qtoolkit/io/sge.py b/src/qtoolkit/io/sge.py new file mode 100644 index 0000000..19bb38c --- /dev/null +++ b/src/qtoolkit/io/sge.py @@ -0,0 +1,366 @@ +from __future__ import annotations + +import re +import xml.dom.minidom +import xml.parsers.expat + +from qtoolkit.core.data_objects import QJob, QJobInfo, QResources, QState, QSubState +from qtoolkit.core.exceptions import CommandFailedError, OutputParsingError +from qtoolkit.io.pbs_base import PBSIOBase + +# https://wiki.nikhil.io/Ancient_Sysadmin_Stuff/Sun_Grid_Engine_States/ +# https://manpages.ubuntu.com/manpages/jammy/en/man5/sge_status.5.html +# Jobs Status: +# 'qw' - Queued and waiting, +# 'w' - Job waiting, +# 's' - Job suspended, +# 't' - Job transferring and about to start, +# 'r' - Job running, +# 'h' - Job hold, +# 'R' - Job restarted, +# 'd' - Job has been marked for deletion, +# 'Eqw' - An error occurred with the job. +# 'z' - finished +# +# Category State SGE Letter Code +# Pending: pending qw +# Pending: pending, user hold qw +# Pending: pending, system hold hqw +# Pending: pending, user and system hold hqw +# Pending: pending, user hold, re-queue hRwq +# Pending: pending, system hold, re-queue hRwq +# Pending: pending, user and system hold, re-queue hRwq +# Pending: pending, user hold qw +# Pending: pending, user hold qw +# Running running r +# Running transferring t +# Running running, re-submit Rr +# Running transferring, re-submit Rt +# Suspended job suspended s, ts +# Suspended queue suspended S, tS +# Suspended queue suspended by alarm T, tT +# Suspended all suspended with re-submit Rs, Rts, RS, RtS, RT, RtT +# Error all pending states with error Eqw, Ehqw, EhRqw +# Deleted all running and suspended states with deletion dr, dt, dRr, dRt, +# ds, dS, dT, dRs, +# dRS, dRT + + +class SGEState(QSubState): + # Job states + FINISHED = "z" + QUEUED_WAITING = "qw" + WAITING = "w" + JOB_SUSPENDED = "s" + TRANSFERRING = "t" + RUNNING = "r" + HOLD = "hqw" + RESTARTED = "R" + DELETION = "d" + ERROR_PENDING = "Eqw" + ERROR_PENDING_HOLD = "Ehqw" + ERROR_PENDING_HOLD_REQUEUE = "EhRqw" + DELETION_RUNNING = "dr" + DELETION_TRANSFERRING = "dt" + DELETION_RUNNING_RESUBMIT = "dRr" + DELETION_TRANSFERRING_RESUBMIT = "dRt" + DELETION_SUSPENDED_JOB = "ds" + DELETION_SUSPENDED_QUEUE = "dS" + DELETION_SUSPENDED_ALARM = "dT" + DELETION_SUSPENDED_RESUBMIT_JOB = "dRs" + DELETION_SUSPENDED_RESUBMIT_QUEUE = "dRS" + DELETION_SUSPENDED_RESUBMIT_ALARM = "dRT" + + @property + def qstate(self) -> QState: + return _STATUS_MAPPING[self] # type: ignore + + +_STATUS_MAPPING = { + SGEState.FINISHED: QState.DONE, + SGEState.QUEUED_WAITING: QState.QUEUED, + SGEState.WAITING: QState.QUEUED, + SGEState.HOLD: QState.QUEUED_HELD, + SGEState.ERROR_PENDING: QState.FAILED, + SGEState.ERROR_PENDING_HOLD: QState.FAILED, + SGEState.ERROR_PENDING_HOLD_REQUEUE: QState.FAILED, + SGEState.RUNNING: QState.RUNNING, + SGEState.TRANSFERRING: QState.RUNNING, + SGEState.RESTARTED: QState.RUNNING, + SGEState.JOB_SUSPENDED: QState.SUSPENDED, + SGEState.DELETION: QState.FAILED, + SGEState.DELETION_RUNNING: QState.FAILED, + SGEState.DELETION_TRANSFERRING: QState.FAILED, + SGEState.DELETION_RUNNING_RESUBMIT: QState.FAILED, + SGEState.DELETION_TRANSFERRING_RESUBMIT: QState.FAILED, + SGEState.DELETION_SUSPENDED_JOB: QState.SUSPENDED, + SGEState.DELETION_SUSPENDED_QUEUE: QState.SUSPENDED, + SGEState.DELETION_SUSPENDED_ALARM: QState.SUSPENDED, + 
+    SGEState.DELETION_SUSPENDED_RESUBMIT_JOB: QState.SUSPENDED,
+    SGEState.DELETION_SUSPENDED_RESUBMIT_QUEUE: QState.SUSPENDED,
+    SGEState.DELETION_SUSPENDED_RESUBMIT_ALARM: QState.SUSPENDED,
+}
+
+
+class SGEIO(PBSIOBase):
+    header_template: str = """
+#$ -cwd $${cwd}
+#$ -q $${queue}
+#$ -N $${job_name}
+#$ -P $${device}
+#$ -l $${select}
+#$ -l h_rt=$${walltime}
+#$ -l s_rt=$${soft_walltime}
+#$ -pe $${model}
+#$ -binding $${place}
+#$ -W group_list=$${group_list}
+#$ -M $${mail_user}
+#$ -m $${mail_type}
+#$ -o $${qout_path}
+#$ -e $${qerr_path}
+#$ -p $${priority}
+#$ -r $${rerunnable}
+#$ -t $${array}
+$${qverbatim}"""
+
+    SUBMIT_CMD: str | None = "qsub"
+    CANCEL_CMD: str | None = "qdel"
+    system_name: str = "SGE"
+    default_unit: str = "M"
+    power_labels: dict = {"k": 0, "m": 1, "g": 2, "t": 3}
+    _qresources_mapping: dict = {
+        "queue_name": "queue",
+        "job_name": "job_name",
+        "priority": "priority",
+        "output_filepath": "qout_path",
+        "error_filepath": "qerr_path",
+        "project": "group_list",
+    }
+
+    def __init__(self, get_job_executable: str = "qstat"):
+        super().__init__()
+        self.get_job_executable = get_job_executable
+
+    def extract_job_id(self, stdout):
+        match = re.search(r'Your job (\d+) \(".*?"\) has been submitted', stdout)
+        if not match:
+            raise OutputParsingError("Failed to parse job ID from stdout")
+        return match.group(1)
+
+    def extract_job_id_from_cancel(self, stderr):
+        match = re.search(r"qdel: job (\d+) deleted", stderr)
+        if not match:
+            raise OutputParsingError("Failed to parse job ID from stderr")
+        return match.group(1)
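+
+    # Example scheduler replies matched by the two regexes above (these exact
+    # strings appear in the reference data under tests/test_data/io/sge):
+    #   qsub -> 'Your job 24 ("submit.script") has been submitted'
+    #   qdel -> 'qdel: job 267 deleted'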
+
+    def parse_job_output(self, exit_code, stdout, stderr) -> QJob | None:  # aiida style
+        if isinstance(stdout, bytes):
+            stdout = stdout.decode()
+        if isinstance(stderr, bytes):
+            stderr = stderr.decode()
+        if exit_code != 0:
+            msg = f"command {self.get_job_executable or 'qacct'} failed: {stderr}"
+            raise CommandFailedError(msg)
+
+        # Check for specific error messages in stderr or stdout
+        error_patterns = [
+            re.compile(
+                r"Primary job\s+terminated normally, but\s+(\d+)\s+process returned a non-zero exit code",
+                re.IGNORECASE,
+            ),
+            re.compile(
+                r"mpiexec detected that one or more processes exited with non-zero status",
+                re.IGNORECASE,
+            ),
+            re.compile(r"An error occurred in MPI_Allreduce", re.IGNORECASE),
+            re.compile(
+                r"Error: mca_pml_ucx_send_nbr failed: -25, Connection reset by remote peer",
+                re.IGNORECASE,
+            ),
+            re.compile(r"mpi_errors_are_fatal", re.IGNORECASE),
+        ]
+
+        for pattern in error_patterns:
+            if pattern.search(stderr) or pattern.search(stdout):
+                msg = f"command {self.get_job_executable or 'qacct'} failed: {stderr}"
+                raise CommandFailedError(msg)
+
+        if not stdout.strip():
+            return None
+
+        # Check if stdout is in XML format
+        try:
+            xmldata = xml.dom.minidom.parseString(stdout)
+            job_info = xmldata.getElementsByTagName("job_list")[0]
+            job_id = job_info.getElementsByTagName("JB_job_number")[
+                0
+            ].firstChild.nodeValue
+            job_name = job_info.getElementsByTagName("JB_name")[0].firstChild.nodeValue
+            owner = job_info.getElementsByTagName("JB_owner")[0].firstChild.nodeValue
+            state = job_info.getElementsByTagName("state")[0].firstChild.nodeValue
+            queue_name = job_info.getElementsByTagName("queue_name")[
+                0
+            ].firstChild.nodeValue
+            slots = job_info.getElementsByTagName("slots")[0].firstChild.nodeValue
+            tasks = job_info.getElementsByTagName("tasks")[0].firstChild.nodeValue
+
+            sge_state = SGEState(state)
+            job_state = sge_state.qstate
+
+            try:
+                cpus = int(slots)
+                nodes = int(tasks)
+                threads_per_process = int(cpus / nodes)
+            except ValueError:
+                cpus = None
+                nodes = None
+                threads_per_process = None
+
+            return QJob(
+                name=job_name,
+                job_id=job_id,
+                state=job_state,
+                sub_state=sge_state,
+                account=owner,
+                queue_name=queue_name,
+                info=QJobInfo(
+                    nodes=nodes, cpus=cpus, threads_per_process=threads_per_process
+                ),
+            )
+        except Exception:
+            # Not XML, fall back to plain "key: value" text
+            job_info = {}
+            for line in stdout.split("\n"):
+                if ":" in line:
+                    key, value = line.split(":", 1)
+                    job_info[key.strip()] = value.strip()
+
+            try:
+                cpus = int(job_info.get("slots", 1))
+                nodes = int(job_info.get("tasks", 1))
+                threads_per_process = int(cpus / nodes)
+            except ValueError:
+                cpus = None
+                nodes = None
+                threads_per_process = None
+
+            state_str = job_info.get("state")
+            sge_state = SGEState(state_str) if state_str else None
+            job_state = sge_state.qstate if sge_state else None
+
+            return QJob(
+                name=job_info.get("job_name"),
+                job_id=job_info.get("job_id"),
+                state=job_state,
+                sub_state=sge_state,
+                account=job_info.get("owner"),
+                queue_name=job_info.get("queue_name"),
+                info=QJobInfo(
+                    nodes=nodes, cpus=cpus, threads_per_process=threads_per_process
+                ),
+            )
+
+    def _get_element_text(self, parent, tag_name):
+        elements = parent.getElementsByTagName(tag_name)
+        if elements:
+            return elements[0].childNodes[0].data.strip()
+        return None
+
+    def _safe_int(self, value: str | None) -> int | None:
+        if value is None:
+            return None
+        try:
+            return int(value)
+        except ValueError:
+            return None
+
+    def _get_qstat_base_command(self) -> list[str]:
+        return ["qstat", "-ext", "-urg", "-xml"]
+
+    def _get_job_ids_flag(self, job_ids_str: str) -> str:
+        return f"-j {job_ids_str}"
+
+    def parse_jobs_list_output(self, exit_code, stdout, stderr) -> list[QJob]:
+        if isinstance(stdout, bytes):
+            stdout = stdout.decode()
+        if isinstance(stderr, bytes):
+            stderr = stderr.decode()
+        if exit_code != 0:
+            msg = f"command {self.get_job_executable or 'qacct'} failed: {stderr}"
+            raise CommandFailedError(msg)
+
+        try:
+            xmldata = xml.dom.minidom.parseString(stdout)
+        except xml.parsers.expat.ExpatError:
+            raise OutputParsingError("XML parsing of stdout failed")
+
+        job_elements = xmldata.getElementsByTagName("job_list")
+        jobs_list = []
+
+        for job_element in job_elements:
+            qjob = QJob()
+            qjob.job_id = self._get_element_text(job_element, "JB_job_number")
+            job_state_string = self._get_element_text(job_element, "state")
+
+            try:
+                sge_job_state = SGEState(job_state_string)
+            except ValueError:
+                raise OutputParsingError(
+                    f"Unknown job state {job_state_string} for job id {qjob.job_id}"
+                )
+
+            qjob.sub_state = sge_job_state
+            qjob.state = sge_job_state.qstate
+            qjob.username = self._get_element_text(job_element, "JB_owner")
+            qjob.name = self._get_element_text(job_element, "JB_name")
+
+            info = QJobInfo()
+            info.nodes = self._safe_int(
+                self._get_element_text(job_element, "num_nodes")
+            )
+            info.cpus = self._safe_int(self._get_element_text(job_element, "num_proc"))
+            info.memory_per_cpu = self._convert_memory_str(
+                self._get_element_text(job_element, "hard resource_list.mem_free")
+            )
+            info.partition = self._get_element_text(job_element, "queue")
+            info.time_limit = self._convert_str_to_time(
+                self._get_element_text(job_element, "hard resource_list.h_rt")
+            )
+
+            qjob.info = info
+
+            jobs_list.append(qjob)
+
+        return jobs_list
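+
+    # Wall-time strings reported by SGE come in several widths, all accepted
+    # below, e.g. "10:51:13" -> 39073 s, "10:02" -> 602 s, "45" -> 45 s
+    # (these values are exercised in tests/io/test_sge.py).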
+
+    @staticmethod
+    def _convert_str_to_time(time_str: str | None) -> int | None:
+        if time_str is None:
+            return None
+
+        parts = time_str.split(":")
+        if len(parts) == 3:
+            hours, minutes, seconds = parts
+        elif len(parts) == 2:
+            hours = "0"
+            minutes, seconds = parts
+        elif len(parts) == 1:
+            hours, minutes, seconds = "0", "0", parts[0]
+        else:
+            raise OutputParsingError(f"Invalid time format: {time_str}")
+
+        try:
+            return int(hours) * 3600 + int(minutes) * 60 + int(seconds)
+        except ValueError:
+            raise OutputParsingError(f"Invalid time format: {time_str}")
+
+    def _add_soft_walltime(self, header_dict: dict, resources: QResources):
+        header_dict["soft_walltime"] = self._convert_time_to_str(
+            resources.time_limit * 0.99
+        )
+
+    @property
+    def supported_qresources_keys(self) -> list:
+        supported = super().supported_qresources_keys
+        supported += ["memory_per_thread", "gpus_per_job"]
+        return supported
diff --git a/tests/conftest.py b/tests/conftest.py
index 004a414..c82ebd0 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -119,7 +119,7 @@ def test_utils():
     return TestUtils
 
 
-@pytest.fixture(scope="session")
+@pytest.fixture()
 def maximalist_qresources():
     """A set of QResources options that try to make use of most features"""
     from qtoolkit.core.data_objects import QResources
diff --git a/tests/io/test_sge.py b/tests/io/test_sge.py
new file mode 100644
index 0000000..efe354e
--- /dev/null
+++ b/tests/io/test_sge.py
@@ -0,0 +1,278 @@
+from datetime import timedelta
+from pathlib import Path
+
+import pytest
+from monty.serialization import loadfn
+
+from qtoolkit.core.data_objects import ProcessPlacement, QResources, QState
+from qtoolkit.core.exceptions import OutputParsingError, UnsupportedResourcesError
+from qtoolkit.io.sge import SGEIO, SGEState
+
+TEST_DIR = Path(__file__).resolve().parents[1] / "test_data"
+submit_ref_file = TEST_DIR / "io" / "sge" / "parse_submit_output_inout.yaml"
+in_out_submit_ref_list = loadfn(submit_ref_file)
+cancel_ref_file = TEST_DIR / "io" / "sge" / "parse_cancel_output_inout.yaml"
+in_out_cancel_ref_list = loadfn(cancel_ref_file)
+job_ref_file = TEST_DIR / "io" / "sge" / "parse_job_output_inout.yaml"
+in_out_job_ref_list = loadfn(job_ref_file)
+
+
+@pytest.fixture(scope="module")
+def sge_io():
+    return SGEIO()
+
+
+class TestSGEState:
+    @pytest.mark.parametrize("sge_state", [s for s in SGEState])
+    def test_qstate(self, sge_state):
+        assert isinstance(sge_state.qstate, QState)
+        assert SGEState("hqw") == SGEState.HOLD
+        assert SGEState("r") == SGEState.RUNNING
+        assert SGEState("Eqw") == SGEState.ERROR_PENDING
+        assert SGEState("dr") == SGEState.DELETION_RUNNING
+
+
+class TestSGEIO:
+    @pytest.mark.parametrize("in_out_ref", in_out_submit_ref_list)
+    def test_parse_submit_output(self, sge_io, in_out_ref, test_utils):
+        parse_cmd_output, sr_ref = test_utils.inkwargs_outref(
+            in_out_ref, inkey="parse_submit_kwargs", outkey="submission_result_ref"
+        )
+        sr = sge_io.parse_submit_output(**parse_cmd_output)
+        assert sr == sr_ref
+        sr = sge_io.parse_submit_output(
+            exit_code=parse_cmd_output["exit_code"],
+            stdout=bytes(parse_cmd_output["stdout"], "utf-8"),
+            stderr=bytes(parse_cmd_output["stderr"], "utf-8"),
+        )
+        assert sr == sr_ref
+        sr = sge_io.parse_submit_output(
+            exit_code=parse_cmd_output["exit_code"],
+            stdout=bytes(parse_cmd_output["stdout"], "ascii"),
+            stderr=bytes(parse_cmd_output["stderr"], "ascii"),
+        )
+        assert sr == sr_ref
+
+    @pytest.mark.parametrize("in_out_ref", in_out_cancel_ref_list)
+    def test_parse_cancel_output(self, sge_io, in_out_ref, test_utils):
+        parse_cmd_output, cr_ref = test_utils.inkwargs_outref(
+            in_out_ref, inkey="parse_cancel_kwargs", outkey="cancel_result_ref"
+        )
+        cr = 
sge_io.parse_cancel_output(**parse_cmd_output) + assert cr == cr_ref + cr = sge_io.parse_cancel_output( + exit_code=parse_cmd_output["exit_code"], + stdout=bytes(parse_cmd_output["stdout"], "utf-8"), + stderr=bytes(parse_cmd_output["stderr"], "utf-8"), + ) + assert cr == cr_ref + cr = sge_io.parse_cancel_output( + exit_code=parse_cmd_output["exit_code"], + stdout=bytes(parse_cmd_output["stdout"], "ascii"), + stderr=bytes(parse_cmd_output["stderr"], "ascii"), + ) + assert cr == cr_ref + + @pytest.mark.parametrize("in_out_ref", in_out_job_ref_list) + def test_parse_job_output(self, sge_io, in_out_ref, test_utils): + parse_cmd_output, job_ref = test_utils.inkwargs_outref( + in_out_ref, inkey="parse_job_kwargs", outkey="job_ref" + ) + if "stderr" not in parse_cmd_output: + parse_cmd_output["stderr"] = "" + job = sge_io.parse_job_output(**parse_cmd_output) + assert job == job_ref + job = sge_io.parse_job_output( + exit_code=parse_cmd_output["exit_code"], + stdout=bytes(parse_cmd_output["stdout"], "utf-8"), + stderr=bytes(parse_cmd_output["stderr"], "utf-8"), + ) + assert job == job_ref + job = sge_io.parse_job_output( + exit_code=parse_cmd_output["exit_code"], + stdout=bytes(parse_cmd_output["stdout"], "ascii"), + stderr=bytes(parse_cmd_output["stderr"], "ascii"), + ) + assert job == job_ref + + def test_get_job_cmd(self, sge_io): + cmd = sge_io._get_job_cmd(3) + assert cmd == "qstat -j 3" + cmd = sge_io._get_job_cmd("56") + assert cmd == "qstat -j 56" + + def test_get_jobs_list_cmd(self, sge_io): + with pytest.raises( + ValueError, match=r"Cannot query by user and job\(s\) in SGE" + ): + sge_io._get_jobs_list_cmd(job_ids=["1"], user="johndoe") + cmd = sge_io._get_jobs_list_cmd(user="johndoe") + assert cmd == "qstat -ext -urg -xml -u johndoe" + cmd = sge_io._get_jobs_list_cmd(job_ids=["1", "3", "56", "15"]) + assert cmd == "qstat -ext -urg -xml -j 1,3,56,15" + cmd = sge_io._get_jobs_list_cmd(job_ids=["1"]) + assert cmd == "qstat -ext -urg -xml -j 1" + + def test_convert_str_to_time(self, sge_io): + time_seconds = sge_io._convert_str_to_time("10:51:13") + assert time_seconds == 39073 + time_seconds = sge_io._convert_str_to_time("02:10:02") + assert time_seconds == 7802 + time_seconds = sge_io._convert_str_to_time("10:02") + assert time_seconds == 602 + time_seconds = sge_io._convert_str_to_time("45") + assert time_seconds == 45 + + with pytest.raises(OutputParsingError): + sge_io._convert_str_to_time("2:10:02:10") + + with pytest.raises(OutputParsingError): + sge_io._convert_str_to_time("2:10:a") + + def test_convert_memory_str(self, sge_io): + assert isinstance(sge_io, SGEIO) + memory_kb = sge_io._convert_memory_str(None) + assert memory_kb is None + memory_kb = sge_io._convert_memory_str("") + assert memory_kb is None + + memory_kb = sge_io._convert_memory_str("12M") + assert memory_kb == 12288 + memory_kb = sge_io._convert_memory_str("13K") + assert memory_kb == 13 + memory_kb = sge_io._convert_memory_str("5G") + assert memory_kb == 5242880 + memory_kb = sge_io._convert_memory_str("1T") + assert memory_kb == 1073741824 + + with pytest.raises(OutputParsingError): + sge_io._convert_memory_str("aT") + + def test_convert_time_to_str(self, sge_io): + time_str = sge_io._convert_time_to_str(10) + assert time_str == "0:0:10" + time_str = sge_io._convert_time_to_str(39073) + assert time_str == "10:51:13" + time_str = sge_io._convert_time_to_str(7802) + assert time_str == "2:10:2" + time_str = sge_io._convert_time_to_str(602) + assert time_str == "0:10:2" + + time_str = 
sge_io._convert_time_to_str(timedelta(seconds=39073)) + assert time_str == "10:51:13" + time_str = sge_io._convert_time_to_str( + timedelta(hours=15, minutes=19, seconds=32) + ) + assert time_str == "15:19:32" + + # test float + time_str = sge_io._convert_time_to_str(602.0) + assert time_str == "0:10:2" + + # test negative + # negative time makes no sense and should not be passed. this test is just to be alerted + # if the output for negative numbers changes + time_str = sge_io._convert_time_to_str(-10) + assert time_str == "-1:59:50" + + def test_check_convert_qresources(self, sge_io): + res = QResources( + queue_name="myqueue", + job_name="myjob", + memory_per_thread=2048, + priority=1, + output_filepath="someoutputpath", + error_filepath="someerrorpath", + njobs=4, + time_limit=39073, + process_placement=ProcessPlacement.EVENLY_DISTRIBUTED, + nodes=4, + processes_per_node=3, + threads_per_process=2, + email_address="john.doe@submit.qtk", + scheduler_kwargs={"tata": "toto", "titi": "tutu"}, + ) + header_dict = sge_io.check_convert_qresources(resources=res) + assert header_dict == { + "queue": "myqueue", + "job_name": "myjob", + "place": "scatter", + "priority": 1, + "qout_path": "someoutputpath", + "qerr_path": "someerrorpath", + "array": "1-4", + "walltime": "10:51:13", + "select": "select=4:ncpus=6:mpiprocs=3:ompthreads=2:mem=12288mb", + "soft_walltime": "10:44:42", + "mail_user": "john.doe@submit.qtk", + "mail_type": "abe", + "tata": "toto", + "titi": "tutu", + } + + res = QResources( + time_limit=39073, + processes=24, + ) + header_dict = sge_io.check_convert_qresources(resources=res) + assert header_dict == { + "walltime": "10:51:13", + "soft_walltime": "10:44:42", + "select": "select=24", # also not sure about this + } + + res = QResources( + njobs=1, + processes=24, + gpus_per_job=4, + ) + header_dict = sge_io.check_convert_qresources(resources=res) + assert header_dict == { + "select": "select=24", + } + + res = QResources( + processes=5, + rerunnable=True, + ) + with pytest.raises( + UnsupportedResourcesError, match=r"Keys not supported: rerunnable" + ): + sge_io.check_convert_qresources(res) + + def test_submission_script(self, sge_io, maximalist_qresources): + # remove unsupported SGE options + maximalist_qresources.rerunnable = None + maximalist_qresources.project = None + maximalist_qresources.account = None + maximalist_qresources.qos = None + maximalist_qresources.process_placement = ProcessPlacement.EVENLY_DISTRIBUTED + + # Set `processes` to None to avoid the conflict + maximalist_qresources.processes = None + + # generate the SGE submission script + script_qresources = sge_io.get_submission_script( + commands=["ls -l"], options=maximalist_qresources + ) + + # assert the correctness of the generated script + assert ( + script_qresources.split("\n") + == """#!/bin/bash + +#$ -q test_queue +#$ -N test_job +#$ -l select=1:ncpus=1:mpiprocs=1:mem=1000mb +#$ -l h_rt=0:1:40 +#$ -l s_rt=0:1:39 +#$ -binding scatter +#$ -M test_email_address@email.address +#$ -m abe +#$ -o test_output_filepath +#$ -e test_error_filepath +#$ -p 1 +ls -l""".split( + "\n" + ) + ) diff --git a/tests/io/test_slurm.py b/tests/io/test_slurm.py index edc7cee..21f3568 100644 --- a/tests/io/test_slurm.py +++ b/tests/io/test_slurm.py @@ -279,8 +279,11 @@ def test_submission_script(self, slurm_io, maximalist_qresources): script_qresources = slurm_io.get_submission_script( commands=["ls -l"], options=maximalist_qresources ) - assert script_qresources.split( - "\n" - ) == "#!/bin/bash\n\n#SBATCH 
--partition=test_queue\n#SBATCH --job-name=test_job\n#SBATCH --nodes=1\n#SBATCH --ntasks=1\n#SBATCH --ntasks-per-node=1\n#SBATCH --cpus-per-task=1\n#SBATCH --mem-per-cpu=1000\n#SBATCH --time=0-0:1:40\n#SBATCH --account=test_account\n#SBATCH --mail-user=test_email_address@email.address\n#SBATCH --mail-type=ALL\n#SBATCH --gres=gpu:1\n#SBATCH --output=test_output_filepath\n#SBATCH --error=test_error_filepath\n#SBATCH --qos=test_qos\n#SBATCH --priority=1\nls -l".split( - "\n" - ) + assert script_qresources.split("\n") == ( + "#!/bin/bash\n\n#SBATCH --partition=test_queue\n#SBATCH --job-name=test_job\n#SBATCH --nodes=1\n#SBATCH " + "--ntasks=1\n#SBATCH --ntasks-per-node=1\n#SBATCH --cpus-per-task=1\n#SBATCH " + "--mem-per-cpu=1000\n#SBATCH --time=0-0:1:40\n#SBATCH --account=test_account\n#SBATCH " + "--mail-user=test_email_address@email.address\n#SBATCH --mail-type=ALL\n#SBATCH --gres=gpu:1\n#SBATCH " + "--output=test_output_filepath\n#SBATCH --error=test_error_filepath\n#SBATCH --qos=test_qos\n#SBATCH " + "--priority=1\nls -l" + ).split("\n") diff --git a/tests/test_data/io/sge/create_parse_cancel_output_inout.py b/tests/test_data/io/sge/create_parse_cancel_output_inout.py new file mode 100644 index 0000000..8548471 --- /dev/null +++ b/tests/test_data/io/sge/create_parse_cancel_output_inout.py @@ -0,0 +1,102 @@ +import json + +import yaml + +from qtoolkit.io.sge import SGEIO + +sge_io = SGEIO() + +mylist = [] + +# First case: successful termination +return_code = 0 +stdout = b"" +stderr = b"qdel: job 267 deleted\n" + +cr = sge_io.parse_cancel_output(exit_code=return_code, stdout=stdout, stderr=stderr) + +a = { + "parse_cancel_kwargs": json.dumps( + {"exit_code": return_code, "stdout": stdout.decode(), "stderr": stderr.decode()} + ), + "cancel_result_ref": json.dumps(cr.as_dict()), +} +mylist.append(a) + +# Second case: no job identification provided +return_code = 1 +stdout = b"" +stderr = b"qdel: No job id specified\n" + +cr = sge_io.parse_cancel_output(exit_code=return_code, stdout=stdout, stderr=stderr) + +a = { + "parse_cancel_kwargs": json.dumps( + {"exit_code": return_code, "stdout": stdout.decode(), "stderr": stderr.decode()} + ), + "cancel_result_ref": json.dumps(cr.as_dict()), +} +mylist.append(a) + +# Third case: access/permission denied +return_code = 210 +stdout = b"" +stderr = b"qdel: job 1 access denied\n" + +cr = sge_io.parse_cancel_output(exit_code=return_code, stdout=stdout, stderr=stderr) + +a = { + "parse_cancel_kwargs": json.dumps( + {"exit_code": return_code, "stdout": stdout.decode(), "stderr": stderr.decode()} + ), + "cancel_result_ref": json.dumps(cr.as_dict()), +} +mylist.append(a) + +# Fourth case: invalid job id +return_code = 1 +stdout = b"" +stderr = b"qdel: Invalid job id a\n" + +cr = sge_io.parse_cancel_output(exit_code=return_code, stdout=stdout, stderr=stderr) + +a = { + "parse_cancel_kwargs": json.dumps( + {"exit_code": return_code, "stdout": stdout.decode(), "stderr": stderr.decode()} + ), + "cancel_result_ref": json.dumps(cr.as_dict()), +} +mylist.append(a) + +# Fifth case: job already completed +return_code = 0 +stdout = b"" +stderr = b"qdel: job 269 deleted\nqdel: job 269 already completed\n" + +cr = sge_io.parse_cancel_output(exit_code=return_code, stdout=stdout, stderr=stderr) + +a = { + "parse_cancel_kwargs": json.dumps( + {"exit_code": return_code, "stdout": stdout.decode(), "stderr": stderr.decode()} + ), + "cancel_result_ref": json.dumps(cr.as_dict()), +} +mylist.append(a) + +# Sixth case: invalid job id specified +return_code = 0 +stdout = b"" 
+stderr = b"qdel: job 2675 deleted\nqdel: Invalid job id specified\n" + +cr = sge_io.parse_cancel_output(exit_code=return_code, stdout=stdout, stderr=stderr) + +a = { + "parse_cancel_kwargs": json.dumps( + {"exit_code": return_code, "stdout": stdout.decode(), "stderr": stderr.decode()} + ), + "cancel_result_ref": json.dumps(cr.as_dict()), +} +mylist.append(a) + +with open("parse_cancel_output_inout.yaml", "w") as f: + yaml.dump(mylist, f, sort_keys=False) diff --git a/tests/test_data/io/sge/create_parse_job_output_inout.py b/tests/test_data/io/sge/create_parse_job_output_inout.py new file mode 100644 index 0000000..9f26335 --- /dev/null +++ b/tests/test_data/io/sge/create_parse_job_output_inout.py @@ -0,0 +1,92 @@ +import json + +import yaml + +from qtoolkit.core.exceptions import OutputParsingError +from qtoolkit.io.sge import SGEIO + +sge_io = SGEIO() + +mylist = [] + +# First case: successful job parsing +return_code = 0 +stdout = b""" + + + 270 + submit.script + matgenix-dwa + matgenix-dwa + (null) + r + 0 + 2023-10-11T11:08:17 + main.q + 1 + 1 + 00:05:00 + 96G + + +""" +stderr = b"" +job = sge_io.parse_job_output(exit_code=return_code, stdout=stdout, stderr=stderr) +a = { + "parse_job_kwargs": json.dumps( + {"exit_code": return_code, "stdout": stdout.decode(), "stderr": stderr.decode()} + ), + "job_ref": json.dumps(job.as_dict()), +} +mylist.append(a) + +# Second case: job parsing with invalid fields +return_code = 0 +stdout = b""" + + + 270 + submit.script + matgenix-dwa + matgenix-dwa + (null) + r + 0 + 2023-10-11T11:08:17 + main.q + a + 1 + a + 96G + + +""" +stderr = b"" +try: + job = sge_io.parse_job_output(exit_code=return_code, stdout=stdout, stderr=stderr) + job_dict = job.as_dict() +except OutputParsingError as e: + job_dict = {"error": str(e)} +a = { + "parse_job_kwargs": json.dumps( + {"exit_code": return_code, "stdout": stdout.decode(), "stderr": stderr.decode()} + ), + "job_ref": json.dumps(job_dict), +} +mylist.append(a) + +# Third case: empty stdout and stderr +return_code = 0 +stdout = b"" +stderr = b"" +job = sge_io.parse_job_output(exit_code=return_code, stdout=stdout, stderr=stderr) +a = { + "parse_job_kwargs": json.dumps( + {"exit_code": return_code, "stdout": stdout.decode(), "stderr": stderr.decode()} + ), + "job_ref": json.dumps(job.as_dict() if job is not None else None), +} +mylist.append(a) + +with open("parse_job_output_inout.yaml", "w") as f: + yaml.dump(mylist, f, sort_keys=False) diff --git a/tests/test_data/io/sge/create_parse_submit_output_inout.py b/tests/test_data/io/sge/create_parse_submit_output_inout.py new file mode 100644 index 0000000..b23ad81 --- /dev/null +++ b/tests/test_data/io/sge/create_parse_submit_output_inout.py @@ -0,0 +1,92 @@ +import json + +import yaml + +from qtoolkit.io.sge import SGEIO + +sge_io = SGEIO() + +mylist = [] + +# First case: invalid queue specified +return_code = 1 +stdout = b"" +stderr = ( + b"qsub: Invalid queue specified: abcd\n" + b"qsub: Job submission failed: Invalid queue name specified\n" +) + +sr = sge_io.parse_submit_output( + exit_code=return_code, stdout=stdout.decode(), stderr=stderr.decode() +) + +a = { + "parse_submit_kwargs": json.dumps( + {"exit_code": return_code, "stdout": stdout.decode(), "stderr": stderr.decode()} + ), + "submission_result_ref": json.dumps(sr.as_dict()), +} +mylist.append(a) + +# Second case: successful submission +return_code = 0 +stdout = b'Your job 24 ("submit.script") has been submitted\n' +stderr = b"" +sr = sge_io.parse_submit_output( + exit_code=return_code, 
stdout=stdout.decode(), stderr=stderr.decode() +) +a = { + "parse_submit_kwargs": json.dumps( + {"exit_code": return_code, "stdout": stdout.decode(), "stderr": stderr.decode()} + ), + "submission_result_ref": json.dumps(sr.as_dict()), +} +mylist.append(a) + +# Third case: another successful submission +return_code = 0 +stdout = b'Your job 15 ("submit.script") has been submitted\n' +stderr = b"" +sr = sge_io.parse_submit_output( + exit_code=return_code, stdout=stdout.decode(), stderr=stderr.decode() +) +a = { + "parse_submit_kwargs": json.dumps( + {"exit_code": return_code, "stdout": stdout.decode(), "stderr": stderr.decode()} + ), + "submission_result_ref": json.dumps(sr.as_dict()), +} +mylist.append(a) + +# Fourth case: successful job allocation +return_code = 0 +stdout = b'Your job 10 ("submit.script") has been submitted\n' +stderr = b"" +sr = sge_io.parse_submit_output( + exit_code=return_code, stdout=stdout.decode(), stderr=stderr.decode() +) +a = { + "parse_submit_kwargs": json.dumps( + {"exit_code": return_code, "stdout": stdout.decode(), "stderr": stderr.decode()} + ), + "submission_result_ref": json.dumps(sr.as_dict()), +} +mylist.append(a) + +# Fifth case: another successful job allocation +return_code = 0 +stdout = b'Your job 124 ("submit.script") has been submitted\n' +stderr = b"" +sr = sge_io.parse_submit_output( + exit_code=return_code, stdout=stdout.decode(), stderr=stderr.decode() +) +a = { + "parse_submit_kwargs": json.dumps( + {"exit_code": return_code, "stdout": stdout.decode(), "stderr": stderr.decode()} + ), + "submission_result_ref": json.dumps(sr.as_dict()), +} +mylist.append(a) + +with open("parse_submit_output_inout.yaml", "w") as f: + yaml.dump(mylist, f, sort_keys=False) diff --git a/tests/test_data/io/sge/parse_cancel_output_inout.yaml b/tests/test_data/io/sge/parse_cancel_output_inout.yaml new file mode 100644 index 0000000..1d9d105 --- /dev/null +++ b/tests/test_data/io/sge/parse_cancel_output_inout.yaml @@ -0,0 +1,12 @@ +- parse_cancel_kwargs: '{"exit_code": 0, "stdout": "", "stderr": "qdel: job 267 deleted\n"}' + cancel_result_ref: '{"@module": "qtoolkit.core.data_objects", "@class": "CancelResult", "@version": "0.1.1", "job_id": "267", "step_id": null, "exit_code": 0, "stdout": "", "stderr": "qdel: job 267 deleted\n", "status": {"@module": "qtoolkit.core.data_objects", "@class": "CancelStatus", "@version": "0.1.1", "value": "SUCCESSFUL"}}' +- parse_cancel_kwargs: '{"exit_code": 1, "stdout": "", "stderr": "qdel: No job id specified\n"}' + cancel_result_ref: '{"@module": "qtoolkit.core.data_objects", "@class": "CancelResult", "@version": "0.1.1", "job_id": null, "step_id": null, "exit_code": 1, "stdout": "", "stderr": "qdel: No job id specified\n", "status": {"@module": "qtoolkit.core.data_objects", "@class": "CancelStatus", "@version": "0.1.1", "value": "FAILED"}}' +- parse_cancel_kwargs: '{"exit_code": 210, "stdout": "", "stderr": "qdel: job 1 access denied\n"}' + cancel_result_ref: '{"@module": "qtoolkit.core.data_objects", "@class": "CancelResult", "@version": "0.1.1", "job_id": null, "step_id": null, "exit_code": 210, "stdout": "", "stderr": "qdel: job 1 access denied\n", "status": {"@module": "qtoolkit.core.data_objects", "@class": "CancelStatus", "@version": "0.1.1", "value": "FAILED"}}' +- parse_cancel_kwargs: '{"exit_code": 1, "stdout": "", "stderr": "qdel: Invalid job id a\n"}' + cancel_result_ref: '{"@module": "qtoolkit.core.data_objects", "@class": "CancelResult", "@version": "0.1.1", "job_id": null, "step_id": null, "exit_code": 1, "stdout": "", 
"stderr": "qdel: Invalid job id a\n", "status": {"@module": "qtoolkit.core.data_objects", "@class": "CancelStatus", "@version": "0.1.1", "value": "FAILED"}}' +- parse_cancel_kwargs: '{"exit_code": 0, "stdout": "", "stderr": "qdel: job 269 deleted\nqdel: job 269 already completed\n"}' + cancel_result_ref: '{"@module": "qtoolkit.core.data_objects", "@class": "CancelResult", "@version": "0.1.1", "job_id": "269", "step_id": null, "exit_code": 0, "stdout": "", "stderr": "qdel: job 269 deleted\nqdel: job 269 already completed\n", "status": {"@module": "qtoolkit.core.data_objects", "@class": "CancelStatus", "@version": "0.1.1", "value": "SUCCESSFUL"}}' +- parse_cancel_kwargs: '{"exit_code": 0, "stdout": "", "stderr": "qdel: job 2675 deleted\nqdel: Invalid job id specified\n"}' + cancel_result_ref: '{"@module": "qtoolkit.core.data_objects", "@class": "CancelResult", "@version": "0.1.1", "job_id": "2675", "step_id": null, "exit_code": 0, "stdout": "", "stderr": "qdel: job 2675 deleted\nqdel: Invalid job id specified\n", "status": {"@module": "qtoolkit.core.data_objects", "@class": "CancelStatus", "@version": "0.1.1", "value": "SUCCESSFUL"}}' diff --git a/tests/test_data/io/sge/parse_job_output_inout.yaml b/tests/test_data/io/sge/parse_job_output_inout.yaml new file mode 100644 index 0000000..9e4bc85 --- /dev/null +++ b/tests/test_data/io/sge/parse_job_output_inout.yaml @@ -0,0 +1,6 @@ +- parse_job_kwargs: '{"exit_code": 0, "stdout": "job_id: 270\njob_name: submit.script\nowner: matgenix-dwa\npriority: 4294901497\nstate: r\nstart_time: 2023-10-11T11:08:17\nend_time: 2023-10-11T11:13:17\nqueue_name: main\nslots: 1\nhard_wallclock: UNLIMITED\nstdout_path_list: /home/matgenix-dwa/software/qtoolkit/tests/test_data/io/sge/sge-270.out\nstderr_path_list: /home/matgenix-dwa/software/qtoolkit/tests/test_data/io/sge/sge-270.out\n"}' + job_ref: '{"@module": "qtoolkit.core.data_objects", "@class": "QJob", "@version": "0.1.1", "name": "submit.script", "job_id": "270", "exit_status": null, "state": {"@module": "qtoolkit.core.data_objects", "@class": "QState", "@version": "0.1.1", "value": "RUNNING"}, "sub_state": {"@module": "qtoolkit.io.sge", "@class": "SGEState", "@version": "0.1.1", "value": "r"}, "info": {"@module": "qtoolkit.core.data_objects", "@class": "QJobInfo", "@version": "0.1.1", "memory": null, "memory_per_cpu": null, "nodes": 1, "cpus": 1, "threads_per_process": 1, "time_limit": null}, "account": "matgenix-dwa", "runtime": null, "queue_name": "main"}' +- parse_job_kwargs: '{"exit_code": 0, "stdout": "job_id: 270\njob_name: submit.script\nowner: matgenix-dwa\npriority: 4294901497\nstate: r\nstart_time: 2023-10-11T11:08:17\nend_time: 2023-10-11T11:13:17\nqueue_name: main\nslots: a\nhard_wallclock: a\nstdout_path_list: /home/matgenix-dwa/software/qtoolkit/tests/test_data/io/sge/sge-270.out\nstderr_path_list: /home/matgenix-dwa/software/qtoolkit/tests/test_data/io/sge/sge-270.out\n"}' + job_ref: '{"@module": "qtoolkit.core.data_objects", "@class": "QJob", "@version": "0.1.1", "name": "submit.script", "job_id": "270", "exit_status": null, "state": {"@module": "qtoolkit.core.data_objects", "@class": "QState", "@version": "0.1.1", "value": "RUNNING"}, "sub_state": {"@module": "qtoolkit.io.sge", "@class": "SGEState", "@version": "0.1.1", "value": "r"}, "info": {"@module": "qtoolkit.core.data_objects", "@class": "QJobInfo", "@version": "0.1.1", "memory": null, "memory_per_cpu": null, "nodes": null, "cpus": null, "threads_per_process": null, "time_limit": null}, "account": "matgenix-dwa", "runtime": null, 
"queue_name": "main"}' +- parse_job_kwargs: '{"exit_code": 0, "stdout": "", "stderr": ""}' + job_ref: 'null' diff --git a/tests/test_data/io/sge/parse_submit_output_inout.yaml b/tests/test_data/io/sge/parse_submit_output_inout.yaml new file mode 100644 index 0000000..59b4350 --- /dev/null +++ b/tests/test_data/io/sge/parse_submit_output_inout.yaml @@ -0,0 +1,18 @@ +- parse_submit_kwargs: '{"exit_code": 1, "stdout": "", "stderr": "qsub: error: invalid queue specified: abcd\nqsub: error: Batch job submission failed: Invalid queue name specified\n"}' + submission_result_ref: '{"@module": "qtoolkit.core.data_objects", "@class": "SubmissionResult", "@version": "0.0.1+d20230127", "job_id": null, "step_id": null, "exit_code": 1, "stdout": "", "stderr": "qsub: error: invalid queue specified: abcd\nqsub: error: Batch job submission failed: Invalid queue name specified\n", "status": {"@module": "qtoolkit.core.data_objects", "@class": "SubmissionStatus", "@version": "0.0.1+d20230127", "value": "FAILED"}}' +- parse_submit_kwargs: '{"exit_code": 0, "stdout": "Your job 24 (\"submit.script\") has been submitted\n", "stderr": ""}' + submission_result_ref: '{"@module": "qtoolkit.core.data_objects", "@class": "SubmissionResult", "@version": "0.0.1+d20230127", "job_id": "24", "step_id": null, "exit_code": 0, "stdout": "Your job 24 (\"submit.script\") has been submitted\n", "stderr": "", "status": {"@module": "qtoolkit.core.data_objects", "@class": "SubmissionStatus", "@version": "0.0.1+d20230127", "value": "SUCCESSFUL"}}' +- parse_submit_kwargs: '{"exit_code": 0, "stdout": "Your job 15 (\"submit.script\") has been submitted\n", "stderr": ""}' + submission_result_ref: '{"@module": "qtoolkit.core.data_objects", "@class": "SubmissionResult", "@version": "0.0.1+d20230127", "job_id": "15", "step_id": null, "exit_code": 0, "stdout": "Your job 15 (\"submit.script\") has been submitted\n", "stderr": "", "status": {"@module": "qtoolkit.core.data_objects", "@class": "SubmissionStatus", "@version": "0.0.1+d20230127", "value": "SUCCESSFUL"}}' +- parse_submit_kwargs: '{"exit_code": 0, "stdout": "Your job 10 (\"submit.script\") has been submitted\n", "stderr": ""}' + submission_result_ref: '{"@module": "qtoolkit.core.data_objects", "@class": "SubmissionResult", "@version": "0.0.1+d20230127", "job_id": "10", "step_id": null, "exit_code": 0, "stdout": "Your job 10 (\"submit.script\") has been submitted\n", "stderr": "", "status": {"@module": "qtoolkit.core.data_objects", "@class": "SubmissionStatus", "@version": "0.0.1+d20230127", "value": "SUCCESSFUL"}}' +- parse_submit_kwargs: '{"exit_code": 0, "stdout": "Your job 124 (\"submit.script\") has been submitted\n", "stderr": ""}' + submission_result_ref: '{"@module": "qtoolkit.core.data_objects", "@class": "SubmissionResult", "@version": "0.0.1+d20230127", "job_id": "124", "step_id": null, "exit_code": 0, "stdout": "Your job 124 (\"submit.script\") has been submitted\n", "stderr": "", "status": {"@module": "qtoolkit.core.data_objects", "@class": "SubmissionStatus", "@version": "0.0.1+d20230127", "value": "SUCCESSFUL"}}' +- parse_submit_kwargs: '{"exit_code": 0, "stdout": "Your job 24 (\"submit.script\") has been submitted\n", "stderr": ""}' + submission_result_ref: '{"@module": "qtoolkit.core.data_objects", "@class": "SubmissionResult", "@version": "0.0.1+d20230127", "job_id": "24", "step_id": null, "exit_code": 0, "stdout": "Your job 24 (\"submit.script\") has been submitted\n", "stderr": "", "status": {"@module": "qtoolkit.core.data_objects", "@class": "SubmissionStatus", 
"@version": "0.0.1+d20230127", "value": "SUCCESSFUL"}}' +- parse_submit_kwargs: '{"exit_code": 0, "stdout": "Your job 15 (\"submit.script\") has been submitted\n", "stderr": ""}' + submission_result_ref: '{"@module": "qtoolkit.core.data_objects", "@class": "SubmissionResult", "@version": "0.0.1+d20230127", "job_id": "15", "step_id": null, "exit_code": 0, "stdout": "Your job 15 (\"submit.script\") has been submitted\n", "stderr": "", "status": {"@module": "qtoolkit.core.data_objects", "@class": "SubmissionStatus", "@version": "0.0.1+d20230127", "value": "SUCCESSFUL"}}' +- parse_submit_kwargs: '{"exit_code": 0, "stdout": "Your job 10 (\"submit.script\") has been submitted\n", "stderr": ""}' + submission_result_ref: '{"@module": "qtoolkit.core.data_objects", "@class": "SubmissionResult", "@version": "0.0.1+d20230127", "job_id": "10", "step_id": null, "exit_code": 0, "stdout": "Your job 10 (\"submit.script\") has been submitted\n", "stderr": "", "status": {"@module": "qtoolkit.core.data_objects", "@class": "SubmissionStatus", "@version": "0.0.1+d20230127", "value": "SUCCESSFUL"}}' +- parse_submit_kwargs: '{"exit_code": 0, "stdout": "Your job 124 (\"submit.script\") has been submitted\n", "stderr": ""}' + submission_result_ref: '{"@module": "qtoolkit.core.data_objects", "@class": "SubmissionResult", "@version": "0.0.1+d20230127", "job_id": "124", "step_id": null, "exit_code": 0, "stdout": "Your job 124 (\"submit.script\") has been submitted\n", "stderr": "", "status": {"@module": "qtoolkit.core.data_objects", "@class": "SubmissionStatus", "@version": "0.0.1+d20230127", "value": "SUCCESSFUL"}}'