Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix parse_jobs_list_output parsing issues with SGE #52

Draft
wants to merge 2 commits into
base: develop
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 47 additions & 19 deletions src/qtoolkit/io/sge.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,11 @@ def parse_job_output(self, exit_code, stdout, stderr) -> QJob | None: # aiida s
),
)

def _get_element_text(self, parent, tag_name):
def _get_element_text(self, parent, tag_name, default=None):
elements = parent.getElementsByTagName(tag_name)
if elements:
if elements and elements[0].childNodes:
return elements[0].childNodes[0].data.strip()
return None
return default

def _safe_int(self, value: str | None) -> int | None:
if value is None:
Expand Down Expand Up @@ -294,41 +294,69 @@ def parse_jobs_list_output(self, exit_code, stdout, stderr) -> list[QJob]:
except xml.parsers.expat.ExpatError:
raise OutputParsingError("XML parsing of stdout failed")

job_elements = xmldata.getElementsByTagName("job_list")
jobs_list = []
# Determine the structure of the XML
if xmldata.getElementsByTagName("job_list"):
# Handle job_list_u.xml structure
job_elements = xmldata.getElementsByTagName("job_list")
elif xmldata.getElementsByTagName("element"):
# Handle job_list.xml structure
job_elements = xmldata.getElementsByTagName("element")
else:
raise OutputParsingError("No recognizable job elements found in XML data")

for job_element in job_elements:
qjob = QJob()
qjob.job_id = self._get_element_text(job_element, "JB_job_number")
job_state_string = self._get_element_text(job_element, "state")
qjob.job_id = self._get_element_text(
job_element, "JB_job_number", default="Unknown"
)
qjob.name = self._get_element_text(
job_element, "JB_job_name", default="Unknown"
)
qjob.username = self._get_element_text(
job_element, "JB_owner", default="Unknown"
)

# Determine job state
job_state_string = self._get_element_text(
job_element, "state", default=None
)
if job_state_string is None:
job_state_string = self._get_element_text(
job_element, "JAT_status", default="Unknown"
)

try:
sge_job_state = SGEState(job_state_string)
qjob.sub_state = sge_job_state
qjob.state = sge_job_state.qstate
except ValueError:
raise OutputParsingError(
f"Unknown job state {job_state_string} for job id {qjob.job_id}"
print(
f"Unknown job state: {job_state_string} for job ID: {qjob.job_id}"
)
qjob.sub_state = None
qjob.state = None

qjob.sub_state = sge_job_state
qjob.state = sge_job_state.qstate
qjob.username = self._get_element_text(job_element, "JB_owner")
qjob.name = self._get_element_text(job_element, "JB_name")

# Collect additional job information
info = QJobInfo()
info.nodes = self._safe_int(
self._get_element_text(job_element, "num_nodes")
self._get_element_text(job_element, "num_nodes", default="0")
)
info.cpus = self._safe_int(
self._get_element_text(job_element, "num_proc", default="0")
)
info.cpus = self._safe_int(self._get_element_text(job_element, "num_proc"))
info.memory_per_cpu = self._convert_memory_str(
self._get_element_text(job_element, "hard resource_list.mem_free")
self._get_element_text(
job_element, "hard resource_list.mem_free", default="0"
)
)
info.partition = self._get_element_text(job_element, "queue")
info.time_limit = self._convert_str_to_time(
self._get_element_text(job_element, "hard resource_list.h_rt")
self._get_element_text(
job_element, "hard resource_list.h_rt", default="0"
)
)

qjob.info = info

jobs_list.append(qjob)

return jobs_list
Expand Down
Loading