-
Notifications
You must be signed in to change notification settings - Fork 0
/
run_batch_jobs.py
138 lines (123 loc) · 5.86 KB
/
run_batch_jobs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#!/usr/bin/env python3
" A utility for launching groups of AWS Batch Jobs"
# stdlib imports
import argparse
import copy
import datetime
import json
import os
import sys

# third-party imports
import boto3
def _build_parser():
    """Return the argparse parser describing every command-line option."""
    description = ["Start a set of AWS Batch jobs.",
                   "See full documentation at",
                   "http://bit.ly/HutchBatchDocs/#submitting-with-the-run_batch_jobs-utility"]
    parser = argparse.ArgumentParser(description="\n".join(description),
                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    # Show the program as "run_batch_jobs" rather than "run_batch_jobs.py".
    parser.prog = parser.prog.replace(".py", "")
    parser.add_argument("-q", "--queue", default="medium",
                        help="Queue Name")
    parser.add_argument("-d", "--jobdef", default="hello:2",
                        help="Job definition name:version.")
    parser.add_argument("-n", "--numjobs", default=1, type=int,
                        help="Number of jobs to run.")
    parser.add_argument("--name", default="sample_job",
                        help="Name of job group (your username and job number will be injected).")
    parser.add_argument("-f", "--func",
                        help="\n".join(["Python path to a function to customize job,",
                                        "see link above for full docs. Example: myscript.myfunc"]))
    parser.add_argument("-j", "--json", action='store_true',
                        help="\n".join(["Output JSON instead of starting jobs. JSON can be used",
                                        "with `aws batch submit-job --cli-input-json`.",
                                        "Makes most sense with `--numjobs 1`."]))
    # Numeric overrides are parsed as integers; the submit-job request
    # expects numbers here, not the raw command-line strings.
    parser.add_argument("-x", "--cpus", type=int,
                        help="Number of CPUs, if overriding value in job definition.")
    # NOTE(review): the help text says GB, but the value is forwarded
    # unchanged as containerOverrides.memory, which AWS documents in MiB.
    # Confirm the intended unit before relying on it.
    parser.add_argument("-m", "--memory", type=int,
                        help="GB of memory, if overriding value in job definition.")
    parser.add_argument("-p", "--parameters",
                        help="\n".join(["Parameters to replace placeholders in job definition.",
                                        "Format as a single-quoted JSON object/dictionary."]))
    parser.add_argument("-a", "--attempts", type=int,
                        help="Number of retry attempts, if overriding job definition.")
    parser.add_argument("-c", "--command",
                        help="\n".join(["Command, if overriding job definition. Example:",
                                        # TODO some kind of interpolation here?
                                        '\'["echo", "hello world"]\'']))
    parser.add_argument("-e", "--environment",
                        help="\n".join(["Environment to replace placeholders in job definition.",
                                        "Format as a single-quoted JSON list of",
                                        "objects/dictionaries."]))
    return parser


def _load_json_arg(raw, expected_type, bad_json_msg, wrong_type_msg):
    """Decode a JSON option value, printing a message and exiting(1) if it is invalid."""
    try:
        value = json.loads(raw)
    except json.JSONDecodeError:
        print(bad_json_msg)
        sys.exit(1)
    if not isinstance(value, expected_type):  # could do further checking here....
        print(wrong_type_msg)
        sys.exit(1)
    return value


def _load_customizer(func_path):
    """Import and return the job-customizing function named as [dir/]module.function."""
    module_dir = os.path.abspath(os.path.dirname(func_path))
    segs = os.path.basename(func_path).replace(".py", "").split(".")
    if len(segs) < 2 or not segs[0] or not segs[1]:
        print("Function argument must look like module.function!")
        sys.exit(1)
    module_name, func_name = segs[0], segs[1]
    # Make the module importable from wherever the user keeps it.
    sys.path.append(module_dir)
    module = __import__(module_name)
    return getattr(module, func_name)


def _build_template(args, user):
    """Build the submit-job keyword arguments shared by every job in the group."""
    container_overrides = {}
    # is all this necessary if we use argument groups?
    if args.cpus is not None:
        container_overrides['vcpus'] = args.cpus
    if args.memory is not None:
        container_overrides['memory'] = args.memory
    if args.command:
        command = _load_json_arg(args.command, list,
                                 "Command argument is not properly formatted JSON!",
                                 "Command argument is not a JSON list of strings!")
        if not all(isinstance(item, str) for item in command):
            print("Command argument is not a JSON list of strings!")
            sys.exit(1)
        container_overrides['command'] = command

    # Every job is tagged with its group name and the submitting user.
    environment = [dict(name='JOB_GROUP_NAME', value=args.name),
                   dict(name='JOB_GROUP_USER', value=user)]
    if args.environment:
        environment.extend(_load_json_arg(args.environment, list,
                                          "Environment argument is not properly formatted JSON!",
                                          "Environment argument is not a JSON list!"))
    container_overrides['environment'] = environment

    # TODO make sure job name is valid length & has no invalid characters
    template = dict(jobName=args.name, jobQueue=args.queue, jobDefinition=args.jobdef,
                    containerOverrides=container_overrides)
    if args.attempts is not None:
        template['retryStrategy'] = {'attempts': args.attempts}
    if args.parameters:
        template['parameters'] = _load_json_arg(
            args.parameters, dict,
            "Parameters argument is not properly formatted JSON!",
            "Parameters object is not a JSON object/dictionary!")
    return template


def main():
    """Parse the command line, then submit (or print as JSON) a group of AWS Batch jobs.

    Jobs are named "<user>-<name>-<n>" for n in 1..numjobs. When --func is
    given, the named function is called as func(job_template, n) and its
    return value is used as the job's submit-job arguments. With --json the
    job descriptions are printed instead of submitted; otherwise the
    submit_job responses (minus ResponseMetadata) are printed.

    Exits with status 1 after printing a message if a JSON-valued option is
    malformed or --func is not of the form module.function.
    """
    user = os.getenv("USER")
    args = _build_parser().parse_args()
    template = _build_template(args, user)
    func = _load_customizer(args.func) if args.func else None

    # Only create the AWS client when jobs are really submitted, so --help
    # and --json do not depend on boto3 client construction.
    batch = None if args.json else boto3.client("batch")

    jobs = []
    for iteration in range(1, args.numjobs + 1):
        # Each job gets its own deep copy: otherwise every entry collected
        # for --json would be the same dict (all showing the last job name),
        # and mutations made by the customizing function would leak into
        # later jobs.
        job_template = copy.deepcopy(template)
        job_template['jobName'] = "{}-{}-{}".format(user, args.name, iteration)
        if func:
            job_template = func(job_template, iteration)
        if args.json:
            jobs.append(job_template)
        else:
            job = batch.submit_job(**job_template)
            job.pop('ResponseMetadata', None)
            jobs.append(job)

    if args.json and len(jobs) == 1:  # print output suitable for aws cli
        print(json.dumps(jobs[0], indent=4))
    else:
        print(json.dumps(jobs, indent=4))
# Script entry point: launch jobs only when this file is executed directly,
# never as a side effect of importing it.
if __name__ == "__main__":
    main()