From de26a0f6a4401cb05bbb40cb83ce737bc921dabe Mon Sep 17 00:00:00 2001 From: Alex Schuy Date: Mon, 20 Nov 2017 23:19:12 -0800 Subject: [PATCH] Changes client.py such that the location of reco.py is transmitted to the clients, as opposed to hard-coding it into constants.py, thereby avoiding specifying the location of the Gen_Reco repository. --- client.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/client.py b/client.py index ac33c43..e8349a6 100644 --- a/client.py +++ b/client.py @@ -13,9 +13,9 @@ import subprocess32 import sys -def compute(version_number, max_events, skip_events, event_file, log_dir, tmp_dir, aod_dir, job_id): +def compute(version_number, max_events, skip_events, event_file, log_dir, tmp_dir, aod_dir, reco, job_id): ''' Runs reco.py with the given parameters. ''' - import subprocess32, shutil, constants, socket, os + import subprocess32, shutil, socket, os log_dir = os.path.join(log_dir, 'job{:0>4}'.format(job_id)) tmp_dir = os.path.join(tmp_dir, 'job{:0>4}'.format(job_id)) try: @@ -24,7 +24,7 @@ def compute(version_number, max_events, skip_events, event_file, log_dir, tmp_di except OSError as e: print(e) athena_log = os.path.join(log_dir, 'athena.log') - arg = 'nice python {} -n {} -s {} --log_file {} --tmp_dir {} --output_dir {} {} {}'.format(constants.reco, max_events, skip_events, athena_log, tmp_dir, aod_dir, event_file, version_number) + arg = 'nice python {} -n {} -s {} --log_file {} --tmp_dir {} --output_dir {} {} {}'.format(reco, max_events, skip_events, athena_log, tmp_dir, aod_dir, event_file, version_number) with open(os.path.join(log_dir, 'reco.log'), 'w+') as fh: subprocess32.check_call(arg, executable='/bin/bash', shell=True, stdout=fh, stderr=subprocess32.STDOUT) try: @@ -52,6 +52,7 @@ def get_job_args(batch_size, evnt_dir, log_dir, tmp_dir, output_dir): print('\t[{}] {}'.format(ii+1, evnt_file)) job_id = 0 job_args = [] + reco = os.path.join(os.getcwd(), 'reco.py') for evnt_file in evnt_files: rel_evnt_file = string.split(evnt_file, sep='/')[-1] run_number, evnt_batch_size, evnt_batch_number = string.split(rel_evnt_file, sep='.')[:3] @@ -61,7 +62,7 @@ def get_job_args(batch_size, evnt_dir, log_dir, tmp_dir, output_dir): for _ in range(int(math.ceil(evnt_batch_size / batch_size))): job_batch_size = min(batch_size, evnt_batch_size - job_skip_events) job_version_number = "{}.{}.{}".format(evnt_version, job_batch_size, job_skip_events) - job_args.append((job_version_number, job_batch_size, job_skip_events, evnt_file, log_dir, tmp_dir, output_dir, job_id)) + job_args.append((job_version_number, job_batch_size, job_skip_events, evnt_file, log_dir, tmp_dir, output_dir, reco, job_id)) job_skip_events += job_batch_size job_id += 1 with open(os.path.join(tmp_dir, 'jobs.args'), 'wb+') as fj_handle: @@ -87,7 +88,6 @@ def check_jobs(jobs, job_args, tmp_dir, timestamp): def dispatch_computations(job_args, tmp_dir, timestamp): client = distributed.Client('localhost:8786') - client.upload_file('constants.py') webbrowser.open('http://localhost:8787') jobs = [] for job_arg in job_args: @@ -143,7 +143,7 @@ def main(): job_args = get_job_args(args.batch_size, args.evnt_dir, log_dir, tmp_dir, aod_dir) if args.test: for job_arg in job_args: - job_version_number, job_batch_size, job_skip_events, evnt_file, _, _, _, job_id = job_arg + job_version_number, job_batch_size, job_skip_events, evnt_file, _, _, _, _, job_id = job_arg print("[client]: job {:0>4}: version={}, batch_size={}, skip_events={}, evnt_file={}".format(job_id, job_version_number, job_batch_size, job_skip_events, evnt_file)) else: dispatch_computations(job_args, tmp_dir, timestamp)