Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Slurm and QResources updates #38

Merged
merged 8 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 39 additions & 3 deletions src/qtoolkit/core/data_objects.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

import abc
from dataclasses import dataclass
from dataclasses import dataclass, fields
from pathlib import Path

from qtoolkit.core.base import QTKEnum, QTKObject
Expand Down Expand Up @@ -58,6 +58,20 @@ class QState(QTKEnum):
queue manager (e.g. PBS, SLURM, ...) needs to be
defined.

UNDETERMINED: The job status cannot be determined. This is a permanent
issue, not being solvable by asking again for the job state.
QUEUED: The job is queued for being scheduled and executed.
QUEUED_HELD: The job has been placed on hold by the system, the
administrator, or the submitting user.
RUNNING: The job is running on an execution host.
SUSPENDED: The job has been suspended by the user, the system or the
administrator.
REQUEUED: The job was re-queued by the DRM system, and is eligible to run.
REQUEUED_HELD: The job was re-queued by the DRM system, and is currently
placed on hold by the system, the administrator, or the submitting user.
DONE: The job finished without an error.
FAILED: The job exited abnormally before finishing.

Note that not all these standardized states are available in the
actual queue manager implementations.
"""
Expand Down Expand Up @@ -179,14 +193,36 @@ def __post_init__(self):
self.process_placement = ProcessPlacement.NO_CONSTRAINTS # type: ignore # due to QTKEnum
elif self.nodes and self.processes_per_node and not self.processes:
self.process_placement = ProcessPlacement.EVENLY_DISTRIBUTED
else:
elif not self._check_no_values():
msg = (
"When process_placement is None either define only nodes "
"plus processes_per_node or only processes"
"plus processes_per_node or only processes to get a default value. "
"Otherwise all the fields must be empty."
)
raise UnsupportedResourcesError(msg)
self.scheduler_kwargs = self.scheduler_kwargs or {}

def _check_no_values(self) -> bool:
    """
    Tell whether every dataclass field of this object holds a falsy value.

    Returns:
        True when no field evaluates as true (None, empty container, ...),
        False as soon as one field does.
    """
    # any() stops at the first truthy field, like an early-return loop.
    return not any(getattr(self, spec.name) for spec in fields(self))

def check_empty(self) -> bool:
    """
    Report whether this QResources is empty, verifying its coherence.

    Returns:
        False when process_placement is defined; True when it is None and
        _check_no_values() reports that no attribute is set.

    Raises:
        ValueError: if process_placement is None while some attribute
            still holds a value.
    """
    if self.process_placement is None:
        if self._check_no_values():
            return True
        # No placement but leftover values: the object is incoherent.
        raise ValueError("process_placement is None, but some values are set")
    return False

@classmethod
def no_constraints(cls, processes, **kwargs):
if "nodes" in kwargs or "processes_per_node" in kwargs:
Expand Down
11 changes: 6 additions & 5 deletions src/qtoolkit/io/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ def generate_header(self, options: dict | QResources | None) -> str:
options = options or {}

if isinstance(options, QResources):
options = self.check_convert_qresources(options)
if not options.check_empty():
options = self.check_convert_qresources(options)

template = QTemplate(self.header_template)

Expand Down Expand Up @@ -171,12 +172,12 @@ def check_convert_qresources(self, resources: QResources) -> dict:
Also checks that passed values are declared to be handled by the corresponding
subclass.
"""
not_none = set()
not_empty = set()
for field in fields(resources):
if getattr(resources, field.name) is not None:
not_none.add(field.name)
if getattr(resources, field.name):
not_empty.add(field.name)

unsupported_options = not_none.difference(self.supported_qresources_keys)
unsupported_options = not_empty.difference(self.supported_qresources_keys)

if unsupported_options:
msg = f"Keys not supported: {', '.join(sorted(unsupported_options))}"
Expand Down
8 changes: 6 additions & 2 deletions src/qtoolkit/io/shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,13 @@ def _convert_qresources(self, resources: QResources) -> dict:
"""
Converts a QResources instance to a dict that will be used to fill in the
header of the submission script.
Not implemented for ShellIO
Only an empty QResources is accepted in ShellIO.
"""
raise UnsupportedResourcesError # pragma: no cover
if not resources.check_empty():
gpetretto marked this conversation as resolved.
Show resolved Hide resolved
raise UnsupportedResourcesError(
"Only empty QResources is supported"
) # pragma: no cover
return {}

@property
def supported_qresources_keys(self) -> list:
Expand Down
24 changes: 23 additions & 1 deletion src/qtoolkit/io/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@


