-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathLSFManager.py
executable file
·310 lines (270 loc) · 10.5 KB
/
LSFManager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
#!/usr/bin/env python3.5
"""
A script to manage LSF runs.
"""
import os
import sys
import time
import smtplib
import subprocess
from result_processor import process_folder
from DoCohResultProcessor import generate_run_filters, all_who_pass_run_filters
from RosettaFilter import score2dict
__author__ = 'jonathan'
global log
def main():
    """
    Runs one management pass over every folder listed as running.
    A folder whose jobs all finished is processed and moved to the processed list.
    A folder that is still running is stopped (bkill), force-processed and moved
    once at least 50 of its results pass the run filters; otherwise it is left alone.
    Progress is appended to the module-level ``log`` string.
    :return: None
    """
    global log
    original_pwd = os.getcwd()
    running = get_running_folders()
    # only the disabled submission logic at the end of this function uses the pending list
    pending = get_pending_folders()
    run_filters = generate_run_filters(args={'ddg': 25.0, 'sasa': 1400, 'shape': 0.6, 'packstat': 0.6, 'buried_2': 3})
    # move_to_processed rewrites the running list from the list it is given, so folders
    # already moved in this pass must be dropped or they would be written back
    still_running = list(running)
    try:
        for runner in running:
            # last path component, whether or not the folder ends with '/'
            folder_name = os.path.basename(os.path.normpath(runner))
            os.chdir(runner)
            if is_folder_finished(runner):
                print('processing %s' % runner)
                log += 'folder is finished, processing %s\n' % folder_name
                process_folder({'folder': runner, 'force_process': False, 'remove_pdbs': False})
                move_to_processed(runner, still_running, 0)
                still_running = [other for other in still_running if other != runner]
            else:
                score_dict = folder_scores(runner)
                passed, failed = all_who_pass_run_filters({}, score_dict, run_filters)
                log += 'passed %i, failed %i\n' % (len(passed), len(failed))
                if len(passed) >= 50:
                    print('found enough passed, stopping folder %s' % folder_name)
                    log += 'found enough passed, stopping folder\n'
                    bkill_folder(runner)
                    process_folder({'folder': runner, 'force_process': True, 'remove_pdbs': False})
                    move_to_processed(runner, still_running, len(passed))
                    still_running = [other for other in still_running if other != runner]
                else:
                    print('not enough finished, letting him be %s, found %i passed and %i failed' %
                          (folder_name, len(passed), len(failed)))
                    log += 'not enough finished, letting him be %s, found %i passed and %i failed\n' % \
                           (folder_name, len(passed), len(failed))
            os.chdir(original_pwd)
        # for pender in pending:
        #     os.chdir(pender)
        #     lsf_status, pends = how_many_queue()
        #     if lsf_status['fleishman'] < 12000:
        #         print('found %i jobs in fleishman, submitting %s' % (lsf_status['fleishman'], pender))
        #         log += 'found %i jobs in fleishman, submitting %s\n' % (lsf_status['fleishman'], pender)
        #         submit_folder(pender)
        #         move_pender_to_runner(pending, pender)
        #     os.chdir(original_pwd)
        # lsf_status, pends = how_many_queue()
        # if lsf_status['new-all.q'] <= 2000:
        #     bswitch_pends(pends, 2000-lsf_status['new-all.q'])
    finally:
        # always return to the starting directory, even if a folder failed mid-way
        os.chdir(original_pwd)
def bswitch_pends(pends: list, num: int) -> None:
    """
    moves up to num pending jobs to the new-all.q queue using bmod
    :param pends: a list of pending job ids
    :param num: number of jobs to switch; zero or a negative number switches nothing
    :return: None
    """
    global log
    # a negative num would otherwise slice from the end and switch the wrong jobs
    to_switch = pends[:max(num, 0)]
    # report how many jobs are actually switched, which may be fewer than requested
    log += 'switching %i jobs to new-all.q\n' % len(to_switch)
    for pnd in to_switch:
        try:
            # argument list instead of a shell string, so the job id is never shell-parsed
            subprocess.call(['bmod', '-q', 'new-all.q', str(pnd)])
        except OSError as err:
            # bmod could not be started at all; trying the remaining jobs is pointless
            log += 'could not run bmod: %s\n' % err
            break
