Skip to content

Commit

Permalink
Merge pull request #3 from mdebony/maxPending
Browse files Browse the repository at this point in the history
Add a max pending job option
  • Loading branch information
AndresTanasijczuk authored Jun 23, 2022
2 parents 9c9c331 + c010f8c commit c3b31d1
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 11 deletions.
9 changes: 8 additions & 1 deletion bin/slurm_dagman
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,14 @@ SLURM_DAGMAN_WORKER = None
def init():
global SLURM_DAGMAN_WORKER
try:
SLURM_DAGMAN_WORKER = Worker(options['dag_file'], options['outfile'], options['proxy_file'], options['sleep_time'], options['max_jobs_queued'], options['max_jobs_submit'], options['submit_wait_time'])
SLURM_DAGMAN_WORKER = Worker(dag_file=options['dag_file'],
outfile=options['outfile'],
proxy=options['proxy_file'],
sleep_time=options['sleep_time'],
max_jobs_queued=options['max_jobs_queued'],
max_jobs_pending=options['max_jobs_pending'],
max_jobs_submit=options['max_jobs_submit'],
submit_wait_time=options['submit_wait_time'])
except Exception:
print('Error running slurm_dagman:\n%s' % (traceback.format_exc()))
sys.exit(1)
Expand Down
1 change: 1 addition & 0 deletions bin/slurm_submit_dag
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ cmd = [slurm_dagman,
'--outfile', options['outfile'],
'--sleep-time', '%i' % (options['sleep_time']),
'--max-jobs-queued', '%i' % (options['max_jobs_queued']),
'--max-jobs-pending', '%i' % (options['max_jobs_pending']),
'--max-jobs-submit', '%i' % (options['max_jobs_submit']),
'--submit-wait-time', '%i' % (options['submit_wait_time']),
]
Expand Down
8 changes: 8 additions & 0 deletions etc/SlurmDagman.conf
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@
# default: 500 (0 or negative = no limit)
#max_jobs_queued =

# Default value for the command line option '--max-jobs-pending'
# of the slurm_submit_dag command (and the slurm_dagman executable).
# Maximum allowed number of jobs that (each instance of) slurm_dagman
# can have pending at any given time. Only the general Slurm queue is considered,
# not a queue per partition.
# default: 0 (0 or negative = no limit)
#max_jobs_pending =

# Default value for the command line option '--max-jobs-submit'
# of the slurm_submit_dag command (and the slurm_dagman executable).
# Maximum number of jobs that (each instance of) slurm_dagman can
Expand Down
1 change: 1 addition & 0 deletions lib/SlurmDagman/config/defaults/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@
DEFAULTS['DAGMAN'] = OrderedDict()
DEFAULTS['DAGMAN']['sleep_time'] = '120'
DEFAULTS['DAGMAN']['max_jobs_queued'] = '500'
DEFAULTS['DAGMAN']['max_jobs_pending'] = '0'
DEFAULTS['DAGMAN']['max_jobs_submit'] = '0'
DEFAULTS['DAGMAN']['submit_wait_time'] = '2'
7 changes: 7 additions & 0 deletions lib/SlurmDagman/process/command/arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ def exit(msg):
default = int(package_config.get_param('DAGMAN', 'max_jobs_queued')),
help = "(maximum number of jobs that can be put in the slurm queue)")

parser.add_argument("--max-jobs-pending",
type = int,
dest = "max_jobs_pending",
default = int(package_config.get_param('DAGMAN', 'max_jobs_pending')),
help = "(maximum number of jobs that are pending in the slurm queue)")

