Skip to content

Commit

Permalink
first crab submission implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
ajgilbert committed Sep 24, 2015
1 parent f9254d3 commit b3225ac
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 5 deletions.
17 changes: 17 additions & 0 deletions data/FrameworkJobReport.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<FrameworkJobReport>
<ReadBranches>
</ReadBranches>
<PerformanceReport>
<PerformanceSummary Metric="StorageStatistics">
<Metric Name="Parameter-untracked-bool-enabled" Value="true"/>
<Metric Name="Parameter-untracked-bool-stats" Value="true"/>
<Metric Name="Parameter-untracked-string-cacheHint" Value="application-only"/>
<Metric Name="Parameter-untracked-string-readHint" Value="auto-detect"/>
<Metric Name="ROOT-tfile-read-totalMegabytes" Value="0"/>
<Metric Name="ROOT-tfile-write-totalMegabytes" Value="0"/>
</PerformanceSummary>
</PerformanceReport>

<GeneratorInfo>
</GeneratorInfo>
</FrameworkJobReport>
5 changes: 5 additions & 0 deletions data/do_nothing_cfg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import FWCore.ParameterSet.Config as cms
# Placeholder CMSSW configuration: a process named "MAIN" whose only component
# is a PoolSource with an empty list of input files.
process = cms.Process("MAIN")

process.source = cms.Source(
    "PoolSource",
    fileNames=cms.untracked.vstring(),
)

114 changes: 109 additions & 5 deletions python/combine/CombineToolBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,47 @@
'PWD': os.environ['PWD']
})

# Shell preamble written at the top of the generated crab3 job script (see
# flush_queue). It turns on command tracing (set -x) and exit-on-error
# (set -e), adjusts the stack-size and core-dump limits, and installs an ERR
# trap. The trap handler, error_exit, records the failing exit code in a
# <FrameworkError> element of FrameworkJobReport.xml -- if a report already
# exists its first line is replaced by the new header plus the error element,
# otherwise a minimal report is created -- and then exits with status 0.
CRAB_PREFIX = """
set -x
set -e
ulimit -s unlimited
ulimit -c 0
function error_exit
{
if [ $1 -ne 0 ]; then
echo "Error with exit code ${1}"
if [ -e FrameworkJobReport.xml ]
then
cat << EOF > FrameworkJobReport.xml.tmp
<FrameworkJobReport>
<FrameworkError ExitStatus="${1}" Type="" >
Error with exit code ${1}
</FrameworkError>
EOF
tail -n+2 FrameworkJobReport.xml >> FrameworkJobReport.xml.tmp
mv FrameworkJobReport.xml.tmp FrameworkJobReport.xml
else
cat << EOF > FrameworkJobReport.xml
<FrameworkJobReport>
<FrameworkError ExitStatus="${1}" Type="" >
Error with exit code ${1}
</FrameworkError>
</FrameworkJobReport>
EOF
fi
exit 0
fi
}
trap 'error_exit $?' ERR
"""

# Shell epilogue written at the end of the generated crab3 job script (see
# flush_queue): packs every higgsCombine*.root file in the working directory
# into combine_output.tar and then deletes the individual ROOT files.
CRAB_POSTFIX = """
tar -cf combine_output.tar higgsCombine*.root
rm higgsCombine*.root
"""


def run_command(dry_run, command):
if not dry_run:
Expand All @@ -38,6 +79,8 @@ def __init__(self):
self.merge = 1
self.task_name = 'combine_task'
self.dry_run = False
self.bopts = '' # batch submission options
self.custom_crab = None

