JSON output for hq job list and hq submit (#24)
Instead of parsing the command output with regexes, use HQ's JSON output mode
(`--output-mode=json`) and load the output directly into a dict.
unkcpz authored Jul 17, 2024
1 parent 81fc406 commit d53617a
Showing 2 changed files with 40 additions and 37 deletions.
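
For context, a minimal sketch (an assumption, not part of the commit) of the JSON shapes the new parsing code relies on; the field names `id`, `name`, `task_count`, and `task_stats` are taken from the diff below, and real HQ payloads may carry additional fields:

import json

# Assumed shape of `hq submit --output-mode=json` stdout; only `id` is read.
submit_stdout = '{"id": 42}'
print(json.loads(submit_stdout)["id"])  # -> 42

# Assumed shape of `hq job list --output-mode=json` stdout: a list of job dicts.
joblist_stdout = (
    '[{"id": 42, "name": "aiida-42", "task_count": 1, "task_stats": {"running": 1}}]'
)
for job in json.loads(joblist_stdout):
    print(job["id"], job["name"])  # -> 42 aiida-42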
71 changes: 36 additions & 35 deletions aiida_hyperqueue/scheduler.py
@@ -9,7 +9,7 @@
###########################################################################
"""Plugin for the HyperQueue meta scheduler."""

import re
import json
import typing as t

from aiida.common.extendeddicts import AttributeDict
@@ -148,7 +148,7 @@ def _get_submit_command(self, submit_script: str) -> str:
submit_script: the path of the submit script relative to the working
directory.
"""
submit_command = f"hq submit {submit_script}"
submit_command = f"hq submit {submit_script} --output-mode=json"

self.logger.info(f"Submitting with: {submit_command}")

@@ -178,23 +178,18 @@ def _parse_submit_output(self, retval: int, stdout: str, stderr: str) -> str:
f"in _parse_submit_output{transport_string}: there was some text in stderr: {stderr}"
)

job_id_pattern = re.compile(
r"Job\ssubmitted\ssuccessfully,\sjob\sID:\s(?P<jobid>\d+)"
)

for line in stdout.split("\n"):
match = job_id_pattern.match(line.strip())
if match:
return match.group("jobid")

# If no valid line is found, log and raise an error
self.logger.error(
f"in _parse_submit_output{transport_string}: unable to find the job id: {stdout}"
)
raise SchedulerError(
"Error during submission, could not retrieve the jobID from "
"hq submit output; see log for more info."
)
hq_job_dict = json.loads(stdout)
try:
return str(hq_job_dict["id"])
except KeyError:
# If the `id` key is missing, log and raise an error
self.logger.error(
f"in _parse_submit_output{transport_string}: unable to find the job id: {stdout}"
)
raise SchedulerError(
"Error during submission, could not retrieve the jobID from "
"hq submit output; see log for more info."
)

def _get_joblist_command(
self, jobs: t.Optional[list] = None, user: t.Optional[str] = None
@@ -205,7 +200,7 @@
These could in principle be passed to the ``hq job`` command, but this has an entirely different format.
"""

return "hq job list --filter waiting,running"
return "hq job list --filter waiting,running --output-mode=json"

def _parse_joblist_output(self, retval: int, stdout: str, stderr: str) -> list:
"""Parse the stdout for the joblist command.
@@ -222,24 +217,30 @@ def _parse_joblist_output(self, retval: int, stdout: str, stderr: str) -> list:

if stderr.strip():
self.logger.warning(
f"hq job list returned exit code 0 (_parse_joblist_output function) but non-empty stderr='{stderr.strip()}'"
f"`hq job list` returned exit code 0 (_parse_joblist_output function) but non-empty stderr='{stderr.strip()}'"
)

job_info_pattern = re.compile(
r"\|\s+(?P<id>[\d]+)\s\|\s+(?P<name>[^|]+)\s+\|\s(?P<state>[\w]+)\s+\|\s(?P<tasks>[\d]+)\s+\|"
)
job_info_list = []
# Convert the HQ job list into a list of JobInfo objects.
# HQ supports multiple tasks per job, but aiida-hyperqueue maps each HQ job
# to a single HQ task, so every job is parsed as one AiiDA job.
hq_job_info_list = json.loads(stdout)

for line in stdout.split("\n"):
match = job_info_pattern.match(line)
if match:
job_dict = match.groupdict()
job_info = JobInfo()
job_info.job_id = job_dict["id"]
job_info.title = job_dict["name"]
job_info.job_state = _MAP_STATUS_HYPERQUEUE[job_dict["state"].upper()]
# TODO: In principle more detailed information can be parsed for each job by `hq job info`, such as cpu, wall_time etc.
job_info_list.append(job_info)
job_info_list = []
for hq_job_dict in hq_job_info_list:
job_info = JobInfo()
job_info.job_id = hq_job_dict["id"]
job_info.title = hq_job_dict["name"]
stats: t.List[str] = [
stat for stat, v in hq_job_dict["task_stats"].items() if v > 0
]
if hq_job_dict["task_count"] != 1 or len(stats) != 1:
self.logger.error("not able to parse hq job with multiple tasks.")
else:
job_info.job_state = _MAP_STATUS_HYPERQUEUE[stats[0].upper()]

job_info_list.append(job_info)

# TODO: In principle, more detailed information (e.g. cpu, wall_time) could be parsed for each job via `hq job info`.

return job_info_list

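The new loop derives a job's state from `task_stats` by keeping only the states with a non-zero task count; because aiida-hyperqueue submits exactly one task per job, exactly one state should remain. A minimal sketch of that logic, with an assumed payload whose field names follow the diff above:

# Hypothetical `hq job list --output-mode=json` entry for a single-task job.
hq_job_dict = {
    "id": 42,
    "name": "aiida-42",
    "task_count": 1,
    "task_stats": {"waiting": 0, "running": 1, "finished": 0},
}

# Keep only the task states with a non-zero count, as _parse_joblist_output does.
stats = [stat for stat, count in hq_job_dict["task_stats"].items() if count > 0]
if hq_job_dict["task_count"] == 1 and len(stats) == 1:
    print(stats[0].upper())  # -> RUNNING, the key looked up in _MAP_STATUS_HYPERQUEUE
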
6 changes: 4 additions & 2 deletions tests/test_scheduler.py
@@ -69,7 +69,7 @@ def test_submit_command():
"""Test submit command"""
scheduler = HyperQueueScheduler()

assert scheduler._get_submit_command("job.sh") == "hq submit job.sh"
assert "hq submit job.sh" in scheduler._get_submit_command("job.sh")


def test_parse_submit_command_output(hq_env: HqEnv, valid_submit_script):
@@ -79,7 +79,9 @@ def test_parse_submit_command_output(hq_env: HqEnv, valid_submit_script):
Path("_aiidasubmit.sh").write_text(valid_submit_script)

process = hq_env.command(
["submit", "_aiidasubmit.sh"], wait=False, ignore_stderr=True
["submit", "--output-mode=json", "_aiidasubmit.sh"],
wait=False,
ignore_stderr=True,
)
stdout = process.communicate()[0].decode()
stderr = ""
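As a usage sketch (not the repository's actual test assertions, which are truncated above), the captured JSON stdout could then be handed to the scheduler like this, assuming HQ printed `{"id": 1}` and that the import path matches the file layout shown above:

from aiida_hyperqueue.scheduler import HyperQueueScheduler

scheduler = HyperQueueScheduler()
job_id = scheduler._parse_submit_output(retval=0, stdout='{"id": 1}', stderr="")
assert job_id == "1"  # the job id is returned as a string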