def how_many_queue(bjobs_output: str = None) -> (dict, list):
    """
    checks how many jobs are listed in each of the fleishman and new-all.q queues
    :param bjobs_output: text of a ``bjobs -u all`` listing to parse. when None (default)
    the command is run and its standard output is used
    :return: {fleishman: #jobs, new-all.q: #jobs} and a list of job id that are pending in fleishman
    """
    if bjobs_output is None:
        proc = subprocess.Popen(['bjobs', '-u', 'all'], stdout=subprocess.PIPE)
        # communicate() waits for the process and closes the pipe
        raw, _ = proc.communicate()
        bjobs_output = raw.decode('utf-8', errors='replace')
    results = {'fleishman': 0, 'new-all.q': 0}
    pends = []
    for line in bjobs_output.splitlines():
        fields = line.split()
        # the first four columns are read below, so skip anything shorter
        if len(fields) < 4:
            continue
        job_id, user, status, queue = fields[:4]
        if queue in results:
            results[queue] += 1
        if user == 'jonatha' and status == 'PEND' and queue == 'fleishman':
            pends.append(job_id)
    return results, pends
def move_to_processed(folder: str, running: list, passed: int) -> None:
    """
    records folder in the processed list and rewrites the running list without it
    :param folder: a folder address that finished being processed
    :param running: a list of running folders; every entry different from folder is kept
    :param passed: the number written next to folder in the processed list
    :return: None
    """
    with open('/home/labs/fleishman/jonathaw/general_lists/processed_folders.txt', 'a') as processed_list:
        processed_entry = '%s\t%i\n' % (folder, passed)
        processed_list.write(processed_entry)
    # the running list is rewritten from scratch from the list that was passed in
    with open('/home/labs/fleishman/jonathaw/general_lists/running_folders.txt', 'w+') as running_list:
        for other in running:
            if other == folder:
                continue
            running_list.write('%s\n' % other)