parser.add_argument("--max-jobs-submit",
type = int,
dest = "max_jobs_submit",
Expand Down Expand Up @@ -153,4 +159,5 @@ def exit(msg):
options['sleep_time'] = max(args.sleep_time, 0)
options['max_jobs_queued'] = max(args.max_jobs_queued, 0)
options['max_jobs_submit'] = max(args.max_jobs_submit, 0)
options['max_jobs_pending'] = max(args.max_jobs_pending, 0)
options['submit_wait_time'] = max(args.submit_wait_time, 0)
30 changes: 20 additions & 10 deletions lib/SlurmDagman/process/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@

class Worker(object):

def __init__(self, dag_file=None, outfile=None, proxy=None,
sleep_time=None, max_jobs_queued=None, max_jobs_submit=None, submit_wait_time=None):
def __init__(self, dag_file=None, outfile=None, proxy=None, sleep_time=None, max_jobs_queued=None,
max_jobs_pending=None, max_jobs_submit=None, submit_wait_time=None):
super(Worker, self).__init__()
self.outfile = outfile
self.__set_logging()
Expand All @@ -51,7 +51,7 @@ def __init__(self, dag_file=None, outfile=None, proxy=None,
self.__init_process_config()
self.process_config.set_params(process_config.get_params())
self.process_config_file = get_dag_file_rootname(dag_file) + '.slurm_dagman.cfg'
self.set_params(sleep_time, max_jobs_queued, max_jobs_submit, submit_wait_time)
self.set_params(sleep_time, max_jobs_queued, max_jobs_pending, max_jobs_submit, submit_wait_time)
self.dag_done = {}
self.num_nodes_total = 0
self.num_nodes_done = 0
Expand Down Expand Up @@ -84,8 +84,8 @@ def __set_logging(self):


def __init_params(self):
self.sleep_time, self.max_jobs_queued, self.max_jobs_submit, self.submit_wait_time, \
self.drain, self.cancel \
self.sleep_time, self.max_jobs_queued, self.max_jobs_pending, self.max_jobs_submit, self.submit_wait_time, \
self.drain, self.cancel\
= list(self.__get_config_params(process_config).values())


Expand All @@ -105,6 +105,7 @@ def __get_config_params(self, config, fallback=False, sanitize=True):
params = OrderedDict()
params['sleep_time'] = self.__get_config_param(config, 'DAGMAN', 'sleep_time', fallback, 'int')
params['max_jobs_queued'] = self.__get_config_param(config, 'DAGMAN', 'max_jobs_queued', fallback, 'int')
params['max_jobs_pending'] = self.__get_config_param(config, 'DAGMAN', 'max_jobs_pending', fallback, 'int')
params['max_jobs_submit'] = self.__get_config_param(config, 'DAGMAN', 'max_jobs_submit', fallback, 'int')
params['submit_wait_time'] = self.__get_config_param(config, 'DAGMAN', 'submit_wait_time', fallback, 'int')
params['drain'] = self.__get_config_param(config, 'DAGMAN', 'drain', fallback, 'boolean')
Expand Down Expand Up @@ -136,12 +137,14 @@ def reset_params(self):
self.__init_params()


def set_params(self, sleep_time=None, max_jobs_queued=None, max_jobs_submit=None, submit_wait_time=None,
drain=None, cancel=None):
def set_params(self, sleep_time=None, max_jobs_queued=None, max_jobs_pending=None, max_jobs_submit=None,
submit_wait_time=None, drain=None, cancel=None):
if sleep_time is not None:
self.sleep_time = self.__replace_negative_int_by_zero(sleep_time)
if max_jobs_queued is not None:
self.max_jobs_queued = self.__replace_negative_int_by_zero(max_jobs_queued)
if max_jobs_pending is not None:
self.max_jobs_pending = self.__replace_negative_int_by_zero(max_jobs_pending)
if max_jobs_submit is not None:
self.max_jobs_submit = self.__replace_negative_int_by_zero(max_jobs_submit)
if submit_wait_time is not None:
Expand All @@ -157,6 +160,7 @@ def set_params(self, sleep_time=None, max_jobs_queued=None, max_jobs_submit=None
def __set_process_config_params(self):
self.process_config.set_param('DAGMAN', 'sleep_time', self.sleep_time)
self.process_config.set_param('DAGMAN', 'max_jobs_queued', self.max_jobs_queued)
self.process_config.set_param('DAGMAN', 'max_jobs_pending', self.max_jobs_pending)
self.process_config.set_param('DAGMAN', 'max_jobs_submit', self.max_jobs_submit )
self.process_config.set_param('DAGMAN', 'submit_wait_time', self.submit_wait_time )
self.process_config.set_param('DAGMAN', 'drain', self.drain)
Expand Down Expand Up @@ -197,14 +201,16 @@ def __parse_process_config(self, log_changes=True):
# so we have to handle the case of a parameter being None. Finally,
# we will return True if there is no None parameter and False otherwise.
params = list(self.__get_process_config_params().values())
sleep_time, max_jobs_queued, max_jobs_submit, submit_wait_time, \
sleep_time, max_jobs_queued, max_jobs_pending, max_jobs_submit, submit_wait_time, \
drain, cancel \
= params[:]
if log_changes:
if sleep_time is not None and sleep_time != self.sleep_time:
logging.info("Dag config change detected: sleep_time set to %s seconds" % (sleep_time))
if max_jobs_queued is not None and max_jobs_queued != self.max_jobs_queued:
logging.info("Dag config change detected: max_jobs_queued set to %s" % (max_jobs_queued))
if max_jobs_pending is not None and max_jobs_pending != self.max_jobs_pending:
logging.info("Dag config change detected: max_jobs_pending set to %s" % (max_jobs_pending))
if max_jobs_submit is not None and max_jobs_submit != self.max_jobs_submit:
logging.info("Dag config change detected: max_jobs_submit set to %s" % (max_jobs_submit))
if submit_wait_time is not None and submit_wait_time != self.submit_wait_time:
Expand All @@ -217,6 +223,8 @@ def __parse_process_config(self, log_changes=True):
self.sleep_time = sleep_time
if max_jobs_queued is not None:
self.max_jobs_queued = max_jobs_queued
if max_jobs_pending is not None:
self.max_jobs_pending = max_jobs_pending
if max_jobs_submit is not None:
self.max_jobs_submit = max_jobs_submit
if submit_wait_time is not None:
Expand Down Expand Up @@ -295,11 +303,13 @@ def __write_dag_file(self, dag_file, add_done_labels=True):
self.dag.write(dag_file, use_dag_nodes_appearance_order=True, add_done_labels=add_done_labels)


def __submit_ready_nodes(self):
def __submit_ready_nodes(self, num_nodes_pending):
num_submitted_nodes = 0
for node in self.dag:
if self.max_jobs_queued > 0 and len(self.queued_job_ids) >= self.max_jobs_queued:
break
if self.max_jobs_pending > 0 and (num_nodes_pending+num_submitted_nodes) >= self.max_jobs_pending:
break
if self.dag[node]['status'] == 1: # node ready to be submitted
# Submit a job
job_id, error = self.__submit(self.dag[node]['job_submission_file'], node)
Expand Down Expand Up @@ -528,7 +538,7 @@ def __execute_dag(self):
if self.num_nodes_queued > 0:
logging.info('Of %i nodes queued: %i running, %i pending, %i other' % (self.num_nodes_queued, num_nodes_running, num_nodes_pending, num_nodes_unknown))
if self.num_nodes_ready > 0 and not self.drain and not self.cancel:
self.__submit_ready_nodes()
self.__submit_ready_nodes(num_nodes_pending=num_nodes_pending)
else:
if self.num_nodes_done == self.num_nodes_total:
logging.info('DAG completed successfully.')
Expand Down

0 comments on commit c3b31d1

Please sign in to comment.