Merge pull request #29 from Matgenix/unit_tests
Unit tests
gpetretto authored Jan 25, 2024
2 parents 6184be9 + 10b8838 commit f960941
Showing 16 changed files with 1,031 additions and 235 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml

@@ -93,7 +93,6 @@ filterwarnings = [
 ]

 [tool.coverage.run]
-include = ["src/*"]
 parallel = true
 branch = true

@@ -107,6 +106,7 @@ exclude_lines = [
     '^\s*assert False(,|$)',
     'if typing.TYPE_CHECKING:',
     '^\s*@overload( |$)',
+    '# pragma: no cover',
 ]

 [tool.autoflake]
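A note on the coverage change above: coverage.py's built-in default for excluded lines is the `# pragma: no cover` pattern, and overriding `exclude_lines` replaces that default, so the pragma has to be restated once custom patterns are added. A minimal sketch of how the exclusion patterns behave (the `is_excluded` helper is hypothetical, for illustration only; coverage.py applies each entry as a regex against source lines):

import re

exclude_lines = [
    r'^\s*assert False(,|$)',
    r'if typing.TYPE_CHECKING:',
    r'^\s*@overload( |$)',
    r'# pragma: no cover',
]

def is_excluded(source_line: str) -> bool:
    # A line matching any pattern is left out of the coverage report.
    return any(re.search(pattern, source_line) for pattern in exclude_lines)

assert is_excluded("raise NotImplementedError  # pragma: no cover")
assert not is_excluded("raise NotImplementedError")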
2 changes: 1 addition & 1 deletion src/qtoolkit/core/data_objects.py

@@ -100,7 +100,7 @@ def __repr__(self):
     @property
     @abc.abstractmethod
     def qstate(self) -> QState:
-        raise NotImplementedError
+        raise NotImplementedError  # pragma: no cover


 class ProcessPlacement(QTKEnum):
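The pragma on the abstract `qstate` body (and on similar bodies below) reflects that an abstract method's `raise NotImplementedError` can only execute if a concrete subclass calls it explicitly, so under normal tests the line is unreachable. A minimal sketch with hypothetical class names illustrating why:

import abc

class HasQState(abc.ABC):  # hypothetical stand-in for the real base class
    @property
    @abc.abstractmethod
    def qstate(self) -> str:
        raise NotImplementedError  # pragma: no cover

class Running(HasQState):
    @property
    def qstate(self) -> str:
        return "RUNNING"

assert Running().qstate == "RUNNING"
# HasQState() raises TypeError: the abstract body above never runs.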
139 changes: 5 additions & 134 deletions src/qtoolkit/io/base.py

@@ -30,7 +30,7 @@ def get_identifiers(self) -> list:
                 named is None
                 and mo.group("invalid") is None
                 and mo.group("escaped") is None
-            ):
+            ):  # pragma: no cover - no complex patterns, part of python stdlib 3.11
                 # If all the groups are None, there must be
                 # another group we're not expecting
                 raise ValueError("Unrecognized named group in pattern", self.pattern)
@@ -47,142 +47,12 @@ class BaseSchedulerIO(QTKObject, abc.ABC):

     shebang: str = "#!/bin/bash"

-    # config: QueueConfig = None
-
-    # scheduler = None,
-    # name = None,
-    # cores = None,
-    # memory = None,
-    # processes = None,
-    # nanny = True,
-    # protocol = None,
-    # security = None,
-    # interface = None,
-    # death_timeout = None,
-    # local_directory = None,
-    # extra = None,
-    # worker_extra_args = None,
-    # job_extra = None,
-    # job_extra_directives = None,
-    # env_extra = None,
-    # job_script_prologue = None,
-    # header_skip = None,
-    # job_directives_skip = None,
-    # log_directory = None,
-    # shebang = None,
-    # python = sys.executable,
-    # job_name = None,
-    # config_name = None,
-
-    """ABIPY
-    Args:
-        qname: Name of the queue.
-        qparams: Dictionary with the parameters used in the template.
-        setup: String or list of commands to execute during the initial setup.
-        modules: String or list of modules to load before running the application.
-        shell_env: Dictionary with the environment variables to export before
-            running the application.
-        omp_env: Dictionary with the OpenMP variables.
-        pre_run: String or list of commands to execute before launching the
-            calculation.
-        post_run: String or list of commands to execute once the calculation is
-            completed.
-        mpi_runner: Path to the MPI runner or :class:`MpiRunner` instance.
-            None if not used
-        mpi_runner_options: Optional string with options passed to the mpi_runner.
-        max_num_launches: Maximum number of submissions that can be done for a
-            specific task. Defaults to 5
-        qverbatim:
-        min_cores, max_cores, hint_cores: Minimum, maximum, and hint limits of
-            number of cores that can be used
-        min_mem_per_proc=Minimum memory per process in megabytes.
-        max_mem_per_proc=Maximum memory per process in megabytes.
-        timelimit: initial time limit in seconds
-        timelimit_hard: hard limelimit for this queue
-        priority: Priority level, integer number > 0
-        condition: Condition object (dictionary)
-    """

     def get_submission_script(
         self,
         commands: str | list[str],
         options: dict | QResources | None = None,
     ) -> str:
-        """
-        This is roughly what/how it is done in the existing solutions.
-        abipy: done with a str template (using $$ as a delimiter).
-            Remaining "$$" delimiters are then removed at the end.
-            It uses a ScriptEditor object to add/modify things to the templated script.
-            The different steps of "get_script_str(...)" in abipy are summarized here:
-            - _header, based on the str template (includes the shebang line and all
-              #SBATCH, #PBS, ... directives)
-            - change directory (added by the script editor)
-            - setup section, list of commands executed before running (added
-              by the script editor)
-            - load modules section, list of modules to be loaded before running
-              (added by the script editor)
-            - setting of openmp environment variables (added by the script editor)
-            - setting of shell environment variables (added by the script editor)
-            - prerun, i.e. commands to run before execution, again? (added by
-              the script editor)
-            - run line (added by the script editor)
-            - postrun (added by the script editor)
-        aiida: done with a class template (JobTemplate) that should contain
-            the required info to generate the job header. Other class templates
-            are also used inside the generation, e.g. JobTemplateCodesInfo, which
-            defines the command(s) to be run. The JobTemplate is only used as a
-            container of the information and the script is generated not using
-            templating but rather directly using python methods based on that
-            "JobTemplate" container. Actually this JobTemplate is based on the
-            DRMAA v2 specifications and many other objects are based on that too
-            (e.g. machine, slots, etc ...).
-            The different steps of "get_submit_script(...)" in aiida are summarized here:
-            - shebang line
-            - _header
-              - all #SBATCH, #PBS etc ... lines defining the resources and other
-                info for the queuing system
-              - some custom lines if it is not dealt with by the template
-              - environment variables
-            - prepend_text (something to be written before the run lines)
-            - _run_line (defines the code execution(s) based on a CodeInfo object).
-              There can be several codes run.
-            - append_text (something to be written after the run lines)
-            - _footer (some post commands done after the run) [note this is only
-              done/needed for LSF in aiida]
-        fireworks: done with a str template. similar to abipy (actually abipy took
-            its initial concept from fireworks)
-        dask_jobqueue: quite obscure ... the job header is done in the init of a
-            given JobCluster (e.g. SLURMCluster) based on something in the actual
-            Job object itself. Dask is not really meant to be our use case anyway.
-        dpdispatcher: uses python's format() with 5 templates, combined into
-            another python's format "script" template.
-            Here are the steps:
-            - header (includes shebang and #SBATCH, #PBS, ... directives)
-            - custom directives
-            - script environment (modules, environment variables, source
-              somefiles, ...)
-            - run command
-            - append script lines
-            In the templates of the different steps, there are some dpdispatcher's
-            specific things (e.g. tag a job as finished by touching a file, ...)
-        jobqueues: Some queues are using pure python (PBS, LSF, ...), some are
-            using jinja2 templates (SLURM and SGE). Directly written to file.
-        myqueue: the job queue directives are directly passed to the submit
-            command (no #SBATCH, #PBS, ...).
-        troika: uses a generic generator with a list of directives as well
-            as a directive prefix. These directives are defined in specific
-            files for each type of job queue.
-        """
+        """Get the submission script for the given commands and options."""
         script_blocks = [self.shebang]
         if header := self.generate_header(options):
             script_blocks.append(header)
@@ -213,7 +83,7 @@ def generate_header(self, options: dict | QResources | None) -> str:
         keys = set(options.keys())
         extra = keys.difference(template.get_identifiers())
         if extra:
-            msg = f"The following keys are not present in the template: {', '.join(extra)}"
+            msg = f"The following keys are not present in the template: {', '.join(sorted(extra))}"
             raise ValueError(msg)

         unclean_header = template.safe_substitute(options)
@@ -271,8 +141,9 @@ def get_cancel_cmd(self, job: QJob | int | str) -> str:
         """
         job_id = job.job_id if isinstance(job, QJob) else job
         if job_id is None or job_id == "":
+            received = None if job_id is None else "'' (empty string)"
             raise ValueError(
-                f"The id of the job to be cancelled should be defined. Received: {job_id}"
+                f"The id of the job to be cancelled should be defined. Received: {received}"
             )
         return f"{self.CANCEL_CMD} {job_id}"
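The two hunks above touch the same templating machinery: `get_identifiers()` lists the placeholders available in a header template, and `generate_header` rejects option keys that have no matching placeholder, now with `sorted()` so the error message is deterministic (set iteration order varies between runs). A rough sketch of the pattern using only the stdlib `string.Template`, whose own `get_identifiers()` exists since Python 3.11 (the template text and option names here are made up, not qtoolkit's real templates):

from string import Template

template = Template("#SBATCH --job-name=$job_name\n#SBATCH --ntasks=$ntasks")
options = {"job_name": "relax", "ntasks": 16, "walltime": "1:00:00"}

extra = set(options) - set(template.get_identifiers())  # Python >= 3.11
if extra:
    # sorted() keeps the message stable across runs
    print(f"The following keys are not present in the template: {', '.join(sorted(extra))}")

# safe_substitute leaves unresolved placeholders in place instead of raising
print(template.safe_substitute({"job_name": "relax"}))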
17 changes: 16 additions & 1 deletion src/qtoolkit/io/shell.py

@@ -158,6 +158,21 @@ def _get_job_cmd(self, job_id: str):
         return cmd

     def parse_job_output(self, exit_code, stdout, stderr) -> QJob | None:
+        """Parse the output of the ps command and return the corresponding QJob object.
+
+        If the ps command returns multiple shell jobs, only the first corresponding
+        QJob is returned.
+        #TODO: should we check that there is only one job here ?
+
+        Parameters
+        ----------
+        exit_code : int
+            Exit code of the ps command.
+        stdout : str
+            Standard output of the ps command.
+        stderr : str
+            Standard error of the ps command.
+        """
         out = self.parse_jobs_list_output(exit_code, stdout, stderr)
         if out:
             return out[0]

@@ -242,7 +257,7 @@ def _convert_qresources(self, resources: QResources) -> dict:
             header of the submission script.
         Not implemented for ShellIO
         """
-        raise UnsupportedResourcesError
+        raise UnsupportedResourcesError  # pragma: no cover

     @property
     def supported_qresources_keys(self) -> list:
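For context on the new docstring: `parse_job_output` simply delegates to `parse_jobs_list_output` and keeps the first entry (the in-docstring TODO asks whether multiple matches should instead be an error). A simplified, hypothetical sketch of that delegation; the real ShellIO parser does more, such as mapping process states to QState values:

def parse_jobs_list_output(exit_code: int, stdout: str, stderr: str) -> list[dict]:
    if exit_code != 0:
        raise RuntimeError(f"ps failed: {stderr}")
    jobs = []
    for line in stdout.strip().splitlines()[1:]:  # skip the ps header row
        # assumes at least "PID STATE COMMAND..." columns per line
        pid, state, *command = line.split()
        jobs.append({"job_id": pid, "state": state, "command": " ".join(command)})
    return jobs

def parse_job_output(exit_code: int, stdout: str, stderr: str) -> dict | None:
    out = parse_jobs_list_output(exit_code, stdout, stderr)
    return out[0] if out else None  # first job only; None when nothing matched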
23 changes: 14 additions & 9 deletions src/qtoolkit/io/slurm.py

@@ -237,6 +237,8 @@ def parse_cancel_output(self, exit_code, stdout, stderr) -> CancelResult:
         status = (
             CancelStatus("SUCCESSFUL") if job_id else CancelStatus("JOB_ID_UNKNOWN")
         )
+        # TODO: when cancelling a job already completed or cancelled, exit_code is 0
+        # should we set the CancelStatus to FAILED ? Same if the job does not exist.
         return CancelResult(
             job_id=job_id,
             exit_code=exit_code,

@@ -259,9 +261,9 @@ def _get_job_cmd(self, job_id: str):
         if self.get_job_executable == "scontrol":
             # -o is to get the output as a one-liner
             cmd = f"SLURM_TIME_FORMAT='standard' scontrol show job -o {job_id}"
-        elif self.get_job_executable == "sacct":
+        elif self.get_job_executable == "sacct":  # pragma: no cover
             raise NotImplementedError("sacct for get_job not yet implemented.")
-        else:
+        else:  # pragma: no cover
             raise RuntimeError(
                 f'"{self.get_job_executable}" is not a valid get_job_executable.'
             )

@@ -279,9 +281,9 @@ def parse_job_output(self, exit_code, stdout, stderr) -> QJob | None:

         if self.get_job_executable == "scontrol":
             parsed_output = self._parse_scontrol_cmd_output(stdout=stdout)
-        elif self.get_job_executable == "sacct":
+        elif self.get_job_executable == "sacct":  # pragma: no cover
             raise NotImplementedError("sacct for get_job not yet implemented.")
-        else:
+        else:  # pragma: no cover
             raise RuntimeError(
                 f'"{self.get_job_executable}" is not a valid get_job_executable.'
             )

@@ -294,27 +296,27 @@ def parse_job_output(self, exit_code, stdout, stderr) -> QJob | None:

         try:
             memory_per_cpu = self._convert_memory_str(parsed_output["MinMemoryCPU"])
-        except OutputParsingError:
+        except (OutputParsingError, KeyError):
             memory_per_cpu = None

         try:
             nodes = int(parsed_output["NumNodes"])
-        except ValueError:
+        except (ValueError, KeyError):
             nodes = None

         try:
             cpus = int(parsed_output["NumCPUs"])
-        except ValueError:
+        except (ValueError, KeyError):
             cpus = None

         try:
             cpus_task = int(parsed_output["CPUs/Task"])
-        except ValueError:
+        except (ValueError, KeyError):
             cpus_task = None

         try:
             time_limit = self._convert_str_to_time(parsed_output["TimeLimit"])
-        except OutputParsingError:
+        except (OutputParsingError, KeyError):
             time_limit = None

         info = QJobInfo(

@@ -497,6 +499,9 @@ def _convert_memory_str(memory: str | None) -> int | None:
         if not memory:
             return None

+        # TODO: @GP not sure I get what is this line here
+        # Shouldn't it be all(u not in memory for u in ("K", "M", "G", "T"))?
+        # Or not any(u in memory for u in ("K", "M", "G", "T"))?
         if all(u in memory for u in ("K", "M", "G", "T")):
             # assume Mb
             units = "M"
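On the `_convert_memory_str` TODO above: `all(u in memory for u in ("K", "M", "G", "T"))` is only true when the string contains all four unit letters at once, so the "assume Mb" branch is effectively dead. The guard the comment suggests, defaulting to megabytes only when no unit letter appears, would look roughly like this (a sketch of the suggested fix, not the project's actual code):

def detect_units(memory: str) -> str:
    # Default to megabytes only when no unit letter is present at all.
    if not any(u in memory for u in ("K", "M", "G", "T")):
        return "M"  # bare number such as "4000"
    # Otherwise take the unit from the string itself, e.g. "4G" -> "G".
    return next(u for u in ("K", "M", "G", "T") if u in memory)

assert detect_units("4000") == "M"
assert detect_units("4G") == "G"
assert detect_units("512K") == "K"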
(The remaining 11 changed files are not shown.)
