diff --git a/rd_server.py b/rd_server.py index 9d19e2c..d553c07 100755 --- a/rd_server.py +++ b/rd_server.py @@ -103,10 +103,7 @@ def run(self): run.rundir = config['runs'] + '/' + run_id run.log = log_file run.set = info['task'] - if 'arch' in info: - run.arch = info['arch'] - else: - run.arch = 'x86_64' + run.arch = info.get('arch', 'x86_64') run.bindir = run.rundir + '/' + run.arch + '/' run.prefix = run.rundir + '/' + run.set try: @@ -209,6 +206,8 @@ def main(): global machines global slots global args + global arches + arches = set() parser = argparse.ArgumentParser(description='Run AWCY scheduler daemon.') parser.add_argument('-machineconf') parser.add_argument('-port',default=4000) @@ -218,7 +217,10 @@ def main(): if args.machineconf: machineconf = json.load(open(args.machineconf, 'r')) for m in machineconf: - machines.append(sshslot.Machine(m['host'],m['user'],m['cores'],m['work_root'],str(m['port']),m['media_path'])) + if 'arch' not in m: + m['arch'] = 'x86_64' + machines.append(sshslot.Machine(m['host'],m['user'],m['cores'],m['work_root'],str(m['port']),m['media_path'],m['arch'])) + arches.add(m['arch']) for machine in machines: slots.extend(machine.get_slots()) free_slots.extend(reversed(slots)) @@ -249,6 +251,7 @@ def machine_allocator_tick(): global machines global work_list global run_list + global arches # start all machines if we don't have any but have work queued if len(work_list) and not len(machines): rd_print(None, "Starting machines.") @@ -338,21 +341,25 @@ def scheduler_tick(): slot.clear_work() free_slots.append(slot) # fill empty slots with new work - if len(work_list) != 0: - if len(free_slots) != 0: - slot = free_slots.pop() - work = work_list[0] - # search for image work if there is only one slot available - # allows prioritizing image runs without making scheduler the bottleneck - if len(free_slots) == 0: - try: - work = find_image_work(work_list, work) - except Exception as e: - rd_print(None, e) - rd_print(None, 'Finding image work failed.') - work_list.remove(work) - rd_print(work.log,'Encoding',work.get_name(),'on',slot.machine.host) - slot.start_work(work) + for arch in arches: + free_slots_filtered = [slot for slot in free_slots if slot.arch == arch] + work_list_filtered = [work for work in work_list if work.arch == arch] + if len(work_list_filtered) != 0: + if len(free_slots_filtered) != 0: + slot = free_slots_filtered.pop() + free_slots.remove(slot) + work = work_list_filtered[0] + # search for image work if there is only one slot available + # allows prioritizing image runs without making scheduler the bottleneck + if len(free_slots_filtered) == 0: + try: + work = find_image_work(work_list_filtered, work) + except Exception as e: + rd_print(None, e) + rd_print(None, 'Finding image work failed.') + work_list.remove(work) + rd_print(work.log,'Encoding',work.get_name(),'on',slot.machine.host) + slot.start_work(work) # find runs where all work has been completed for run in run_list: done = True diff --git a/sshslot.py b/sshslot.py index 8fdae97..c7f5a6d 100644 --- a/sshslot.py +++ b/sshslot.py @@ -41,7 +41,7 @@ def shellquote(s): return "'" + s.replace("'", "'\"'\"'") + "'" class Machine: - def __init__(self,host,user='ec2-user',cores=18,work_root='/home/ec2-user',port=22,media_path='/mnt/media'): + def __init__(self,host,user='ec2-user',cores=18,work_root='/home/ec2-user',port=22,media_path='/mnt/media',arch='x86_64'): self.host = host self.user = user self.cores = cores @@ -49,6 +49,7 @@ def __init__(self,host,user='ec2-user',cores=18,work_root='/home/ec2-user',port= self.port = str(port) self.media_path = media_path self.log = None + self.arch = arch self.slots = [] def rsync(self, local, remote): return subprocess.call(['rsync', '-r', '-e', "ssh -i "+ssh_privkey_file+" -o StrictHostKeyChecking=no -p "+str(self.port), local, self.user + '@' + self.host + ':' + remote]) @@ -102,6 +103,7 @@ def __init__(self, machine, num, log): self.work = None self.log = log self.can_kill = None + self.arch = machine.arch def gather(self): return self.p.communicate() def start_work(self, work): diff --git a/work.py b/work.py index 1fbb515..04d31f5 100644 --- a/work.py +++ b/work.py @@ -266,6 +266,7 @@ def create_rdwork(run, video_filenames): work.set = run.set work.filename = filename work.extra_options = run.extra_options + work.arch = run.arch if run.save_encode: work.no_delete = True if work.codec == 'av1' or work.codec == 'av1-rt' or work.codec == 'rav1e' or work.codec == 'svt-av1':