Improves logging procedures.
AlexSchuy committed Nov 8, 2017
1 parent 1d1b49e commit c65e9a2
Showing 2 changed files with 33 additions and 27 deletions.
44 changes: 25 additions & 19 deletions client.py
@@ -15,14 +15,17 @@
 def compute(version_number, max_events, skip_events, event_file, log_dir, tmp_dir, job_id):
     ''' Runs reco.py with the given parameters. '''
     import subprocess, constants, socket, os
-    log_file = os.path.join(log_dir, 'job{}.log'.format(job_id))
-    tmp_dir = os.path.join(tmp_dir, 'job{}'.format(job_id))
+    log_dir = os.path.join(log_dir, 'job{:0>4}'.format(job_id))
+    tmp_dir = os.path.join(tmp_dir, 'job{:0>4}'.format(job_id))
+    os.makedirs(log_dir)
     os.makedirs(tmp_dir)
-    arg = 'nice python {} -n {} -s {} --log_file {} --tmp_dir {} {} {}'.format(constants.reco, max_events, skip_events, log_file, tmp_dir, event_file, version_number)
-    subprocess.check_call(arg, executable='/bin/bash', shell=True)
+    athena_log = os.path.join(log_dir, 'athena.log')
+    arg = 'nice python {} -n {} -s {} --log_file {} --tmp_dir {} {} {}'.format(constants.reco, max_events, skip_events, athena_log, tmp_dir, event_file, version_number)
+    with open(os.path.join(log_dir, 'reco.log'), 'w+') as fh:
+        subprocess.check_call(arg, executable='/bin/bash', shell=True, stdout=fh, stderr=subprocess.STDOUT)
     return socket.gethostname()
 
-def get_job_args(batch_size, evnt_dir):
+def get_job_args(batch_size, evnt_dir, log_dir, tmp_dir):
     evnt_files = glob.glob(evnt_dir + '/*.evnt.pool.root')
     print ('Found evnt files:')
     for ii, evnt_file in enumerate(evnt_files):
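For reference, the new per-job logging layout above reduces to the following pattern; this is a minimal, self-contained sketch, where the /tmp paths and the echo command are hypothetical stand-ins for the configured log directory and the reco.py invocation:

import os
import subprocess

job_id = 7
# '{:0>4}' left-pads the id with zeros, so job directories sort correctly: job0007
log_dir = os.path.join('/tmp/logs', 'job{:0>4}'.format(job_id))
os.makedirs(log_dir)

# The child's output is captured in a per-job file instead of the client's
# console; stderr is merged into stdout so the log preserves interleaving.
with open(os.path.join(log_dir, 'reco.log'), 'w+') as fh:
    subprocess.check_call('echo running one job step', shell=True,
                          executable='/bin/bash',
                          stdout=fh, stderr=subprocess.STDOUT)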
@@ -43,11 +46,7 @@ def get_job_args(batch_size, evnt_dir):
         job_id += 1
     return job_args
 
-def dispatch_computations(job_args, timestamp, test, local):
-    log_dir = os.path.join(constants.log_dir, timestamp)
-    tmp_dir = os.path.join(constants.tmp_dir, timestamp)
-    os.makedirs(log_dir)
-    os.makedirs(tmp_dir)
+def dispatch_computations(job_args, test, local, log_dir, tmp_dir, timestamp):
     if not test and not local:
         cluster = dispy.JobCluster(compute, depends=[constants])
         http_server = dispy.httpd.DispyHTTPServer(cluster)
@@ -76,10 +75,11 @@ def dispatch_computations(job_args, timestamp, test, local):
         print('{} executed job {:0>4} from {} to {}'.format(host, job.id, job.start_time, job.end_time))
     if len(failed_jobs) != 0:
         print('The following jobs failed ({} in total): '.format(len(failed_jobs)))
-        for job in jobs:
+        for job in failed_jobs:
             print('\tjob {}'.format(job[-1]))
-        with open(os.path.join(tmp_dir, 'failed_jobs.args')) as fj_handle:
-            pickle.dump(failed_jobs, fj_handle)
+        with open(os.path.join(tmp_dir, 'failed_jobs.args'), 'wb+') as fj_handle:
+            pickle.dump(failed_jobs, fj_handle)
+        print('Run "python client.py -r {}" to redispatch failed jobs.'.format(timestamp))
     cluster.print_status()
     http_server.shutdown()
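The failed-jobs file written above can be read back verbatim on redispatch. A minimal sketch of the round trip, assuming each entry is the argument tuple originally passed to compute() (the values below are hypothetical):

import pickle

failed_jobs = [('v1', 100, 0, 'sample.evnt.pool.root', '/tmp/logs', '/tmp/tmp', 3)]

# pickle emits bytes, so the file must be opened in binary mode;
# hence the 'wb+' on dump and 'rb' on load.
with open('failed_jobs.args', 'wb+') as fj_handle:
    pickle.dump(failed_jobs, fj_handle)

with open('failed_jobs.args', 'rb') as fj_handle:
    assert pickle.load(fj_handle) == failed_jobs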

@@ -113,14 +113,20 @@ def main():
     if args.clean:
         clean()
     if args.timestamp:
-        tmp_dir = os.path.join(constants.tmp_dir, args.timestamp)
-        with open(tmp_dir, 'failed_jobs.args') as fj_handle:
-            failed_jobs = pickle.load(fj_handle)
-        dispatch_computations(failed_jobs, timestamp, args.test, args.local)
+        timestamp = args.timestamp
     else:
-        job_args = get_job_args(args.batch_size, args.evnt_dir)
         timestamp = '{:%Y%m%d%H%M%S}'.format(datetime.datetime.now())
-        dispatch_computations(job_args, timestamp, args.test, args.local)
+    log_dir = os.path.join(constants.log_dir, timestamp)
+    tmp_dir = os.path.join(constants.tmp_dir, timestamp)
+    os.makedirs(log_dir)
+    os.makedirs(tmp_dir)
+    if args.timestamp:
+        with open(os.path.join(tmp_dir, 'failed_jobs.args'), 'rb') as fj_handle:
+            failed_jobs = pickle.load(fj_handle)
+        dispatch_computations(failed_jobs, args.test, args.local, log_dir, tmp_dir, timestamp)
+    else:
+        job_args = get_job_args(args.batch_size, args.evnt_dir, log_dir, tmp_dir)
+        dispatch_computations(job_args, args.test, args.local, log_dir, tmp_dir, timestamp)
 
 if __name__ == '__main__':
     main()
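main() now derives one timestamp per run and builds both directory trees up front, so compute() and dispatch_computations() only consume paths. A minimal sketch of that layout, with hypothetical base directories in place of constants.log_dir and constants.tmp_dir:

import datetime
import os

timestamp = '{:%Y%m%d%H%M%S}'.format(datetime.datetime.now())  # e.g. '20171108153045'
log_dir = os.path.join('/tmp/logs', timestamp)
tmp_dir = os.path.join('/tmp/tmp', timestamp)
os.makedirs(log_dir)  # raises OSError if a run with this timestamp already exists
os.makedirs(tmp_dir)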
16 changes: 8 additions & 8 deletions reco.py
@@ -9,27 +9,27 @@ def reco(evnt_file, version, aod_dir, num_events, skip_events, geometry_version,
     rdo_file = version + '.rdo.pool.root'
     esd_file = version + '.esd.pool.root'
     aod_file = version + '.aod.pool.root'
-    outputs = (hits_file, rdo_file, esd_file, aod_file)
     log_file_handle = open(log_file, 'w+')
     source_arg = '. /phys/users/gwatts/bin/CommonScripts/configASetup.sh && . $AtlasSetup/scripts/asetup.sh here,'
-    sim_arg = source_arg + '{} && Sim_tf.py --inputEVNTFile {} --geometryVersion {} --conditionsTag {} --outputHITSFile {} --physicsList "FTFP_BERT" --postInclude "PyJobTransforms/UseFrontier.py" --preInclude "EVNTtoHITS:SimulationJobOptions/preInclude.BeamPipeKill.py" --maxEvents {} --skipEvents {} --randomSeed "8" --simulator "MC12G4" --truthStrategy "MC12"'.format(sim_release, evnt_file, geometry_version, conditions_tag, hits_file, num_events, skip_events)
-    dig_arg = source_arg + '{} && Digi_tf.py --inputHitsFile {} --outputRDOFile {} --geometryVersion {} --conditionsTag {}'.format(dig_release, hits_file, rdo_file, geometry_version, conditions_tag)
-    reco_arg = source_arg + '{} && Reco_tf.py --inputRDOFile {} --outputESDFile {} --DBRelease current --autoConfiguration="everything" && Reco_tf.py --inputESDFile {} --outputAODFile {} --DBRelease current --autoConfiguration="everything"'.format(reco_release, rdo_file, esd_file, esd_file, aod_file)
-    for arg in (sim_arg, dig_arg, reco_arg):
+    hits_arg = source_arg + '{} && Sim_tf.py --inputEVNTFile {} --geometryVersion {} --conditionsTag {} --outputHITSFile {} --physicsList "FTFP_BERT" --postInclude "PyJobTransforms/UseFrontier.py" --preInclude "EVNTtoHITS:SimulationJobOptions/preInclude.BeamPipeKill.py" --maxEvents {} --skipEvents {} --randomSeed "8" --simulator "MC12G4" --truthStrategy "MC12"'.format(sim_release, evnt_file, geometry_version, conditions_tag, hits_file, num_events, skip_events)
+    rdo_arg = source_arg + '{} && Digi_tf.py --inputHitsFile {} --outputRDOFile {} --geometryVersion {} --conditionsTag {}'.format(dig_release, hits_file, rdo_file, geometry_version, conditions_tag)
+    esd_arg = source_arg + '{} && Reco_tf.py --inputRDOFile {} --outputESDFile {} --DBRelease current --autoConfiguration="everything"'.format(reco_release, rdo_file, esd_file)
+    aod_arg = source_arg + '{} && Reco_tf.py --inputESDFile {} --outputAODFile {} --DBRelease current --autoConfiguration="everything"'.format(reco_release, esd_file, aod_file)
+    for arg, output in zip((hits_arg, rdo_arg, esd_arg, aod_arg), (hits_file, rdo_file, esd_file, aod_file)):
         try:
             subprocess.check_call(arg, executable='/bin/bash', cwd=tmp_dir, shell=True, stdout=log_file_handle, stderr=subprocess.STDOUT)
         except subprocess.CalledProcessError as e:
-            print(e)
+            print('reco.py: {}'.format(e))
         # Remove all non-essential files after each step.
         for entry in os.listdir(tmp_dir):
             entry_path = os.path.join(tmp_dir, entry)
             try:
-                if os.path.isfile(entry_path) and entry not in outputs:
+                if os.path.isfile(entry_path) and entry != output:
                     os.remove(entry_path)
                 elif os.path.isdir(entry_path):
                     shutil.rmtree(entry_path)
             except OSError as e:
-                print(e)
+                print('reco.py: {}'.format(e))
     # move the aod file to the output directory, and make it immutable so that
     # it is not accidentally deleted.
     os.rename(os.path.join(tmp_dir, aod_file), os.path.join(aod_dir, aod_file))
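Pairing each transform command with its output file via zip() lets the cleanup loop keep exactly one file per step. A stripped-down sketch of the same pattern, with hypothetical commands and file names in place of the Sim_tf/Digi_tf/Reco_tf steps:

import os
import subprocess
import tempfile

tmp_dir = tempfile.mkdtemp()
steps = (('touch hits.root', 'hits.root'),
         ('touch rdo.root', 'rdo.root'))

for arg, output in steps:
    subprocess.check_call(arg, cwd=tmp_dir, shell=True)
    # After each step, delete every file except that step's own output;
    # the previous step's output has served as input and is no longer needed.
    for entry in os.listdir(tmp_dir):
        entry_path = os.path.join(tmp_dir, entry)
        if os.path.isfile(entry_path) and entry != output:
            os.remove(entry_path)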