Skip to content

Commit

Permalink
First version of SlurmDagman.
Browse files Browse the repository at this point in the history
  • Loading branch information
Andres Tanasijczuk committed Nov 18, 2020
0 parents commit 85e0d81
Show file tree
Hide file tree
Showing 34 changed files with 2,948 additions and 0 deletions.
674 changes: 674 additions & 0 deletions LICENSE

Large diffs are not rendered by default.

39 changes: 39 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# SlurmDagman

This application allows to run (the jobs in) a DAG with [Slurm](https://slurm.schedmd.com/documentation.html) workload manager.

## Installation and configuration

SlurmDagman can be installed via `pip` ([SlurmDagman in PyPI](https://pypi.org/project/SlurmDagman))
or it can simply be downloaded from [github](https://github.com/AndresTanasijczuk/SlurmDagman).

A configuration file for SlurmDagman named _SlurmDagman.conf_
is included in the package. In the git repository it can be found in the
__etc__ folder. In the distribution it can be found in a sub-directory named
__etc/SlurmDagman__. The configuration file provides default values for Slurm
job submission options and for some SlurmDagman worker parameters.

SlurmDagman will try to read the _SlurmDagman.conf_ configuration file from the
following locations, in this order:

- `/etc/` and `/etc/SlurmDagman/`
- `$xdg_config_dir/` and `$xdg_config_dir/SlurmDagman/` where $xdg_config_dir
is each of the paths defined by the environment variable $XDG_CONFIG_DIRS;
if $XDG_CONFIG_DIRS is not defined, $xdg_config_dir defaults to /etc/xdg/
- `site.USER_BASE/etc/` and `site.USER_BASE/etc/SlurmDagman/` where site.USER_BASE
is the python variable that defines the location for a user installation
- `$xdg_config_home/` and `$xdg_config_home/SlurmDagman/` where $xdg_config_home
is the environment variable $XDG_CONFIG_HOME when defined and not empty;
otherwise it defaults to $HOME/.config
- `$VIRTUAL_ENV/etc/` and `$VIRTUAL_ENV/etc/SlurmDagman/` when $VIRTUAL_ENV
is defined and not empty

## Usage

See the [project webpage](https://andrestanasijczuk.github.io/SlurmDagman/).

## License

SlurmDagman is provided "as is" and with no warranty. This software is
distributed under the GNU General Public License; please see the LICENSE file
for details.
89 changes: 89 additions & 0 deletions bin/condor_dag_to_slurm_dag
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#!/usr/bin/env python
"""
Copyright (C) 2020 Universite catholique de Louvain, Belgium.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""

import os
import sys

from argparse import ArgumentParser

from SlurmDagman.config.data.package import package_config
from SlurmDagman.translators.condor_to_slurm import CondorToSlurmTranslator


parser = ArgumentParser(description="Translate a HTCondor DAG file and all the therein referenced submission files to SLURM")

parser.add_argument("--slurm-partition",
dest = "slurm_partition",
default = package_config.get_param('SLURM', 'partition'),
help = "(partition to use in the Slurm submission file)")

parser.add_argument("--slurm-qos",
dest = "slurm_qos",
default = package_config.get_param('SLURM', 'qos'),
help = "(qos to use in the Slurm submission file)")

parser.add_argument("--slurm-time-limit",
dest = "slurm_time_limit",
default = package_config.get_param('SLURM', 'time_limit'),
help = "(time limit to use in the Slurm submission file)")

parser.add_argument("--slurm-scratch-dir",
dest = "slurm_scratch_dir",
default = package_config.get_param('SLURM', 'scratch_dir'),
help = "(replace _CONDOR_SCRATCH_DIR with this scratch directory)")

parser.add_argument("--slurm-use-setsid",
action = "store_true",
dest = "slurm_use_setsid",
default = False,
help = "(change the user payload in the Slurm submission file so that it will be run in a new session by means of setsid)")

parser.add_argument("--singularity-image",
dest = "singularity_image",
default = '',
help = "(change the user payload in the Slurm submission file so that it will be run with singularity, with the given singularity image)")

parser.add_argument("--singularity-bind-mount-dbus",
action = "store_true",
dest = "singularity_bind_mount_dbus",
default = False,
help = "(add the bind mount of dbus directories to singularity in the Slurm submission file; this option only makes sense if a singularity image is provided with --singularity-image)")

parser.add_argument("dagfile",
nargs = 1,
help = "(a HTCondor DAG file)")

args = parser.parse_args()

_condor_dag_file = args.dagfile[0]
if not os.path.exists(_condor_dag_file):
print("ERROR: %s does not exist." % (_condor_dag_file))
sys.exit(1)
if not os.path.isfile(_condor_dag_file):
print("ERROR: %s is not a file." % (_condor_dag_file))
sys.exit(1)

if args.singularity_bind_mount_dbus and not args.singularity_image:
parser.error('Option --singularity-bind-mount-dbus only makes sense if a singularity image is provided with --singularity-image.')

translator = CondorToSlurmTranslator(_condor_dag_file, _condor_dag_file + '.slurm',
args.slurm_partition, args.slurm_qos, args.slurm_time_limit, args.slurm_scratch_dir, args.slurm_use_setsid,
args.singularity_image, args.singularity_bind_mount_dbus)
translator.translate()

sys.exit(0)
61 changes: 61 additions & 0 deletions bin/fix_dag_file
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#!/bin/bash

# When a slurm_dagman process is killed with slurm_cancel_dag or with kill -9, it should create a new rescue dag file.
# If slurm_dagman is killed by other means, the rescue dag file will be missing, meaning that nodes that were completed
# successfully will not be marked as DONE and slurm_dagman (if run again on the same DAG) will run these nodes again.
# This tool is to mark completed nodes as DONE in a DAG file. It takes two arguments as input: the first is the DAG file
# to fix (choose the one that will be used as input in the next slurm_dagman run) and the second is the slurm_dagman
# .out file from where the list of completed nodes will be extracted.

script_name=`basename $0`

usage="Usage: ${script_name} dag_file slurm_dagman_out_file"
usage+="\nwhere dag_file is the file to be fixed and slurm_dagman_out_file if the slurm_dagman .out file that should be read to extract the list of completed nodes."

if [[ $# -eq 2 ]]; then
dag_file=$1
slurm_dagman_out_file=$2
else
echo "ERROR: Script ${script_name} takes two arguments."
echo -e "${usage}"
exit 1
fi

grep 'JOB ' ${dag_file} > ~/.${dag_file}_JOB_lines
grep 'VARS ' ${dag_file} > ~/.${dag_file}_VARS_lines
grep 'RETRY ' ${dag_file} > ~/.${dag_file}_RETRY_lines
grep 'PARENT ' ${dag_file} > ~/.${dag_file}_PARENT_lines

while IFS= read -r line
do
if [[ $line == *"Node "*" completed" ]];
then
linearray=($line)
node=${linearray[3]}
sed -i "/^JOB ${node} .*[^DONE]$/ s/$/ DONE/g1" ~/.${dag_file}_JOB_lines
fi
done < ${slurm_dagman_out_file}

dag_file_fixed=${dag_file}_fixed

#while IFS= read -r line
#do
# if [[ $line == "JOB "* && $line != *" DONE" ]];
# then
# linearray=($line)
# node=${linearray[1]}
# grep -q "Node ${node} completed" ${slurm_dagman_out_file}
# if [[ $? -eq 0 ]];
# then
# line="${line} DONE"
# fi
# fi
# echo ${line} >> ${dag_file_fixed}
#done < ${dag_file}

paste -d \\n ~/.${dag_file}_JOB_lines ~/.${dag_file}_VARS_lines > ~/.${dag_file}_JOB_VARS_lines
rm ~/.${dag_file}_JOB_lines ~/.${dag_file}_VARS_lines
cat ~/.${dag_file}_JOB_VARS_lines ~/.${dag_file}_RETRY_lines ~/.${dag_file}_PARENT_lines > ${dag_file_fixed}
rm ~/.${dag_file}_JOB_VARS_lines ~/.${dag_file}_RETRY_lines ~/.${dag_file}_PARENT_lines

echo "Fixed dag saved to ${dag_file_fixed}"
42 changes: 42 additions & 0 deletions bin/slurm_cancel_dag
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#!/usr/bin/env python
"""
Copyright (C) 2020 Universite catholique de Louvain, Belgium.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""

import signal
import sys

from SlurmDagman.utils.process import find_slurm_dagman_pids_for_current_user, send_signal


rc = 1
user, slurm_dagman_pids = find_slurm_dagman_pids_for_current_user()
num_slurm_dagman_processes = len(slurm_dagman_pids)
if num_slurm_dagman_processes == 1:
print('Found one slurm_dagman process (%s) belonging to user %s.' % (slurm_dagman_pids[0], user))
send_signal(slurm_dagman_pids[0], signal.SIGTERM)
rc = 0
else:
if num_slurm_dagman_processes == 0:
msg = 'Error: Did not find any slurm_dagman process belonging to user %s.' % (user)
print(msg)
else:
msg = 'Error: Found more than one slurm_dagman process belonging to user %s: %s.' % (user, slurm_dagman_pids)
msg += ' Do not know which one to cancel.'
msg += "\nYou can cancel the dag of a slurm_dagman process by running 'kill -%s <slurm-dagman-pid>' by hand." % (signal.SIGTERM)
print(msg)

sys.exit(rc)
79 changes: 79 additions & 0 deletions bin/slurm_dagman
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
#!/usr/bin/env python
"""
Copyright (C) 2020 Universite catholique de Louvain, Belgium.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""

import signal
import sys
import traceback

from SlurmDagman.process.command.arguments import options
from SlurmDagman.process.worker import Worker


SLURM_DAGMAN_WORKER = None


def init():
global SLURM_DAGMAN_WORKER
try:
SLURM_DAGMAN_WORKER = Worker(options['dag_file'], options['outfile'], options['proxy_file'], options['sleep_time'], options['max_jobs_queued'], options['max_jobs_submit'], options['submit_wait_time'])
except Exception:
print('Error running slurm_dagman:\n%s' % (traceback.format_exc()))
sys.exit(1)


def run():
if isinstance(SLURM_DAGMAN_WORKER, Worker):
try:
return SLURM_DAGMAN_WORKER.run()
except Exception:
print('Error running slurm_dagman:\n%s' % (traceback.format_exc()))
return 1
else:
print('Unexpected type of slurm dagman worker object:\n%s' % (traceback.format_exc()))
sys.exit(1)


def terminate():
if isinstance(SLURM_DAGMAN_WORKER, Worker):
try:
SLURM_DAGMAN_WORKER.terminate()
except Exception:
print('Error terminating slurm dagman worker:\n%s' % (traceback.format_exc()))
try:
SLURM_DAGMAN_WORKER.write_rescue_dag_file()
except Exception:
print('Failed to write rescue DAG:\n%s' % (traceback.format_exc()))
else:
print('Unexpected type of slurm dagman worker object:\n%s' % (traceback.format_exc()))


if __name__ == '__main__':

def signal_term_handler(signum, frame):
print('Got SIGTERM (%s) signal:' % (signum))
print(' -> Terminating...')
terminate()
print('Terminated.')
sys.exit(0)
signal.signal(signal.SIGTERM, signal_term_handler)

init()
rc = run()
if rc > 0:
terminate()
sys.exit(rc)
76 changes: 76 additions & 0 deletions bin/slurm_submit_dag
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#!/usr/bin/env python
"""
Copyright (C) 2020 Universite catholique de Louvain, Belgium.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""

from datetime import datetime as dt
import os
import sys
import subprocess
import traceback

from SlurmDagman.dag.utils.rescue_dag import get_dag_file_rootname
from SlurmDagman.process.command.arguments import options


slurm_dagman = 'slurm_dagman'
if 'SLURM_DAGMAN_DIR' in os.environ and os.environ['SLURM_DAGMAN_DIR'].strip() != '':
slurm_dagman = os.path.join(os.environ['SLURM_DAGMAN_DIR'].strip(), 'slurm_dagman')
if not os.path.isfile(slurm_dagman):
msg = 'ERROR: Executable script %s does not exist.' % (slurm_dagman)
msg += '\nMake sure the environment variable SLURM_DAGMAN_DIR points to the directory where the slurm_dagman script is located.'
print(msg)
sys.exit(1)

cmd = [slurm_dagman,
'--do-rescue-from', '%i' % (options['do_rescue_from']),
'--outfile', options['outfile'],
'--sleep-time', '%i' % (options['sleep_time']),
'--max-jobs-queued', '%i' % (options['max_jobs_queued']),
'--max-jobs-submit', '%i' % (options['max_jobs_submit']),
'--submit-wait-time', '%i' % (options['submit_wait_time']),
]
if options['no_rescue']:
cmd.append('--no-rescue')
if options['use_proxy']:
cmd.append('--use-proxy')
if options['proxy_file']:
cmd.append('--proxy-file')
cmd.append('%s' % options['proxy_file'])
cmd.append(get_dag_file_rootname(options['dag_file']))

rc = 1
try:
with open(options['logfile'], 'a') as fd:
fd.write('\n========== %s ==========\n\n' % (dt.now().strftime("%Y-%m-%d %H:%M:%S")))
p = subprocess.Popen(cmd, stdout=fd, stderr=subprocess.STDOUT)
except IOError:
print('ERROR: Submission to slurm_dagman failed.')
traceback.print_exception(*sys.exc_info())
except Exception:
with open(options['logfile'], 'a') as fd:
traceback.print_exception(*sys.exc_info(), file=fd)
msg = 'ERROR: Submission to slurm_dagman failed.'
msg += '\nCheck %s' % (options['logfile'])
print(msg)
else:
msg = 'DAG submitted to slurm_dagman.'
msg += '\nProgress of the DAG is logged into %s' % (options['outfile'])
msg += '\nErrors in slurm_dagman are logged into %s' % (options['logfile'])
print(msg)
rc = 0

sys.exit(rc)
Loading

0 comments on commit 85e0d81

Please sign in to comment.