class SlurmState(QSubState):
BOOT_FAIL = "BOOT_FAIL", "BF"
CANCELLED = "CANCELLED", "CA"
COMPLETING = "COMPLETING", "CG"
COMPLETED = "COMPLETED", "CD"
Expand All @@ -97,7 +98,17 @@ class SlurmState(QSubState):
NODE_FAIL = "NODE_FAIL", "NF"
OUT_OF_MEMORY = "OUT_OF_MEMORY", "OOM"
PENDING = "PENDING", "PD"
PREEMPTED = "PREEMPTED", "PR"
RESV_DEL_HOLD = "RESV_DEL_HOLD", "RD"
REQUEUE_FED = "REQUEUE_FED", "RF"
REQUEUE_HOLD = "REQUEUE_HOLD", "RH"
RESIZING = "RESIZING", "RS"
REVOKED = "REVOKED", "RV"
RUNNING = "RUNNING", "R"
SIGNALING = "SIGNALING", "SI"
SPECIAL_EXIT = "SPECIAL_EXIT", "SE"
STAGE_OUT = "STAGE_OUT", "SO"
STOPPED = "STOPPED", "ST"
SUSPENDED = "SUSPENDED", "S"
TIMEOUT = "TIMEOUT", "TO"

Expand All @@ -108,7 +119,8 @@ def qstate(self) -> QState:


_STATUS_MAPPING = {
SlurmState.CANCELLED: QState.SUSPENDED, # Should this be failed ?
SlurmState.BOOT_FAIL: QState.FAILED,
SlurmState.CANCELLED: QState.FAILED,
SlurmState.COMPLETING: QState.RUNNING,
SlurmState.COMPLETED: QState.DONE,
SlurmState.CONFIGURING: QState.QUEUED,
Expand All @@ -117,7 +129,17 @@ def qstate(self) -> QState:
SlurmState.NODE_FAIL: QState.FAILED,
SlurmState.OUT_OF_MEMORY: QState.FAILED,
SlurmState.PENDING: QState.QUEUED,
SlurmState.PREEMPTED: QState.FAILED,
SlurmState.RESV_DEL_HOLD: QState.QUEUED_HELD,
SlurmState.REQUEUE_FED: QState.REQUEUED, # ambiguous conversion. Could also be QUEUED,
SlurmState.REQUEUE_HOLD: QState.REQUEUED, # QUEUED_HELD or SUSPENDED
SlurmState.RESIZING: QState.RUNNING,
SlurmState.REVOKED: QState.FAILED,
SlurmState.RUNNING: QState.RUNNING,
SlurmState.SIGNALING: QState.RUNNING,
SlurmState.SPECIAL_EXIT: QState.FAILED,
SlurmState.STAGE_OUT: QState.RUNNING,
SlurmState.STOPPED: QState.RUNNING,
SlurmState.SUSPENDED: QState.SUSPENDED,
SlurmState.TIMEOUT: QState.FAILED,
}
Expand Down
30 changes: 30 additions & 0 deletions tests/core/test_data_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,17 @@ def test_no_process_placement(self):
):
QResources(processes=8, processes_per_node=2)

with pytest.raises(
UnsupportedResourcesError,
match=r"When process_placement is None either define only nodes "
r"plus processes_per_node or only processes to get a default value. "
r"Otherwise all the fields must be empty.",
):
QResources(project="xxx")

# This is acceptable for empty process placement and no details passed
assert QResources()

@pytest.mark.skipif(monty is None, reason="monty is not installed")
def test_msonable(self, test_utils):
qr1 = QResources(
Expand Down Expand Up @@ -534,6 +545,25 @@ def test_get_processes_distribution(self):
)
proc_distr = qr.get_processes_distribution()
assert proc_distr == ["a", "b", "c"]
qr = QResources(
process_placement=None,
)
proc_distr = qr.get_processes_distribution()
assert proc_distr == [None, None, None]

def test_is_empty(self):
    """Exercise check_empty on empty, populated and incoherent QResources."""
    # Nothing set at all: the resources are reported as empty.
    assert QResources().check_empty()

    # A defined process_placement means the object is not empty.
    with_placement = QResources(
        process_placement=ProcessPlacement.NO_CONSTRAINTS, processes=10
    )
    assert not with_placement.check_empty()

    # A value assigned after construction while process_placement stays None
    # is an incoherent state that check_empty must reject.
    incoherent = QResources(process_placement=None)
    incoherent.processes = 10
    expected_msg = "process_placement is None, but some values are set"
    with pytest.raises(ValueError, match=expected_msg):
        incoherent.check_empty()


class TestQJobInfo:
Expand Down
16 changes: 15 additions & 1 deletion tests/io/test_shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,24 @@ def test_check_convert_qresources(self, shell_io):
qr = QResources(processes=1)
with pytest.raises(
UnsupportedResourcesError,
match=r"Keys not supported: process_placement, processes, scheduler_kwargs",
match=r"Keys not supported: process_placement, processes",
):
shell_io.check_convert_qresources(qr)

qr = QResources()
assert shell_io.check_convert_qresources(qr) == {}

def test_convert_qresources(self, shell_io):
    """Check that _convert_qresources only accepts an empty QResources."""
    # An empty QResources converts to an empty dict.
    assert shell_io._convert_qresources(QResources()) == {}

    # Any populated QResources is rejected.
    populated = QResources(processes=1)
    with pytest.raises(
        UnsupportedResourcesError,
        match=r"Only empty QResources is supported",
    ):
        shell_io._convert_qresources(populated)

def test_header(self, shell_io):
# check that the required elements are properly handled in header template
options = {
Expand Down
Loading