def bkill_folder(folder: str) -> None:
    """
    bkills all of my listed jobs whose name matches a job.<name> file in folder
    :param folder: a folder address
    :return: None
    """
    global log
    pwd = os.getcwd()
    os.chdir(folder)
    try:
        folder_jobs = [a for a in os.listdir(folder) if a.startswith('job.')]
        running_jobs = get_my_running_jobs()
        if not running_jobs:
            log += 'found NO running jobs...\n'
            return
        # reverse lookup job name -> job id, built once instead of scanning per file
        id_by_name = {}
        for job_id, job_name in running_jobs.items():
            id_by_name.setdefault(job_name, job_id)
        killed = 0
        for folder_job in folder_jobs:
            job_id = id_by_name.get(folder_job[4:])
            if job_id is None:
                continue
            try:
                # discard both output streams of bkill
                subprocess.call(['bkill', job_id], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
            except OSError as err:
                # bkill could not be started; give up on the rest but still report
                log += 'could not run bkill: %s\n' % err
                break
            killed += 1
        # normpath drops a trailing '/', so the folder name is never empty
        log += 'in %s KILLED %i jobs\n' % (os.path.basename(os.path.normpath(folder)), killed)
    finally:
        # restore the working directory on every exit path, including the early return
        os.chdir(pwd)
def get_my_running_jobs(bjobs_output: str = None) -> dict:
    """
    returns a dict of all my jobs that bjobs lists
    :param bjobs_output: text of a ``bjobs`` listing to parse. when None (default)
    the command is run and its standard output is used
    :return: {job_id: job_name} where job_name is the part of the 4th-from-last column
    that follows its first '.'
    """
    if bjobs_output is None:
        proc = subprocess.Popen(['bjobs'], stdout=subprocess.PIPE)
        # communicate() waits for the process and closes the pipe
        raw, _ = proc.communicate()
        bjobs_output = raw.decode('utf-8', errors='replace')
    results = {}
    for line in bjobs_output.splitlines():
        fields = line.split()
        if len(fields) < 4:
            continue
        if fields[1] != 'jonatha' or fields[0] == 'wexac':
            continue
        # counted from the end because the number of columns before it varies
        name_parts = fields[-4].split('.')
        if len(name_parts) > 1:
            results[fields[0]] = name_parts[1]
    return results
def folder_scores(folder: str) -> dict:
    """
    concatenates all the score files (*.sc) in the folder to one score dict
    :param folder: a folder address
    :return: {name: {filter: grade}} a score dict for the entire folder
    """
    results = {}
    # sorted so that, when two files share a name, the same one wins on every run
    for entry in sorted(os.listdir(folder)):
        if not entry.endswith('.sc'):
            continue
        # pass the full path so this works whatever the current working directory is
        # NOTE(review): assumes score2dict takes a file path - confirm against RosettaFilter
        results.update(score2dict(os.path.join(folder, entry)))
    return results
def is_folder_finished(folder: str) -> bool:
    """
    checks if #outs == #jobs, and if so, whether every out file reports it finished
    :param folder: a folder address
    :return: True iff the folder has as many out.* files as job.* files, and every out
    file contains the JobDistributor "no more batches to process" line
    """
    global log
    entries = os.listdir(folder)
    job_files = [a for a in entries if a.startswith('job.')]
    out_files = [a for a in entries if a.startswith('out.')]
    # normpath drops a trailing '/', so this is the last path component either way
    folder_name = os.path.basename(os.path.normpath(folder))
    if len(job_files) != len(out_files):
        log += 'in %s found %i outs, and %i jobs, not finished\n' % (folder_name, len(out_files),
                                                                    len(job_files))
        return False
    marker = 'protocols.jd2.JobDistributor: no more batches to process...'
    for out in out_files:
        # open relative to folder, not to the current working directory
        with open(os.path.join(folder, out), 'r') as fin:
            finished = any(marker in line for line in fin)
        if not finished:
            log += '%s has enough outs, but NOT FINISHED\n' % folder_name
            return False
    return True
def submit_folder(folder: str) -> None:
    """
    submits jobs from folder by running its ``command`` file with sh
    :param folder: a folder address, with or without a trailing '/'
    :return: None. a failed submission is reported on stderr and does not raise
    """
    command_file = os.path.join(folder, 'command')
    try:
        # argument list instead of a shell string, so the path is never shell-parsed
        status = subprocess.call(['sh', command_file])
    except OSError as err:
        print('could not run %s: %s' % (command_file, err), file=sys.stderr)
        return
    if status != 0:
        print('submitting %s failed with exit status %i' % (folder, status), file=sys.stderr)
def move_pender_to_runner(pending: list, pender: str) -> None:
    """
    rewrites the pending list without pender, and appends pender to the running list
    :param pending: list of folders pending for submission
    :param pender: a pender that is being submitted
    :return: None
    """
    # the pending list is rewritten from scratch from the list that was passed in
    with open('/home/labs/fleishman/jonathaw/general_lists/pending_folders.txt', 'w') as pending_list:
        for candidate in pending:
            if candidate == pender:
                continue
            pending_list.write('%s\n' % candidate)
    with open('/home/labs/fleishman/jonathaw/general_lists/running_folders.txt', 'a') as running_list:
        running_entry = '%s\n' % pender
        running_list.write(running_entry)
def get_running_folders(list_file: str = '/home/labs/fleishman/jonathaw/general_lists/running_folders.txt') -> list:
    """
    reads the list of running folders, one folder per line
    :param list_file: path of the list file. defaults to the shared running_folders.txt
    :return: list of folders that are currently running, each ending with '/'.
    blank and whitespace-only lines are skipped
    """
    with open(list_file, 'r') as fin:
        # strip so stray spaces or '\r' never end up inside a folder address
        folders = (line.strip() for line in fin)
        return [folder if folder.endswith('/') else folder + '/' for folder in folders if folder]
def get_pending_folders(list_file: str = '/home/labs/fleishman/jonathaw/general_lists/pending_folders.txt') -> list:
    """
    reads the list of pending folders, one folder per line
    :param list_file: path of the list file. defaults to the shared pending_folders.txt
    :return: a list of folders that need to run, each ending with '/'.
    blank and whitespace-only lines are skipped
    """
    with open(list_file, 'r') as fin:
        # strip so stray spaces or '\r' never end up inside a folder address
        folders = (line.strip() for line in fin)
        return [folder if folder.endswith('/') else folder + '/' for folder in folders if folder]
def lists_status() -> None:
    """
    appends to the log how many lines are in each of the pending, processed and running
    folder lists, and in the switched_jobs list
    :return: None
    """
    global log
    for kind in ['pending', 'processed', 'running']:
        # with-block so the file handle is closed as soon as it has been counted
        with open('/home/labs/fleishman/jonathaw/general_lists/%s_folders.txt' % kind, 'r') as fin:
            num_lines = sum(1 for _ in fin)
        log += 'found %i lines in %s\n' % (num_lines, kind)
    with open('/home/labs/fleishman/jonathaw/general_lists/switched_jobs.txt', 'r') as fin:
        num_lines = sum(1 for _ in fin)
    log += 'found %i lines in switched_jobs\n' % num_lines
def am_i_running() -> bool:
    """
    tells whether a run is flagged as in progress
    :return: True iff the .cron_script_running file exists
    """
    running_flag = '/home/labs/fleishman/jonathaw/.cron_script_running'
    return os.path.isfile(running_flag)
def set_as_running() -> None:
    """
    creates the .cron_script_running file. a file left over from an earlier run is
    kept as it is instead of raising
    :return: None
    """
    # append mode creates the file when missing and leaves an existing one untouched
    with open('/home/labs/fleishman/jonathaw/.cron_script_running', 'a'):
        pass
def set_as_not_running() -> None:
    """
    clears the in-progress flag
    :return: None. removes the .cron_script_running file
    """
    running_flag = '/home/labs/fleishman/jonathaw/.cron_script_running'
    os.remove(running_flag)
def should_i_run() -> bool:
    """
    :return: whether the .run_lsf_manager file exists or not
    """
    run_flag = '/home/labs/fleishman/jonathaw/.run_lsf_manager'
    return os.path.isfile(run_flag)
if __name__ == '__main__':
    # runs main() repeatedly for as long as the .run_lsf_manager file exists,
    # mailing the accumulated log after every pass
    sender = '[email protected]'

    def _send_mail(message: str) -> None:
        """
        sends message from sender to sender through the local SMTP server
        :param message: the full message text
        :return: None
        """
        # a fresh connection per message: one kept open across the long sleep
        # between passes may have been dropped by the server
        with smtplib.SMTP('localhost') as smtp:
            smtp.sendmail(sender, [sender], message)

    while should_i_run():
        set_as_running()
        try:
            # the empty line after Subject separates the mail headers from the body
            log = ("From: LSFManager <[email protected]>\n"
                   "To: Me <[email protected]>\n"
                   "Subject: LSFManager Report\n"
                   "\n")
            print('starting run!!!', time.ctime())
            main()
            print('finished run, sending email')
            _send_mail(log)
        finally:
            # never leave the running flag behind, even when the pass failed
            set_as_not_running()
        # progress bar: 61 steps of 10 seconds between passes
        for i in range(61):
            sys.stdout.write('\r')
            sys.stdout.write("[%-60s] %d%%" % ('='*i, 100./60.*float(i)))
            sys.stdout.flush()
            time.sleep(10)
        print('\n')
    print('found I should not run anymore...')
    _send_mail('found I should not run anymore...\n')
    # if not am_i_running():
    #     print('cron is not running, so now it is')
    #     set_as_running()
    #     main()
    #     set_as_not_running()
    # else:
    #     print('cron is still running...')