From 05804256692c4f81cb2a7e73c19adb78334cd229 Mon Sep 17 00:00:00 2001 From: Andres Tanasijczuk Date: Fri, 27 Nov 2020 14:40:47 +0100 Subject: [PATCH] Use squeue when checking if jobs were cancelled. Remove extern job steps from sacct output. --- lib/SlurmDagman/__init__.py | 2 +- lib/SlurmDagman/process/worker.py | 35 ++++++++++++++----------------- 2 files changed, 17 insertions(+), 20 deletions(-) diff --git a/lib/SlurmDagman/__init__.py b/lib/SlurmDagman/__init__.py index b30a177..25906e6 100644 --- a/lib/SlurmDagman/__init__.py +++ b/lib/SlurmDagman/__init__.py @@ -1,3 +1,3 @@ #!/usr/bin/env python -__version__ = "0.1.1" +__version__ = "0.1.2" diff --git a/lib/SlurmDagman/process/worker.py b/lib/SlurmDagman/process/worker.py index bcbf40d..a6e6fb2 100644 --- a/lib/SlurmDagman/process/worker.py +++ b/lib/SlurmDagman/process/worker.py @@ -386,8 +386,9 @@ def __sacct(self): cmd = ['sacct', '--noheader', '-P', '--wckeys=%s' % (self.wckey), '--format=JobID,JobName,State,NodeList,ExitCode', '--jobs=%s' % (','.join(self.queued_job_ids)), '--starttime=%s' % (self.start_time)] p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) out, err = p.communicate() - out = out.decode('utf-8').strip() - err = err.decode('utf-8').strip() + out = out.decode('utf-8').strip().split('\n') + err = err.decode('utf-8').strip().split('\n') + out = [l for l in out if l and l.find('|batch|') == -1 and l.find('|extern|') == -1] return out, err @@ -395,8 +396,9 @@ def __squeue(self): cmd = ['squeue', '--noheader', '--format="%i|%T|%w|%N"'] p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) out, err = p.communicate() - out = out.decode('utf-8').strip() - err = err.decode('utf-8').strip() + out = out.decode('utf-8').strip().split('\n') + err = err.decode('utf-8').strip().split('\n') + out = [l for l in out if l and l.find('|%s|' % (self.wckey)) > 0] return out, err @@ -404,15 +406,12 @@ def __monitor(self): num_nodes_running = 0 num_nodes_pending = 0 num_nodes_unknown = 0 - sacct_out, sacct_err = self.__sacct() - sacct_out = [l for l in sacct_out.split('\n') if l and l.find('|batch|') == -1] - squeue_out, squeue_err = self.__squeue() - squeue_out = [l for l in squeue_out.split('\n') if l and l.find('|%s|' % (self.wckey)) > 0] + sacct_out, _ = self.__sacct() + squeue_out, _ = self.__squeue() squeue_result = {} for job in squeue_out: job_id, status, wckey, computing_node = job.split('|') - if wckey == self.wckey: # this if is redundant with the l.find(self.wckey) > 0 above, but doesn't heart to have it - squeue_result[job_id] = {'status': status, 'computing_node': computing_node} + squeue_result[job_id] = {'status': status, 'computing_node': computing_node} nodes_done = set() for job in sacct_out: job_id, job_name, status, computing_node, exit_code = job.split('|') @@ -552,17 +551,15 @@ def __cancel_dag(self): p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) out, err = p.communicate() time.sleep(60) - sacct_out, sacct_err = self.__sacct() - sacct_out = [l for l in sacct_out.split('\n') if l and l.find('|batch|') == -1] - nodes_not_cancelled = [l for l in sacct_out if l.find('CANCELLED') == -1 and l.find('COMPLETED') == -1] - if sacct_err or nodes_not_cancelled: + squeue_out, squeue_err = self.__squeue() + if squeue_out or squeue_err: if cancel_retry_num > max_num_cancel_retries: - msg = 'After %i trials to cancel the queued DAG nodes, %i nodes are still queued.' % (cancel_retry_num, len(nodes_not_cancelled)) + msg = 'After %i trials to cancel the queued DAG nodes, %i nodes are still queued.' % (cancel_retry_num, len(squeue_out)) msg += ' Aborting the cancel.' - msg += ' This is the output of sacct:' - msg += '\nStdout:\n%s' % (sacct_out) - if sacct_err: - msg += '\nStderr:\n%s' % (sacct_err) + msg += ' This is the output of squeue:' + msg += '\nStdout:\n%s' % ('\n'.join(squeue_out)) + if squeue_err: + msg += '\nStderr:\n%s' % ('\n'.join(squeue_err)) raise Exception(msg) cancel_retry_num += 1 else: