Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of SGE interface #43

Merged
merged 37 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
6fd2c7c
Initial commit for SGE interface
QuantumChemist Aug 2, 2024
e4a57ef
adding extremely generic code
QuantumChemist Aug 2, 2024
f602de7
adding SGEIO and SGEState to init
QuantumChemist Aug 2, 2024
a119af5
add very generic tests based on the slurm tests (not all do pass)
QuantumChemist Aug 2, 2024
d77800e
finalized first implemetation
QuantumChemist Aug 7, 2024
a1911e3
Update src/qtoolkit/io/sge.py
QuantumChemist Aug 16, 2024
982627d
made nodes indeterminated
QuantumChemist Sep 19, 2024
d782fb0
replaced not working links
QuantumChemist Sep 19, 2024
28db648
removed queue states
QuantumChemist Sep 19, 2024
6f58cd2
removed queue states
QuantumChemist Sep 19, 2024
a8ed691
remapped some job states to QState.FAILED and QState.SUSPENDED, added…
QuantumChemist Sep 19, 2024
e153da1
changed OutputParsingError to CommandFailedError
QuantumChemist Sep 19, 2024
ae01b4f
set cpus and nodes to None in case of ValueError
QuantumChemist Sep 19, 2024
23dae18
using sge_state.qstate
QuantumChemist Sep 19, 2024
28732af
added get_job_executable to SGEIO class initialization
QuantumChemist Sep 19, 2024
58ccd31
split_separator not needed
QuantumChemist Sep 19, 2024
e6e67b3
making the message for not being able to query per job_list more clearer
QuantumChemist Sep 19, 2024
85afc3b
removing account from _qresources_mapping
QuantumChemist Sep 19, 2024
27346a5
fixing unit tests
QuantumChemist Sep 19, 2024
08b513c
fixed unit tests
QuantumChemist Sep 19, 2024
2a74639
Merge branch 'Matgenix:develop' into sge
QuantumChemist Sep 19, 2024
573641e
added test_submission_script with adjusted settings for SGE
QuantumChemist Sep 19, 2024
8d31364
seeting swt to 0.99*hwt
QuantumChemist Sep 20, 2024
9a000a5
introduced parent class PBEIOBase to unify PBSIO and SGEIO functional…
QuantumChemist Sep 20, 2024
fee9e75
changing _convert_str_to_time for PBSIOBase
QuantumChemist Sep 20, 2024
ffbc462
fixing unit tests because of soft walltime changes
QuantumChemist Sep 20, 2024
eaa9f33
implement that in SGE one can also query by a job_list and move simil…
QuantumChemist Sep 20, 2024
09e3748
adjust test_get_jobs_list_cmd for the changes
QuantumChemist Sep 20, 2024
e34a169
renamed _get_base_command to _get_qstat_base_command
QuantumChemist Sep 25, 2024
ad8077f
moved _qresources_mapping to the subclasses
QuantumChemist Sep 25, 2024
3fae987
moved _qresources_mapping to the subclasses
QuantumChemist Sep 25, 2024
a46aebd
moved _qresources_mapping to the subclasses
QuantumChemist Sep 25, 2024
74d1867
moverd system_name, default_unit and power_labels to class attributes
QuantumChemist Sep 25, 2024
e968913
remove scope=session from maximalist_qresources
QuantumChemist Sep 25, 2024
5b672ad
made instance attributes to class attributes
QuantumChemist Sep 30, 2024
c85b3ea
Merge branch 'Matgenix:develop' into sge
QuantumChemist Oct 24, 2024
383aada
Update sge.py
QuantumChemist Oct 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/qtoolkit/io/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from qtoolkit.io.base import BaseSchedulerIO
from qtoolkit.io.pbs import PBSIO, PBSState
from qtoolkit.io.sge import SGEIO, SGEState
from qtoolkit.io.shell import ShellIO, ShellState
from qtoolkit.io.slurm import SlurmIO, SlurmState

scheduler_mapping = {"slurm": SlurmIO, "pbs": PBSIO, "shell": ShellIO}
scheduler_mapping = {"slurm": SlurmIO, "pbs": PBSIO, "sge": SGEIO, "shell": ShellIO}
257 changes: 26 additions & 231 deletions src/qtoolkit/io/pbs.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,10 @@
from __future__ import annotations

