-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrun-wasserman-lab-scheduled-jobs.py
114 lines (87 loc) · 3.44 KB
/
run-wasserman-lab-scheduled-jobs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import os
import schedule
import time
import subprocess
import logging
import datetime
import signal
import argparse
# Command-line interface: a single optional -slurm switch selects how the
# scheduled scripts are launched.
parser = argparse.ArgumentParser(description='Check for -slurm flag')
parser.add_argument(
    '-slurm',
    action='store_true',
    help='a flag to indicate if the script is run with slurm',
)
args = parser.parse_args()
use_slurm = args.slurm
print("scripts will be run using slurm" if use_slurm else "scripts will not be run using slurm")

# All folders and the log file are resolved relative to this file's location.
script_file = os.path.abspath(__file__)
print(f"script_file: {script_file}")
script_dir = os.path.dirname(script_file)
print(f"script_dir: {script_dir}")

# Root logger at INFO level, writing timestamped messages both to output.log
# (next to this script) and to the console.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(message)s')
handler = logging.FileHandler(os.path.join(script_dir, 'output.log'))
console_handler = logging.StreamHandler()
for _log_handler in (handler, console_handler):
    _log_handler.setLevel(logging.INFO)
    _log_handler.setFormatter(formatter)
    logger.addHandler(_log_handler)
def execute_script(script_path):
    """Run one shell script from the ``workspace`` directory next to this file.

    The script is submitted with ``sbatch`` when the module-level
    ``use_slurm`` flag is set and run with ``bash`` otherwise.  Failures are
    logged instead of raised so that one broken script cannot propagate out of
    the scheduler and stop the main loop.

    Args:
        script_path: Path of the ``.sh`` file to run.
    """
    launcher = 'sbatch' if use_slurm else 'bash'
    try:
        subprocess.run(
            [launcher, script_path],
            check=True,
            cwd=os.path.join(script_dir, 'workspace'),
        )
    except (subprocess.CalledProcessError, OSError) as e:
        # CalledProcessError: the script ran and returned a non-zero status.
        # OSError: the launcher binary or the workspace directory is missing,
        # so the process could not be started at all.
        logging.error(f'Execution of {script_path} failed with error: {e}')
# Maps the path of every registered script to its `schedule` job, so a script
# is registered only once and its job can be cancelled later.
already_scheduled = {}


def scan_and_schedule(folder, day_of_month='1', day_of_week='monday', time_of_day='12:00'):
    """Register a job for every not-yet-registered ``.sh`` file in ``folder``.

    ``folder`` is resolved relative to ``script_dir`` and its name selects the
    cadence:

    * ``'daily'``   -- every day at ``time_of_day``.
    * ``'weekly'``  -- every week on ``day_of_week`` at ``time_of_day``.
    * ``'monthly'`` -- every 4 weeks; ``day_of_month`` and ``time_of_day`` are
      not applied to this cadence.

    Any other folder name, or a folder that cannot be listed, is reported with
    a warning and nothing is scheduled.

    Args:
        folder: Sub-folder name: ``'daily'``, ``'weekly'`` or ``'monthly'``.
        day_of_month: Accepted for interface compatibility; currently unused.
        day_of_week: Weekday name (case-insensitive) used for weekly jobs.
        time_of_day: ``'HH:MM'`` string used for daily and weekly jobs.

    Raises:
        ValueError: If ``folder`` is ``'weekly'`` and ``day_of_week`` is not a
            weekday name.
    """
    if folder not in ('daily', 'weekly', 'monthly'):
        # Without this guard a None job would be recorded and reported as
        # scheduled even though nothing will ever run.
        logger.warning(f'Unsupported schedule folder {folder!r}; nothing scheduled')
        return

    weekday = None
    if folder == 'weekly':
        weekday = str(day_of_week).lower()
        valid_weekdays = (
            'monday', 'tuesday', 'wednesday', 'thursday',
            'friday', 'saturday', 'sunday',
        )
        if weekday not in valid_weekdays:
            raise ValueError(f'Invalid day_of_week: {day_of_week!r}')

    full_path = os.path.join(script_dir, folder)
    try:
        entries = os.listdir(full_path)
    except OSError as e:
        # A missing or unreadable folder must not stop the whole scheduler.
        logger.warning(f'Cannot scan {full_path}: {e}')
        return

    for file in entries:
        if not file.endswith('.sh'):
            continue
        script_path = os.path.join(full_path, file)
        if script_path in already_scheduled:
            continue

        if folder == 'daily':
            job = schedule.every().day.at(time_of_day).do(execute_script, script_path)
        elif folder == 'weekly':
            # Weekday units are attributes of the job builder, e.g.
            # schedule.every().monday; select the requested one by name.
            weekly_job = getattr(schedule.every(), weekday)
            job = weekly_job.at(time_of_day).do(execute_script, script_path)
        else:  # 'monthly'
            job = schedule.every(4).weeks.do(execute_script, script_path)

        already_scheduled[script_path] = job
        logger.info(f'Scheduled {script_path}')
def check_for_deleted_scripts():
    """Cancel and forget the job of every registered script whose file is gone."""
    # Iterate over a snapshot of the keys because entries are removed from
    # the dict while looping.
    for tracked_path in tuple(already_scheduled):
        if os.path.exists(tracked_path):
            continue
        logging.info(f"Cancel {tracked_path}")
        stale_job = already_scheduled[tracked_path]
        schedule.cancel_job(stale_job)
        del already_scheduled[tracked_path]
# Moment the scheduler was started; used to report the total run time on exit.
start_time = datetime.datetime.now()
logging.info("starting scheduler")

# Loop flag for the main scheduler loop; cleared by exit_handler.
Running = True


def exit_handler(signal=None, stack_frame=None):
    """Log how long the scheduler ran, clear ``Running`` and exit the process.

    Registered below as the handler for SIGTERM and SIGINT.

    Args:
        signal: Signal number passed by the interpreter (unused).  The name
            shadows the ``signal`` module inside this function only.
        stack_frame: Current stack frame passed by the interpreter (unused).

    Raises:
        SystemExit: Always, to terminate the process.
    """
    global Running
    ran_for = datetime.datetime.now() - start_time
    logger.info(f"Exiting scheduler. It ran for this long: {ran_for}")
    Running = False
    # quit() is a helper injected by the `site` module for interactive use and
    # is not guaranteed to exist; raising SystemExit is the portable form.
    raise SystemExit


signal.signal(signal.SIGTERM, exit_handler)
signal.signal(signal.SIGINT, exit_handler)
# How often the loop wakes up to rescan the folders and run due jobs.  Jobs
# are registered for a specific time of day, so the loop must poll far more
# often than once a day or a job could start up to 24 hours late.
poll_interval_seconds = 60

try:
    while Running:
        # Pick up scripts added since the previous pass.
        scan_and_schedule('daily')
        scan_and_schedule('weekly')
        scan_and_schedule('monthly')
        # Drop jobs whose script file was removed before running anything, so
        # a deleted script is never executed.
        check_for_deleted_scripts()
        schedule.run_pending()
        time.sleep(poll_interval_seconds)
except Exception:
    # Top-level boundary: record the full traceback before the scheduler stops.
    logging.exception('Scheduler loop stopped by an unexpected error')