Skip to content

Commit

Permalink
Changes client.py such that the location of reco.py is transmitted to…
Browse files Browse the repository at this point in the history
… the clients, as opposed to hard-coding it into constants.py, thereby avoiding specifying the location of the Gen_Reco repository.
  • Loading branch information
AlexSchuy committed Nov 21, 2017
1 parent 07a7ecf commit de26a0f
Showing 1 changed file with 6 additions and 6 deletions.
12 changes: 6 additions & 6 deletions client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
import subprocess32
import sys

def compute(version_number, max_events, skip_events, event_file, log_dir, tmp_dir, aod_dir, job_id):
def compute(version_number, max_events, skip_events, event_file, log_dir, tmp_dir, aod_dir, reco, job_id):
''' Runs reco.py with the given parameters. '''
import subprocess32, shutil, constants, socket, os
import subprocess32, shutil, socket, os
log_dir = os.path.join(log_dir, 'job{:0>4}'.format(job_id))
tmp_dir = os.path.join(tmp_dir, 'job{:0>4}'.format(job_id))
try:
Expand All @@ -24,7 +24,7 @@ def compute(version_number, max_events, skip_events, event_file, log_dir, tmp_di
except OSError as e:
print(e)
athena_log = os.path.join(log_dir, 'athena.log')
arg = 'nice python {} -n {} -s {} --log_file {} --tmp_dir {} --output_dir {} {} {}'.format(constants.reco, max_events, skip_events, athena_log, tmp_dir, aod_dir, event_file, version_number)
arg = 'nice python {} -n {} -s {} --log_file {} --tmp_dir {} --output_dir {} {} {}'.format(reco, max_events, skip_events, athena_log, tmp_dir, aod_dir, event_file, version_number)
with open(os.path.join(log_dir, 'reco.log'), 'w+') as fh:
subprocess32.check_call(arg, executable='/bin/bash', shell=True, stdout=fh, stderr=subprocess32.STDOUT)
try:
Expand Down Expand Up @@ -52,6 +52,7 @@ def get_job_args(batch_size, evnt_dir, log_dir, tmp_dir, output_dir):
print('\t[{}] {}'.format(ii+1, evnt_file))
job_id = 0
job_args = []
reco = os.path.join(os.getcwd(), 'reco.py')
for evnt_file in evnt_files:
rel_evnt_file = string.split(evnt_file, sep='/')[-1]
run_number, evnt_batch_size, evnt_batch_number = string.split(rel_evnt_file, sep='.')[:3]
Expand All @@ -61,7 +62,7 @@ def get_job_args(batch_size, evnt_dir, log_dir, tmp_dir, output_dir):
for _ in range(int(math.ceil(evnt_batch_size / batch_size))):
job_batch_size = min(batch_size, evnt_batch_size - job_skip_events)
job_version_number = "{}.{}.{}".format(evnt_version, job_batch_size, job_skip_events)
job_args.append((job_version_number, job_batch_size, job_skip_events, evnt_file, log_dir, tmp_dir, output_dir, job_id))
job_args.append((job_version_number, job_batch_size, job_skip_events, evnt_file, log_dir, tmp_dir, output_dir, reco, job_id))
job_skip_events += job_batch_size
job_id += 1
with open(os.path.join(tmp_dir, 'jobs.args'), 'wb+') as fj_handle:
Expand All @@ -87,7 +88,6 @@ def check_jobs(jobs, job_args, tmp_dir, timestamp):

def dispatch_computations(job_args, tmp_dir, timestamp):
client = distributed.Client('localhost:8786')
client.upload_file('constants.py')
webbrowser.open('http://localhost:8787')
jobs = []
for job_arg in job_args:
Expand Down Expand Up @@ -143,7 +143,7 @@ def main():
job_args = get_job_args(args.batch_size, args.evnt_dir, log_dir, tmp_dir, aod_dir)
if args.test:
for job_arg in job_args:
job_version_number, job_batch_size, job_skip_events, evnt_file, _, _, _, job_id = job_arg
job_version_number, job_batch_size, job_skip_events, evnt_file, _, _, _, _, job_id = job_arg
print("[client]: job {:0>4}: version={}, batch_size={}, skip_events={}, evnt_file={}".format(job_id, job_version_number, job_batch_size, job_skip_events, evnt_file))
else:
dispatch_computations(job_args, tmp_dir, timestamp)
Expand Down

0 comments on commit de26a0f

Please sign in to comment.