import re
from datetime import timedelta

from qtoolkit.core.data_objects import (
CancelResult,
CancelStatus,
ProcessPlacement,
QJob,
QJobInfo,
QResources,
QState,
QSubState,
SubmissionResult,
SubmissionStatus,
)
from qtoolkit.core.exceptions import OutputParsingError, UnsupportedResourcesError
from qtoolkit.io.base import BaseSchedulerIO

from qtoolkit.core.data_objects import QJob, QJobInfo, QState, QSubState
from qtoolkit.core.exceptions import OutputParsingError
from qtoolkit.io.pbs_base import PBSIOBase

# States in PBS from qstat's man.
# B Array job: at least one subjob has started.
Expand Down Expand Up @@ -79,7 +67,7 @@ def qstate(self) -> QState:
}


class PBSIO(BaseSchedulerIO):
class PBSIO(PBSIOBase):
header_template: str = """
#PBS -q $${queue}
#PBS -N $${job_name}
Expand All @@ -100,90 +88,37 @@ class PBSIO(BaseSchedulerIO):

SUBMIT_CMD: str | None = "qsub"
CANCEL_CMD: str | None = "qdel"
system_name: str = "PBS"
default_unit: str = "mb"
power_labels: dict = {"kb": 0, "mb": 1, "gb": 2, "tb": 3}
_qresources_mapping: dict = {
"queue_name": "queue",
"job_name": "job_name",
"account": "account",
"priority": "priority",
"output_filepath": "qout_path",
"error_filepath": "qerr_path",
"project": "group_list",
}

def parse_submit_output(self, exit_code, stdout, stderr) -> SubmissionResult:
if isinstance(stdout, bytes):
stdout = stdout.decode()
if isinstance(stderr, bytes):
stderr = stderr.decode()
if exit_code != 0:
return SubmissionResult(
exit_code=exit_code,
stdout=stdout,
stderr=stderr,
status=SubmissionStatus("FAILED"),
)
job_id = stdout.strip()
status = (
SubmissionStatus("SUCCESSFUL")
if job_id
else SubmissionStatus("JOB_ID_UNKNOWN")
)
return SubmissionResult(
job_id=job_id,
exit_code=exit_code,
stdout=stdout,
stderr=stderr,
status=status,
)

def parse_cancel_output(self, exit_code, stdout, stderr) -> CancelResult:
"""Parse the output of the scancel command."""
# Possible error messages:
# qdel: Unknown Job Id 100
# qdel: Job has finished 1004
# Correct execution: no output
if isinstance(stdout, bytes):
stdout = stdout.decode()
if isinstance(stderr, bytes):
stderr = stderr.decode()
if exit_code != 0:
return CancelResult(
exit_code=exit_code,
stdout=stdout,
stderr=stderr,
status=CancelStatus("FAILED"),
)

# PBS does not return the job id if the job is successfully deleted
status = CancelStatus("SUCCESSFUL")
return CancelResult(
job_id=None,
exit_code=exit_code,
stdout=stdout,
stderr=stderr,
status=status,
)

def _get_job_cmd(self, job_id: str):
cmd = f"qstat -f {job_id}"
def extract_job_id(self, stdout):
return stdout.strip()

return cmd
def extract_job_id_from_cancel(self, stderr):
# PBS doesn't return the job ID if successfully canceled, so return None
return None

def parse_job_output(self, exit_code, stdout, stderr) -> QJob | None:
out = self.parse_jobs_list_output(exit_code, stdout, stderr)
if out:
return out[0]
return None

def _get_jobs_list_cmd(
self, job_ids: list[str] | None = None, user: str | None = None
) -> str:
if user and job_ids:
raise ValueError("Cannot query by user and job(s) in PBS")

command = [
"qstat",
"-f",
]
def _get_qstat_base_command(self) -> list[str]:
return ["qstat", "-f"]

if user:
command.append(f"-u {user}")

if job_ids:
command.append(" ".join(job_ids))

return " ".join(command)
def _get_job_ids_flag(self, job_ids_str: str) -> str:
return job_ids_str

def parse_jobs_list_output(self, exit_code, stdout, stderr) -> list[QJob]:
if isinstance(stdout, bytes):
Expand Down Expand Up @@ -316,143 +251,3 @@ def _convert_str_to_time(time_str: str | None):
raise OutputParsingError()

return time[3] * 86400 + time[2] * 3600 + time[1] * 60 + time[0]

@staticmethod
def _convert_memory_str(memory: str | None) -> int | None:
if not memory:
return None

match = re.match(r"([0-9]+)([a-zA-Z]*)", memory)
if not match:
raise OutputParsingError("No numbers and units parsed")
memory, units = match.groups()

power_labels = {"kb": 0, "mb": 1, "gb": 2, "tb": 3}

if not units:
units = "mb"
elif units not in power_labels:
raise OutputParsingError(f"Unknown units {units}")
try:
v = int(memory)
except ValueError:
raise OutputParsingError

return v * (1024 ** power_labels[units])

# helper attribute to match the values defined in QResources and
# the dictionary that should be passed to the template
_qresources_mapping = {
"queue_name": "queue",
"job_name": "job_name",
"account": "account",
"priority": "priority",
"output_filepath": "qout_path",
"error_filepath": "qerr_path",
"project": "group_list",
}

@staticmethod
def _convert_time_to_str(time: int | float | timedelta) -> str:
if not isinstance(time, timedelta):
time = timedelta(seconds=time)

hours, remainder = divmod(int(time.total_seconds()), 3600)
minutes, seconds = divmod(remainder, 60)

time_str = f"{hours}:{minutes}:{seconds}"
return time_str

def _convert_qresources(self, resources: QResources) -> dict:
"""
Converts a QResources instance to a dict that will be used to fill in the
header of the submission script.
"""

header_dict = {}
for qr_field, pbs_field in self._qresources_mapping.items():
val = getattr(resources, qr_field)
if val is not None:
header_dict[pbs_field] = val

if resources.njobs and resources.njobs > 1:
header_dict["array"] = f"1-{resources.njobs}"

if resources.time_limit:
header_dict["walltime"] = self._convert_time_to_str(resources.time_limit)

if resources.rerunnable is not None:
header_dict["rerunnable"] = "y" if resources.rerunnable else "n"

nodes, processes, processes_per_node = resources.get_processes_distribution()
select = None
if resources.process_placement == ProcessPlacement.NO_CONSTRAINTS:
select = f"select={processes}"
if resources.threads_per_process:
select += f":ncpus={resources.threads_per_process}"
select += f":ompthreads={resources.threads_per_process}"
if resources.memory_per_thread:
threads_per_process = resources.threads_per_process or 1
select += f":mem={threads_per_process * resources.memory_per_thread}mb"
elif resources.process_placement in (
ProcessPlacement.EVENLY_DISTRIBUTED,
ProcessPlacement.SAME_NODE,
ProcessPlacement.SCATTERED,
):
select = f"select={nodes}"
if resources.threads_per_process and resources.threads_per_process > 1:
cpus = resources.threads_per_process * processes_per_node
ompthreads = resources.threads_per_process
else:
cpus = processes_per_node
ompthreads = None
select += f":ncpus={cpus}"
select += f":mpiprocs={processes_per_node}"
if ompthreads:
select += f":ompthreads={ompthreads}"
if resources.memory_per_thread:
mem = cpus * resources.memory_per_thread
select += f":mem={mem}mb"

if resources.process_placement in (
ProcessPlacement.EVENLY_DISTRIBUTED,
ProcessPlacement.SCATTERED,
):
header_dict["place"] = "scatter"
elif resources.process_placement == ProcessPlacement.SAME_NODE:
header_dict["place"] = "pack"
else:
msg = f"process placement {resources.process_placement} is not supported for PBS"
raise UnsupportedResourcesError(msg)

header_dict["select"] = select

if resources.email_address:
header_dict["mail_user"] = resources.email_address
header_dict["mail_type"] = "abe"

if resources.scheduler_kwargs:
header_dict.update(resources.scheduler_kwargs)

return header_dict

@property
def supported_qresources_keys(self) -> list:
"""
List of attributes of QResources that are correctly handled by the
_convert_qresources method. It is used to validate that the user
does not pass an unsupported value, expecting to have an effect.
"""
supported = list(self._qresources_mapping.keys())
supported += [
"njobs",
"time_limit",
"processes",
"processes_per_node",
"process_placement",
"nodes",
"threads_per_process",
"email_address",
"scheduler_kwargs",
]
return supported
Loading
Loading