Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix job restart #523

Merged
merged 3 commits into from
Feb 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 30 additions & 19 deletions roles/cmdsrv/files/iiab-cmdsrv3.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,6 @@ def job_minder_thread(client_url, worker_control_url, context=None):
jobs_requested_done = [] # list of processed job_id from jobs_requested
prereq_jobs_to_clear = [] # prerequisite jobs whose descendent has been processed
jobs_to_close = [] # list of running jobs that have completed or been cancelled
prereq_info = {}

# control signals from main thread
context = context or zmq.Context.instance()
Expand All @@ -313,8 +312,7 @@ def job_minder_thread(client_url, worker_control_url, context=None):
for job_id in jobs_to_restart:
job_info = jobs_to_restart[job_id]
if job_info['has_dependent'] == "Y":
prereq_info ['status'] = 'STARTED' # for both 'STARTED' and 'RESTARTED'
prereq_jobs[job_id] = prereq_info
prereq_jobs[job_id] = {'status': 'STARTED'} # for both 'STARTED' and 'RESTARTED'
else:
add_wip(job_info)
job_info = start_job(job_id, job_info, status='RESTARTED')
Expand All @@ -325,7 +323,6 @@ def job_minder_thread(client_url, worker_control_url, context=None):
thread_run = True

# Main loop of IIAB-CMDSRV

while thread_run == True:
lock.acquire() # will block if lock is already held
try:
Expand Down Expand Up @@ -365,11 +362,9 @@ def job_minder_thread(client_url, worker_control_url, context=None):
#print "starting prereq check"
# don't start job if it depends on another job that is not finished
depend_on_job_id = job_info['depend_on_job_id']

if depend_on_job_id in prereq_jobs:
prereq_status = prereq_jobs[depend_on_job_id]['status']

if prereq_status == 'SCHEDULED' or prereq_status == 'STARTED':
if prereq_status in ('SCHEDULED', 'STARTED'):
continue # successor step can't start yet
else:
if prereq_status == 'SUCCEEDED':
Expand All @@ -395,6 +390,7 @@ def job_minder_thread(client_url, worker_control_url, context=None):
jobs_requested_done.append(job_id)

#print 'starting clear'
# WRONG prereq_jobs_to_clear = []

# Clear started or cancelled jobs from requested queue and prereq dict
for job_id in jobs_requested_done:
Expand All @@ -414,7 +410,6 @@ def job_minder_thread(client_url, worker_control_url, context=None):

jobs_running[job_id]['subproc'].poll()
returncode = jobs_running[job_id]['subproc'].returncode

if returncode == None:
tprint (str(job_id) + ' still running.')

Expand Down Expand Up @@ -611,7 +606,7 @@ def end_job(job_id, job_info, status): # modify to use tail of job_output

job_output = job_output.encode('ascii', 'replace').decode()

#print(job_id)
# print(job_id)
tprint(job_output)
jobs_running[job_id]['job_output'] = job_output

Expand Down Expand Up @@ -722,6 +717,7 @@ def cmd_handler(cmd_msg):

avail_cmds = {
"TEST": {"funct": do_test, "inet_req": False},
"TEST-JOB": {"funct": do_test_job, "inet_req": False},
"LIST-LIBR": {"funct": list_library, "inet_req": False},
"WGET": {"funct": wget_file, "inet_req": True},
"GET-ADM-CONF": {"funct": get_adm_conf, "inet_req": False},
Expand Down Expand Up @@ -884,6 +880,23 @@ def do_test(cmd_info):
json_outp = json_array("TEST",outp)
return (json_outp)

def do_test_job(cmd_info):
if 'cmd_args' in cmd_info:
steps = cmd_info['cmd_args']['steps']
else:
return cmd_malformed(cmd_info['cmd'])

step_no = 1
job_id = -1
for step_sleep in steps:
job_command = "scripts/test-job.sh " + str(step_sleep)
if step_no < len(steps):
job_id = request_one_job(cmd_info, job_command, step_no, job_id, "Y")
else:
resp = request_job(cmd_info=cmd_info, job_command=job_command, cmd_step_no=step_no, depend_on_job_id=job_id, has_dependent="N")
step_no += 1
return resp

def list_library(cmd_info):
libr_list = {}
file_list = []
Expand Down Expand Up @@ -2980,7 +2993,6 @@ def request_one_job(cmd_info, job_command, cmd_step_no, depend_on_job_id, has_de
global jobs_requested
global prereq_jobs

prereq_info = {}
job_id = get_job_id()

job_info = {}
Expand Down Expand Up @@ -3009,8 +3021,8 @@ def request_one_job(cmd_info, job_command, cmd_step_no, depend_on_job_id, has_de
insert_job(job_id, cmd_info['cmd_rowid'], job_command, opt_args_json, cmd_step_no, depend_on_job_id, has_dependent)

if has_dependent == "Y":
prereq_info ['status'] = job_info['status']
prereq_jobs[job_id] = prereq_info
prereq_jobs[job_id] = {}
prereq_jobs[job_id]['status'] = job_info['status']

if cmd_step_no == 1:
add_wip(job_info)
Expand Down Expand Up @@ -3566,9 +3578,9 @@ def read_maps_catalog():
def get_incomplete_jobs():
global jobs_requested
global jobs_to_restart
global prereq_jobs

jobs_to_cancel = {}
prereq_info = {}

# calculate boot time so we can tell if pid is ours
with open('/proc/uptime', 'r') as f:
Expand Down Expand Up @@ -3633,12 +3645,12 @@ def get_incomplete_jobs():

# only restart if we haven't already seen this command
# we assume that we can always use the highest numbered job for a given command
# do we need to populate prereq_jobs since job_minder does it?
if job_command != last_command:
if job_status == 'SCHEDULED':
jobs_requested[job_id] = job_info
if has_dependent == "Y":
prereq_info ['status'] = 'SCHEDULED'
prereq_jobs[job_id] = prereq_info
prereq_jobs[job_id] = {'status': 'SCHEDULED'}
else:
jobs_to_restart[job_id] = job_info
# Add to active_commands
Expand All @@ -3660,8 +3672,7 @@ def get_incomplete_jobs():
for job_id in jobs_to_cancel:
upd_job_cancelled(job_id)
if jobs_to_cancel[job_id]['has_dependent'] == "Y":
prereq_info ['status'] = 'CANCELLED'
prereq_jobs[job_id] = prereq_info
prereq_jobs[job_id] = {'status': 'CANCELLED'}

# fix up prereq_jobs with status of completed prereq jobs, not selected in the previous query
conn = sqlite3.connect(cmdsrv_dbpath)
Expand All @@ -3670,8 +3681,8 @@ def get_incomplete_jobs():
for row in cur.fetchall():
job_id, job_status = row
if not job_id in prereq_jobs:
prereq_info ['status'] = job_status
prereq_jobs[job_id] = prereq_info
prereq_jobs[job_id] = {}
prereq_jobs[job_id]['status'] = job_status
cur.close()
conn.close()

Expand Down
2 changes: 2 additions & 0 deletions roles/cmdsrv/files/scripts/test-job.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#!/bin/bash
sleep $1