Skip to content

Commit

Permalink
Use squeue when checking if jobs were cancelled. Remove extern job steps from sacct output.
Browse files Browse the repository at this point in the history
  • Loading branch information
Andres Tanasijczuk committed Nov 27, 2020
1 parent 85e0d81 commit 0580425
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 20 deletions.
2 changes: 1 addition & 1 deletion lib/SlurmDagman/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#!/usr/bin/env python

__version__ = "0.1.1"
__version__ = "0.1.2"
35 changes: 16 additions & 19 deletions lib/SlurmDagman/process/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,33 +386,32 @@ def __sacct(self):
cmd = ['sacct', '--noheader', '-P', '--wckeys=%s' % (self.wckey), '--format=JobID,JobName,State,NodeList,ExitCode', '--jobs=%s' % (','.join(self.queued_job_ids)), '--starttime=%s' % (self.start_time)]
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = p.communicate()
out = out.decode('utf-8').strip()
err = err.decode('utf-8').strip()
out = out.decode('utf-8').strip().split('\n')
err = err.decode('utf-8').strip().split('\n')
out = [l for l in out if l and l.find('|batch|') == -1 and l.find('|extern|') == -1]
return out, err


def __squeue(self):
    """Return this DAG's queued jobs as reported by ``squeue``.

    Runs ``squeue --noheader`` with a pipe-separated format whose fields are
    unpacked by callers as job id, state, wckey and computing node, and
    keeps only the lines that contain ``|<self.wckey>|``.

    Returns:
        A ``(out, err)`` tuple of lists of strings. ``out`` holds the
        matching squeue lines; ``err`` holds the non-empty lines squeue
        wrote to stderr. Both are empty lists when there is nothing to
        report, so either one can be tested for truthiness.
    """
    # NOTE(review): cmd is passed as a list (no shell), so the double quotes
    # inside --format are part of the argument itself. Presumably squeue
    # echoes them, leaving a leading '"' on the first field and a trailing
    # '"' on the last one -- confirm against real squeue output before
    # changing the format string, since callers may rely on it.
    cmd = ['squeue', '--noheader', '--format="%i|%T|%w|%N"']
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = p.communicate()
    out_lines = out.decode('utf-8').strip().split('\n')
    err_lines = err.decode('utf-8').strip().split('\n')
    # ''.split('\n') yields [''], which is truthy. Drop empty lines so that
    # an empty stderr is returned as [] and not mistaken for an error.
    err_lines = [line for line in err_lines if line]
    # '> 0' rather than '>= 0': the wckey is not the first field, so a
    # genuine match never starts at index 0.
    wckey_field = '|%s|' % (self.wckey)
    out_lines = [line for line in out_lines if line and line.find(wckey_field) > 0]
    return out_lines, err_lines


def __monitor(self):
num_nodes_running = 0
num_nodes_pending = 0
num_nodes_unknown = 0
sacct_out, sacct_err = self.__sacct()
sacct_out = [l for l in sacct_out.split('\n') if l and l.find('|batch|') == -1]
squeue_out, squeue_err = self.__squeue()
squeue_out = [l for l in squeue_out.split('\n') if l and l.find('|%s|' % (self.wckey)) > 0]
sacct_out, _ = self.__sacct()
squeue_out, _ = self.__squeue()
squeue_result = {}
for job in squeue_out:
job_id, status, wckey, computing_node = job.split('|')
if wckey == self.wckey: # this if is redundant with the l.find(self.wckey) > 0 above, but doesn't hurt to have it
squeue_result[job_id] = {'status': status, 'computing_node': computing_node}
squeue_result[job_id] = {'status': status, 'computing_node': computing_node}
nodes_done = set()
for job in sacct_out:
job_id, job_name, status, computing_node, exit_code = job.split('|')
Expand Down Expand Up @@ -552,17 +551,15 @@ def __cancel_dag(self):
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = p.communicate()
time.sleep(60)
sacct_out, sacct_err = self.__sacct()
sacct_out = [l for l in sacct_out.split('\n') if l and l.find('|batch|') == -1]
nodes_not_cancelled = [l for l in sacct_out if l.find('CANCELLED') == -1 and l.find('COMPLETED') == -1]
if sacct_err or nodes_not_cancelled:
squeue_out, squeue_err = self.__squeue()
if squeue_out or squeue_err:
if cancel_retry_num > max_num_cancel_retries:
msg = 'After %i trials to cancel the queued DAG nodes, %i nodes are still queued.' % (cancel_retry_num, len(nodes_not_cancelled))
msg = 'After %i trials to cancel the queued DAG nodes, %i nodes are still queued.' % (cancel_retry_num, len(squeue_out))
msg += ' Aborting the cancel.'
msg += ' This is the output of sacct:'
msg += '\nStdout:\n%s' % (sacct_out)
if sacct_err:
msg += '\nStderr:\n%s' % (sacct_err)
msg += ' This is the output of squeue:'
msg += '\nStdout:\n%s' % ('\n'.join(squeue_out))
if squeue_err:
msg += '\nStderr:\n%s' % ('\n'.join(squeue_err))
raise Exception(msg)
cancel_retry_num += 1
else:
Expand Down

0 comments on commit 0580425

Please sign in to comment.