Dask (#15)
* Updates package setup.

* Modifies code to use dask.

* Removes client recovery option in client.py, as it is not supported by dask.

* Adds file for server initialization.

* Update README.md
AlexSchuy authored Nov 10, 2017
1 parent 50069d5 commit fe44adb
Showing 9 changed files with 45 additions and 147 deletions.
21 changes: 9 additions & 12 deletions README.md
@@ -24,32 +24,29 @@ Once you are on the tev machines, the first thing to do is to [download](https:/
Once you have cloned the repository and are in bash, run `. package_setup.sh`. This will install the necessary python packages, and run a simple test that should print out version numbers for these packages.

## Installation
-The settings for the project are stored in the constants.py file. In this file you can edit the list of tev servers to use, the root directory for the project, and settings relating to the athena scripts.
+The settings for the project are stored in the constants.py and hostfile.txt files. In these files you can edit the list of tev servers to use, the root directory for the project, and settings relating to the athena scripts.

-To check that you have everything setup correctly, run:
+To check that you have everything set up correctly, run (on tev01):
```
-python servers.py start
-python client.py -t
-python client_test.py
-python servers.py stop
-python servers.py status
+python ~/.local/lib/python2.7/site-packages/distributed/cli/dask-ssh.py --scheduler tev01 --hostfile hostfile.txt
```
+Then, run `python client.py -t` in another terminal. Press Ctrl + C in the first terminal to terminate the servers.
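
If the test passes, you can also inspect the workers directly from Python. This is only a hedged sketch, assuming the scheduler is on tev01 at distributed's default port (8786):
```
import distributed

client = distributed.Client('tev01:8786')
# Should list one worker per machine in hostfile.txt if everything came up correctly.
print(client.scheduler_info()['workers'])
```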

## Running
-The files in this repository can be split into two main categories: generation (generate.py, hss-runner.py, MadGraphControl_HSS.py) and simulation/digitization/reconstruction (client.py, servers.py, reco.py), usually referred to as simply reconstruction, or reco.
+The files in this repository can be split into two main categories: generation (generate.py, hss-runner.py, MadGraphControl_HSS.py) and simulation/digitization/reconstruction (client.py, reco.py), usually referred to simply as reconstruction, or reco.

### Generation
hss-runner.py and MadGraphControl_HSS.py are configuration files for athena event generation: they specify details regarding the hidden valley model and how it should be generated. They should generally not be modified unless the parameters of the project change. generate.py is a script that runs the athena event generation, creating a set of event files. Run `python generate.py -h` to see a complete listing of available options (this also works on client.py and reco.py).
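
For instance, a hypothetical generation run (the batch size and count here are illustrative, not recommendations; the evnt directory matches the default in constants.py):
```
python generate.py -b 50 -n 10 --evnt_dir /phys/groups/tev/scratch3/users/WHHV/evnts
```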

### Reconstruction
-reco.py is similar to generate.py, but instead of generation, it performs the full simulation/digitization/reconstruction process on a given event file. This file can be run directly for testing purposes, but in deployment, it should only be run indirectly through client.py. client.py is a script that manages distributed computation across the tev machines: it essentially splits up the job of reconstructing a large set of events into small batches called jobs, and distributes these jobs to the tev machines. However, before this can be done, servers.py must be run, which primes the tev machines listed in constants.py so that they are ready to receive jobs from client.py.
+reco.py is similar to generate.py, but instead of generation, it performs the full simulation/digitization/reconstruction process on a given event file. This file can be run directly for testing purposes, but in deployment, it should only be run indirectly through client.py. client.py is a script that manages distributed computation across the tev machines: it essentially splits up the job of reconstructing a large set of events into small batches called jobs, and distributes these jobs to the tev machines. However, before this can be done, you must run dask-ssh as shown in the Installation section, which primes the tev machines listed in hostfile.txt so that they are ready to receive jobs from client.py.
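
Under the hood, client.py relies on dask.distributed futures. The following is only a minimal sketch of that pattern, not the project's actual code: it assumes a scheduler is already running on tev01 at distributed's default port (8786), and the toy `work` function stands in for the real `compute`, which shells out to reco.py:
```
import distributed

def work(job_id):
    # Stand-in for compute(); the real function runs reco.py via subprocess32.
    return 'job {:0>4} done'.format(job_id)

client = distributed.Client('tev01:8786')  # connect to the dask-ssh scheduler
futures = [client.submit(work, job_id) for job_id in range(4)]
for future in futures:
    print(future.result())  # blocks until the corresponding job finishes
```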

## Deployment
Once you're comfortable with how the scripts work, and you've modified the settings as appropriate, you're ready to run a full-blown generation/reconstruction cycle. There are 4 steps to follow:
1. Run `python generate.py -b BATCHSIZE -n NUM_BATCHES --evnt_dir EVNT_DIR`
-2. Run `python servers.py start`. Ensure servers are operating by running `python servers.py status`.
-3. Run `python client.py -b BATCH_SIZE --evnt_dir EVNT_DIR` where BATCH_SIZE is the size of each job sent to each tev machine (around 10-50 is probably a good number), and EVNT_DIR should be the same as in step 1.
-4. Analyze the logs from the run, and diagnose the causes of any failures. Once these issues are resolved, Run `python client.py -b BATCH_SIZE --evnt_dir EVNT_DIR -r TIMESTAMP`, where TIMESTAMP is the timestamp shown by step 3, to rerun the failed jobs. Repeat this step as necessary.
+2. Run `python ~/.local/lib/python2.7/site-packages/distributed/cli/dask-ssh.py --scheduler tev01 --hostfile hostfile.txt`.
+3. Run `python client.py -b BATCH_SIZE --evnt_dir EVNT_DIR`, where BATCH_SIZE is the size of each job sent to each tev machine (around 10-50 is probably a good number) and EVNT_DIR is the same as in step 1. It's a good idea to run this command with `-t` first, to make sure that your settings are correct.
+4. Analyze the logs from the run and diagnose the causes of any failures. Once these issues are resolved, run `python client.py -r TIMESTAMP`, where TIMESTAMP is the timestamp shown by step 3, to rerun the failed jobs. Repeat this step as necessary.
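
Put together, a hypothetical full cycle might look like the following (the batch sizes, counts, and timestamp are illustrative only; the evnt directory matches the default in constants.py):
```
python generate.py -b 50 -n 10 --evnt_dir /phys/groups/tev/scratch3/users/WHHV/evnts
python ~/.local/lib/python2.7/site-packages/distributed/cli/dask-ssh.py --scheduler tev01 --hostfile hostfile.txt
# in a second terminal:
python client.py -t -b 20 --evnt_dir /phys/groups/tev/scratch3/users/WHHV/evnts
python client.py -b 20 --evnt_dir /phys/groups/tev/scratch3/users/WHHV/evnts
python client.py -r 20171110120000  # only if some jobs failed; use the timestamp printed by the previous run
```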

## Authors
* Andrew Arbogast - *created the repository, created the initial script on which later work was based*
52 changes: 19 additions & 33 deletions client.py
@@ -1,7 +1,6 @@
from __future__ import division
import argparse
-import dispy
-import dispy.httpd
+import distributed
import webbrowser
import glob
import math
@@ -27,7 +26,7 @@ def compute(version_number, max_events, skip_events, event_file, log_dir, tmp_di

def get_job_args(batch_size, evnt_dir, log_dir, tmp_dir, aod_dir):
    evnt_files = glob.glob(evnt_dir + '/*.evnt.pool.root')
-    print ('Found evnt files:')
+    print ('[client]: Found evnt files:')
    for ii, evnt_file in enumerate(evnt_files):
        print('\t[{}] {}'.format(ii+1, evnt_file))
    job_id = 0
@@ -51,32 +50,29 @@ def get_job_args(batch_size, evnt_dir, log_dir, tmp_dir, aod_dir):
def check_jobs(jobs, job_args, tmp_dir, timestamp):
    failed_jobs = []
    for job_id, job in enumerate(jobs):
-        print ('Waiting for job {:0>4}'.format(job_id))
-        host = job()
-        if host is None:
+        print ('[client]: Waiting for job {:0>4}'.format(job_id))
+        try:
+            host = job.result()
+            print('[{}]: Executed job {:0>4}'.format(host, job_id))
+        except subprocess32.CalledProcessError as e:
            failed_jobs.append(job_args[job_id])
            print('ERROR: job {:0>4} failed!'.format(job_id))
-        else:
-            print('{} executed job {:0>4} from {} to {}'.format(host, job_id, job.start_time, job.end_time))
    if len(failed_jobs) != 0:
-        print('The following jobs failed ({} in total): '.format(len(failed_jobs)))
+        print('[client]: The following jobs failed ({} in total): '.format(len(failed_jobs)))
        for job in failed_jobs:
            print('\tjob {:0>4}'.format(job[-1]))
        with open(os.path.join(tmp_dir, 'failed_jobs.args'), 'wb+') as fj_handle:
            pickle.dump(failed_jobs, fj_handle)
-        print('Run "python client.py -r {}" to redispatch failed jobs.'.format(timestamp))
+        print('[client]: Run "python client.py -r {}" to redispatch failed jobs.'.format(timestamp))

def dispatch_computations(job_args, tmp_dir, timestamp):
-    cluster = dispy.JobCluster(compute, depends=[constants])
-    http_server = dispy.httpd.DispyHTTPServer(cluster)
-    webbrowser.open('http://localhost:8181')
+    client = distributed.Client('localhost:8786')
+    client.upload_file('constants.py')
+    webbrowser.open('http://localhost:8787')
    jobs = []
    for job_arg in job_args:
-        job = cluster.submit(*job_arg)
+        job = client.submit(compute, *job_arg)
        jobs.append(job)
    check_jobs(jobs, job_args, tmp_dir, timestamp)
-    cluster.print_status()
-    http_server.shutdown()

def clean():
    for dir in (constants.log_dir, constants.tmp_dir):
@@ -88,12 +84,12 @@ def clean():
                elif os.path.isdir(entry_path):
                    shutil.rmtree(entry_path, ignore_errors=True)
            except OSError as e:
-                print('clean: {}'.format(e))
+                print('[client]: clean: {}'.format(e))
    for entry in glob.glob('./_dispy_*'):
        try:
            os.remove(entry)
        except OSError as e:
-            print('clean: {}'.format(e))
+            print('[client]: clean: {}'.format(e))

def main():
    parser = argparse.ArgumentParser(description='Dispatch reco jobs to tev machines.')
@@ -103,39 +99,29 @@ def main():
    group = parser.add_mutually_exclusive_group()
    group.add_argument('-c', '--clean', action='store_true', help='Remove old recovery, temporary, and log files.')
    group.add_argument('-r', '--redispatch_timestamp', dest='timestamp', default=None, help='The timestamp of a run, the failed jobs of which will be redispatched.')
-    group.add_argument('--recover', default=None, help='Recover from client failure using provided timestamp.')
    args = parser.parse_args()
    if args.clean:
        clean()
-    if args.recover:
-        timestamp = args.recover
-    elif args.timestamp:
+    if args.timestamp:
        timestamp = args.timestamp
    else:
        timestamp = '{:%Y%m%d%H%M%S}'.format(datetime.datetime.now())
    log_dir = os.path.join(constants.log_dir, timestamp)
    tmp_dir = os.path.join(constants.tmp_dir, timestamp)
    aod_dir = os.path.join(constants.aod_dir, timestamp)
-    if not args.recover:
-        os.makedirs(log_dir)
-        os.makedirs(tmp_dir)
-        os.makedirs(aod_dir)
+    os.makedirs(log_dir)
+    os.makedirs(tmp_dir)
+    os.makedirs(aod_dir)
    if args.timestamp:
        with open(os.path.join(tmp_dir, 'failed_jobs.args'), 'rb') as fj_handle:
            failed_jobs = pickle.load(fj_handle)
        dispatch_computations(failed_jobs, tmp_dir, timestamp)
-    elif args.recover:
-        with open(os.path.join(tmp_dir, 'jobs.args'), 'rb') as fj_handle:
-            job_args = pickle.load(fj_handle)
-        jobs = dispy.recover_jobs()
-        print ('Recovering jobs ({} in total):'.format(len(jobs)))
-        check_jobs(jobs, job_args, tmp_dir, timestamp)
    else:
        job_args = get_job_args(args.batch_size, args.evnt_dir, log_dir, tmp_dir, aod_dir)
        if args.test:
            for job_arg in job_args:
                job_version_number, job_batch_size, job_skip_events, evnt_file, _, _, _, job_id = job_arg
-                print("job {:0>4}: version={}, batch_size={}, skip_events={}, evnt_file={}".format(job_id, job_version_number, job_batch_size, job_skip_events, evnt_file))
+                print("[client]: job {:0>4}: version={}, batch_size={}, skip_events={}, evnt_file={}".format(job_id, job_version_number, job_batch_size, job_skip_events, evnt_file))
        else:
            dispatch_computations(job_args, tmp_dir, timestamp)
    if args.test:
25 changes: 0 additions & 25 deletions client_test.py

This file was deleted.

4 changes: 0 additions & 4 deletions constants.py
@@ -1,5 +1,3 @@
-servers = ['tev02', 'tev03', 'tev04', 'tev05', 'tev06', 'tev07', 'tev08', 'tev10']
-
# Note: this directory should contain the Gen_Reco repository.
root_dir = '/phys/groups/tev/scratch3/users/WHHV'

@@ -13,8 +11,6 @@
dig_release = '19.2.4.14'
reco_release = '19.2.4.14'

-dispy_dir = '~/.local/lib/python2.7/site-packages/dispy'
-pids_dir = root_dir + '/.var/run'
evnt_dir = root_dir + '/evnts'
tmp_dir = root_dir + '/tmp'
log_dir = root_dir + '/logs'
8 changes: 8 additions & 0 deletions hostfile.txt
@@ -0,0 +1,8 @@
+tev02
+tev03
+tev04
+tev05
+tev06
+tev07
+tev08
+tev10
2 changes: 1 addition & 1 deletion package_setup.sh
@@ -2,5 +2,5 @@
source /phys/users/gwatts/bin/CommonScripts/configASetup.sh
lsetup root
easy_install-2.7 --install-dir ~/.local/lib/python2.7/site-packages pip
-~/.local/lib/python2.7/site-packages/pip install --upgrade --user psutil dispy subprocess32
+~/.local/lib/python2.7/site-packages/pip install --upgrade --user python-gssapi paramiko bokeh dask distributed subprocess32
python package_test.py
11 changes: 7 additions & 4 deletions package_test.py
@@ -1,8 +1,11 @@
import sys
-import psutil
-import dispy
+import dask
+import dask.distributed
+import paramiko
+import bokeh
import subprocess32

print('python version: {}.{}.{}'.format(sys.version_info[0], sys.version_info[1], sys.version_info[2]))
-print ('psutil version: {}'.format(psutil.__version__))
-print ('dispy version: {}'.format(dispy.__version__))
+print('dask version: {}'.format(dask.__version__))
+print('bokeh version: {}'.format(bokeh.__version__))
+print('paramiko version: {}'.format(paramiko.__version__))
1 change: 1 addition & 0 deletions reco.py
@@ -38,6 +38,7 @@ def reco(evnt_file, version, aod_dir, num_events, skip_events, geometry_version,
    not_writable = ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)
    os.chmod(aod_file_path, st.st_mode & not_writable)
    shutil.rmtree(tmp_dir)
+    shutil.rmtree(log_dir)

def main():
    import argparse
68 changes: 0 additions & 68 deletions servers.py

This file was deleted.
