Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dispatch jobs of different architectures onto matching machines. #119

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 27 additions & 20 deletions rd_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,7 @@ def run(self):
run.rundir = config['runs'] + '/' + run_id
run.log = log_file
run.set = info['task']
if 'arch' in info:
run.arch = info['arch']
else:
run.arch = 'x86_64'
run.arch = info.get('arch', 'x86_64')
run.bindir = run.rundir + '/' + run.arch + '/'
run.prefix = run.rundir + '/' + run.set
try:
Expand Down Expand Up @@ -209,6 +206,8 @@ def main():
global machines
global slots
global args
global arches
arches = set()
parser = argparse.ArgumentParser(description='Run AWCY scheduler daemon.')
parser.add_argument('-machineconf')
parser.add_argument('-port',default=4000)
Expand All @@ -218,7 +217,10 @@ def main():
if args.machineconf:
machineconf = json.load(open(args.machineconf, 'r'))
for m in machineconf:
machines.append(sshslot.Machine(m['host'],m['user'],m['cores'],m['work_root'],str(m['port']),m['media_path']))
if 'arch' not in m:
m['arch'] = 'x86_64'
machines.append(sshslot.Machine(m['host'],m['user'],m['cores'],m['work_root'],str(m['port']),m['media_path'],m['arch']))
arches.add(m['arch'])
for machine in machines:
slots.extend(machine.get_slots())
free_slots.extend(reversed(slots))
Expand Down Expand Up @@ -249,6 +251,7 @@ def machine_allocator_tick():
global machines
global work_list
global run_list
global arches
# start all machines if we don't have any but have work queued
if len(work_list) and not len(machines):
rd_print(None, "Starting machines.")
Expand Down Expand Up @@ -338,21 +341,25 @@ def scheduler_tick():
slot.clear_work()
free_slots.append(slot)
# fill empty slots with new work
if len(work_list) != 0:
if len(free_slots) != 0:
slot = free_slots.pop()
work = work_list[0]
# search for image work if there is only one slot available
# allows prioritizing image runs without making scheduler the bottleneck
if len(free_slots) == 0:
try:
work = find_image_work(work_list, work)
except Exception as e:
rd_print(None, e)
rd_print(None, 'Finding image work failed.')
work_list.remove(work)
rd_print(work.log,'Encoding',work.get_name(),'on',slot.machine.host)
slot.start_work(work)
for arch in arches:
free_slots_filtered = [slot for slot in free_slots if slot.arch == arch]
work_list_filtered = [work for work in work_list if work.arch == arch]
if len(work_list_filtered) != 0:
if len(free_slots_filtered) != 0:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: PEP-8 says you should just do:

if work_list_filtered:
   if free_slots_filtered:

slot = free_slots_filtered.pop()
free_slots.remove(slot)
work = work_list_filtered[0]
# search for image work if there is only one slot available
# allows prioritizing image runs without making scheduler the bottleneck
if len(free_slots_filtered) == 0:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same

try:
work = find_image_work(work_list_filtered, work)
except Exception as e:
rd_print(None, e)
rd_print(None, 'Finding image work failed.')
work_list.remove(work)
rd_print(work.log,'Encoding',work.get_name(),'on',slot.machine.host)
slot.start_work(work)
# find runs where all work has been completed
for run in run_list:
done = True
Expand Down
4 changes: 3 additions & 1 deletion sshslot.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,15 @@ def shellquote(s):
return "'" + s.replace("'", "'\"'\"'") + "'"

class Machine:
def __init__(self,host,user='ec2-user',cores=18,work_root='/home/ec2-user',port=22,media_path='/mnt/media'):
def __init__(self,host,user='ec2-user',cores=18,work_root='/home/ec2-user',port=22,media_path='/mnt/media',arch='x86_64'):
self.host = host
self.user = user
self.cores = cores
self.work_root = work_root
self.port = str(port)
self.media_path = media_path
self.log = None
self.arch = arch
self.slots = []
def rsync(self, local, remote):
return subprocess.call(['rsync', '-r', '-e', "ssh -i "+ssh_privkey_file+" -o StrictHostKeyChecking=no -p "+str(self.port), local, self.user + '@' + self.host + ':' + remote])
Expand Down Expand Up @@ -102,6 +103,7 @@ def __init__(self, machine, num, log):
self.work = None
self.log = log
self.can_kill = None
self.arch = machine.arch
def gather(self):
return self.p.communicate()
def start_work(self, work):
Expand Down
1 change: 1 addition & 0 deletions work.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ def create_rdwork(run, video_filenames):
work.set = run.set
work.filename = filename
work.extra_options = run.extra_options
work.arch = run.arch
if run.save_encode:
work.no_delete = True
if work.codec == 'av1' or work.codec == 'av1-rt' or work.codec == 'rav1e' or work.codec == 'svt-av1':
Expand Down