def attach_job_args(self, group):
group.add_argument('--job-mode', default=self.job_mode, choices=[
Expand All @@ -50,6 +93,10 @@ def attach_job_args(self, group):
help='Number of jobs to run in a single script [only affects batch submission]')
group.add_argument('--dry-run', action='store_true',
help='Print commands to the screen but do not run them')
group.add_argument('--sub-opts', default=self.bopts,
help='Options for batch/crab submission')
group.add_argument('--custom-crab', default=self.custom_crab,
help='python file containing a function with name signature "custom_crab(config)" that can be used to modify the default crab configuration')

def attach_intercept_args(self, group):
    """Hook for adding intercepted options to ``group``; does nothing here."""
Expand All @@ -65,24 +112,27 @@ def set_args(self, known, unknown):
self.merge = self.args.merge
self.dry_run = self.args.dry_run
self.passthru.extend(unknown)
self.bopts = self.args.sub_opts

def put_back_arg(self, arg_name, target_name):
    """Move a parsed option from ``self.args`` back onto ``self.passthru``.

    If ``self.args`` has an attribute called ``arg_name``, the pair
    ``[target_name, value]`` is appended to ``self.passthru`` and the
    attribute is removed from ``self.args``. Otherwise nothing happens.
    """
    parsed = self.args
    if not hasattr(parsed, arg_name):
        return
    value = getattr(parsed, arg_name)
    self.passthru.extend((target_name, value))
    delattr(parsed, arg_name)

def create_job_script(self, commands, script_filename, do_log=False):
    """Write ``commands`` to an executable shell script.

    The script starts with ``JOB_PREFIX``. Every command that begins with
    ``combine`` is wrapped in ``eval``; when ``do_log`` is true its stdout
    and stderr are additionally piped through ``tee`` into a log file named
    after the script (``.sh`` replaced by ``.log``). The first command in
    the list truncates the log, later ones append to it. Any other command
    is written verbatim.

    Args:
        commands: list of shell command strings, one per script line.
        script_filename: path of the script to create (overwritten).
        do_log: if true, tee the output of ``combine`` commands to the log.
    """
    fname = script_filename
    logname = script_filename.replace('.sh', '.log')
    with open(fname, "w") as text_file:
        text_file.write(JOB_PREFIX)
        for i, command in enumerate(commands):
            if command.startswith('combine'):
                log_part = ''
                if do_log:
                    tee = 'tee' if i == 0 else 'tee -a'
                    log_part = ' 2>&1 | %s %s' % (tee, logname)
                text_file.write('eval ' + command + log_part + '\n')
            else:
                # Terminate the line ourselves: without the newline two
                # consecutive commands would be fused into one shell line.
                if not command.endswith('\n'):
                    command += '\n'
                text_file.write(command)
    # Add the owner-execute bit so the script can be run or submitted directly.
    st = os.stat(fname)
    os.chmod(fname, st.st_mode | stat.S_IEXEC)
# print JOB_PREFIX + command
Expand All @@ -97,13 +147,67 @@ def run_method(self):
self.job_queue.append(command)
self.flush_queue()

def extract_workspace_arg(self, cmd_list=None):
    """Return the workspace/datacard path named in a tokenised command.

    The value is taken from the token following ``-d`` (checked first) or
    ``--datacard``; the single-token form ``--datacard=<path>`` is accepted
    as well.

    Args:
        cmd_list: the command split into tokens; ``None`` means no tokens.

    Returns:
        The workspace path exactly as it appears in the command.

    Raises:
        RuntimeError: if neither option is present, or the option is the
            last token and therefore has no value.
    """
    tokens = [] if cmd_list is None else list(cmd_list)
    for flag in ('-d', '--datacard'):
        if flag in tokens:
            value_idx = tokens.index(flag) + 1
            # The flag may be the final token, in which case there is
            # nothing to return; report that instead of an IndexError.
            if value_idx >= len(tokens):
                raise RuntimeError(
                    'The %s option must be followed by the workspace' % flag)
            return tokens[value_idx]
    for token in tokens:
        if token.startswith('--datacard='):
            value = token.split('=', 1)[1]
            if value:
                return value
    raise RuntimeError('The workspace argument must be specified explicitly with -d or --datacard')
def flush_queue(self):
    """Dispatch every queued command according to ``self.job_mode``, then
    empty ``self.job_queue``.

    Modes handled here:

    * ``interactive`` -- run the commands through a pool of
      ``self.parallel`` worker processes (``run_command`` honours
      ``self.dry_run``).
    * ``script`` -- write ``job_<task_name>_<i>.sh`` scripts, each holding
      up to ``self.merge`` commands, with output logging enabled.
    * ``lxbatch`` -- write the same scripts without the logging pipe and
      submit each with ``bsub``, passing ``self.bopts`` through.
    * ``crab3`` -- write ``crab_<task_name>.sh`` containing one numbered
      ``if`` block per command, fill in the default crab configuration,
      optionally apply a user ``custom_crab(config)`` hook, and submit
      unless ``self.dry_run`` is set.

    Any other mode only clears the queue.
    """
    if self.job_mode == 'interactive':
        pool = Pool(processes=self.parallel)
        try:
            pool.map(partial(run_command, self.dry_run), self.job_queue)
        finally:
            # Release the worker processes instead of leaving them to the
            # garbage collector.
            pool.close()
            pool.join()
    script_list = []
    if self.job_mode in ['script', 'lxbatch']:
        for i, j in enumerate(range(0, len(self.job_queue), self.merge)):
            script_name = 'job_%s_%i.sh' % (self.task_name, i)
            # each job is given a slice from the list of combine commands of length 'merge'
            # we also keep track of the files that were created in case submission to a
            # batch system was also requested
            self.create_job_script(
                self.job_queue[j:j + self.merge], script_name, self.job_mode == 'script')
            script_list.append(script_name)
    if self.job_mode == 'lxbatch':
        for script in script_list:
            full_script = os.path.abspath(script)
            # %J is left in the name as a placeholder for bsub
            logname = full_script.replace('.sh', '_%J.log')
            run_command(self.dry_run, 'bsub -o %s %s %s' % (logname, self.bopts, full_script))
    if self.job_mode == 'crab3':
        # import the stuff we need
        from CRABAPI.RawCommand import crabCommand
        from httplib import HTTPException
        print('>> crab3 requestName will be %s' % self.task_name)
        outscriptname = 'crab_%s.sh' % self.task_name
        print('>> crab3 script will be %s' % outscriptname)
        jobs = 0
        wsp_files = set()
        # 'with' guarantees the script is flushed and closed even if a
        # command has no workspace argument and extraction raises.
        with open(outscriptname, "w") as outscript:
            outscript.write(CRAB_PREFIX)
            for jobs, line in enumerate(self.job_queue, 1):
                wsp = self.extract_workspace_arg(line.split())
                wsp_files.add(wsp)
                # The job script selects its command from its first argument.
                outscript.write('\nif [ $1 -eq %i ]; then\n' % jobs)
                # The workspace is shipped as an input file, so the command
                # refers to it by base name only.
                outscript.write(' ' + line.replace(wsp, os.path.basename(wsp)) + '\n')
                outscript.write('fi')
            outscript.write(CRAB_POSTFIX)
        from HiggsAnalysis.HiggsToTauTau.combine.crab import config
        config.General.requestName = self.task_name
        config.JobType.scriptExe = outscriptname
        config.JobType.inputFiles.extend(wsp_files)
        config.Data.totalUnits = jobs
        config.Data.publishDataName = config.General.requestName
        custom_crab = self.custom_crab
        if custom_crab is None:
            # NOTE(review): nothing visible in set_args stores --custom-crab
            # on the instance, so fall back to the parsed arguments when
            # they are available.
            custom_crab = getattr(getattr(self, 'args', None), 'custom_crab', None)
        if custom_crab is not None:
            # The file is executed as Python and must define
            # custom_crab(config); only point this at trusted files.
            d = {}
            execfile(custom_crab, d)
            d['custom_crab'](config)
        print(config)
        if not self.dry_run:
            try:
                crabCommand('submit', config=config)
            except HTTPException as hte:
                print(hte.headers)
    del self.job_queue[:]
31 changes: 31 additions & 0 deletions python/combine/crab.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import os
from WMCore.Configuration import Configuration


# Default crab3 configuration for combine jobs. The empty placeholders
# (requestName, scriptExe, publishDataName) and totalUnits are overwritten by
# the submission code before the task is sent.
_cmssw_base = os.environ['CMSSW_BASE']
_data_dir = _cmssw_base + '/src/HiggsAnalysis/HiggsToTauTau/data/'

config = Configuration()

config.section_('General')
config.General.requestName = ''
# NOTE: config.General.workArea is not set here.

config.section_('JobType')
config.JobType.pluginName = 'PrivateMC'
config.JobType.psetName = _data_dir + 'do_nothing_cfg.py'
config.JobType.scriptExe = ''
# Files shipped with every job: the stub job report and the combine binary.
config.JobType.inputFiles = [
    _data_dir + 'FrameworkJobReport.xml',
    _cmssw_base + '/bin/' + os.environ['SCRAM_ARCH'] + '/combine',
]
config.JobType.outputFiles = ['combine_output.tar']
# NOTE: config.JobType.maxMemoryMB is not set here.

config.section_('Data')
config.Data.primaryDataset = 'Combine'
config.Data.splitting = 'EventBased'
config.Data.unitsPerJob = 1
config.Data.totalUnits = 1
config.Data.publication = False
config.Data.publishDataName = ''

config.section_('User')

config.section_('Site')
config.Site.storageSite = 'T2_CH_CERN'

0 comments on commit b3225ac

Please sign in to comment.