Skip to content

Commit

Permalink
Merge pull request #523 from tim-moody/0.6.1-misc
Browse files Browse the repository at this point in the history
fix job restart
  • Loading branch information
tim-moody authored Feb 10, 2023
2 parents fb00fe8 + 6006ca4 commit 600cfb6
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 19 deletions.
49 changes: 30 additions & 19 deletions roles/cmdsrv/files/iiab-cmdsrv3.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,6 @@ def job_minder_thread(client_url, worker_control_url, context=None):
jobs_requested_done = [] # list of processed job_id from jobs_requested
prereq_jobs_to_clear = [] # prerequisite jobs whose descendent has been processed
jobs_to_close = [] # list of running jobs that have completed or been cancelled
prereq_info = {}

# control signals from main thread
context = context or zmq.Context.instance()
Expand All @@ -313,8 +312,7 @@ def job_minder_thread(client_url, worker_control_url, context=None):
for job_id in jobs_to_restart:
job_info = jobs_to_restart[job_id]
if job_info['has_dependent'] == "Y":
prereq_info ['status'] = 'STARTED' # for both 'STARTED' and 'RESTARTED'
prereq_jobs[job_id] = prereq_info
prereq_jobs[job_id] = {'status': 'STARTED'} # for both 'STARTED' and 'RESTARTED'
else:
add_wip(job_info)
job_info = start_job(job_id, job_info, status='RESTARTED')
Expand All @@ -325,7 +323,6 @@ def job_minder_thread(client_url, worker_control_url, context=None):
thread_run = True

# Main loop of IIAB-CMDSRV

while thread_run == True:
lock.acquire() # will block if lock is already held
try:
Expand Down Expand Up @@ -365,11 +362,9 @@ def job_minder_thread(client_url, worker_control_url, context=None):
#print "starting prereq check"
# don't start job if it depends on another job that is not finished
depend_on_job_id = job_info['depend_on_job_id']

if depend_on_job_id in prereq_jobs:
prereq_status = prereq_jobs[depend_on_job_id]['status']

if prereq_status == 'SCHEDULED' or prereq_status == 'STARTED':
if prereq_status in ('SCHEDULED', 'STARTED'):
continue # successor step can't start yet
else:
if prereq_status == 'SUCCEEDED':
Expand All @@ -395,6 +390,7 @@ def job_minder_thread(client_url, worker_control_url, context=None):
jobs_requested_done.append(job_id)

#print 'starting clear'
# WRONG prereq_jobs_to_clear = []

# Clear started or cancelled jobs from requested queue and prereq dict
for job_id in jobs_requested_done:
Expand All @@ -414,7 +410,6 @@ def job_minder_thread(client_url, worker_control_url, context=None):

jobs_running[job_id]['subproc'].poll()
returncode = jobs_running[job_id]['subproc'].returncode

if returncode == None:
tprint (str(job_id) + ' still running.')

Expand Down Expand Up @@ -611,7 +606,7 @@ def end_job(job_id, job_info, status): # modify to use tail of job_output

job_output = job_output.encode('ascii', 'replace').decode()

#print(job_id)
# print(job_id)
tprint(job_output)
jobs_running[job_id]['job_output'] = job_output

Expand Down Expand Up @@ -722,6 +717,7 @@ def cmd_handler(cmd_msg):

avail_cmds = {
"TEST": {"funct": do_test, "inet_req": False},
"TEST-JOB": {"funct": do_test_job, "inet_req": False},
"LIST-LIBR": {"funct": list_library, "inet_req": False},
"WGET": {"funct": wget_file, "inet_req": True},
"GET-ADM-CONF": {"funct": get_adm_conf, "inet_req": False},
Expand Down Expand Up @@ -884,6 +880,23 @@ def do_test(cmd_info):
json_outp = json_array("TEST",outp)
return (json_outp)

def do_test_job(cmd_info):
if 'cmd_args' in cmd_info:
steps = cmd_info['cmd_args']['steps']
else:
return cmd_malformed(cmd_info['cmd'])

step_no = 1
job_id = -1
for step_sleep in steps:
job_command = "scripts/test-job.sh " + str(step_sleep)
if step_no < len(steps):
job_id = request_one_job(cmd_info, job_command, step_no, job_id, "Y")
else:
resp = request_job(cmd_info=cmd_info, job_command=job_command, cmd_step_no=step_no, depend_on_job_id=job_id, has_dependent="N")
step_no += 1
return resp

def list_library(cmd_info):
libr_list = {}
file_list = []
Expand Down Expand Up @@ -2980,7 +2993,6 @@ def request_one_job(cmd_info, job_command, cmd_step_no, depend_on_job_id, has_de
global jobs_requested
global prereq_jobs

prereq_info = {}
job_id = get_job_id()

job_info = {}
Expand Down Expand Up @@ -3009,8 +3021,8 @@ def request_one_job(cmd_info, job_command, cmd_step_no, depend_on_job_id, has_de
insert_job(job_id, cmd_info['cmd_rowid'], job_command, opt_args_json, cmd_step_no, depend_on_job_id, has_dependent)

if has_dependent == "Y":
prereq_info ['status'] = job_info['status']
prereq_jobs[job_id] = prereq_info
prereq_jobs[job_id] = {}
prereq_jobs[job_id]['status'] = job_info['status']

if cmd_step_no == 1:
add_wip(job_info)
Expand Down Expand Up @@ -3566,9 +3578,9 @@ def read_maps_catalog():
def get_incomplete_jobs():
global jobs_requested
global jobs_to_restart
global prereq_jobs

jobs_to_cancel = {}
prereq_info = {}

# calculate boot time so we can tell if pid is ours
with open('/proc/uptime', 'r') as f:
Expand Down Expand Up @@ -3633,12 +3645,12 @@ def get_incomplete_jobs():

# only restart if we haven't already seen this command
# we assume that we can always use the highest numbered job for a given command
# do we need to populate prereq_jobs since job_minder does it?
if job_command != last_command:
if job_status == 'SCHEDULED':
jobs_requested[job_id] = job_info
if has_dependent == "Y":
prereq_info ['status'] = 'SCHEDULED'
prereq_jobs[job_id] = prereq_info
prereq_jobs[job_id] = {'status': 'SCHEDULED'}
else:
jobs_to_restart[job_id] = job_info
# Add to active_commands
Expand All @@ -3660,8 +3672,7 @@ def get_incomplete_jobs():
for job_id in jobs_to_cancel:
upd_job_cancelled(job_id)
if jobs_to_cancel[job_id]['has_dependent'] == "Y":
prereq_info ['status'] = 'CANCELLED'
prereq_jobs[job_id] = prereq_info
prereq_jobs[job_id] = {'status': 'CANCELLED'}

# fix up prereq_jobs with status of completed prereq jobs, not selected in the previous query
conn = sqlite3.connect(cmdsrv_dbpath)
Expand All @@ -3670,8 +3681,8 @@ def get_incomplete_jobs():
for row in cur.fetchall():
job_id, job_status = row
if not job_id in prereq_jobs:
prereq_info ['status'] = job_status
prereq_jobs[job_id] = prereq_info
prereq_jobs[job_id] = {}
prereq_jobs[job_id]['status'] = job_status
cur.close()
conn.close()

Expand Down
2 changes: 2 additions & 0 deletions roles/cmdsrv/files/scripts/test-job.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#!/bin/bash
sleep $1

0 comments on commit 600cfb6

Please sign in to comment.