[WIP] Coffea2023 #14

Open · wants to merge 50 commits into base: main

Commits (50)
af6cde6
First updates for coffea2023
kmohrman Dec 19, 2023
37c42eb
More coffea 2023
kmohrman Dec 19, 2023
ca7de05
Do not need columns thing here
kmohrman Jan 3, 2024
747630d
Pass dict of samples to processor
kmohrman Jan 3, 2024
ce8d019
Do not loop over datasets in processor
kmohrman Jan 3, 2024
de47ff1
We pass the info for each json one at a time again
kmohrman Jan 3, 2024
e20d047
Update ele id xgb for coffea 2023
kmohrman Jan 4, 2024
a0e0937
Update mu xgb stuff too
kmohrman Jan 4, 2024
4b643d2
Reorganize xgb to factor out common part
kmohrman Jan 4, 2024
f1875f7
Uncomment lep ID and SFs
kmohrman Jan 4, 2024
40c5087
Make flake8 happy
kmohrman Jan 4, 2024
70d45f2
Need dask in conda env?
kmohrman Jan 4, 2024
3b894de
Dont need dak in processor
kmohrman Jan 4, 2024
f1fedb8
Ahh dont pin coffea version
kmohrman Jan 4, 2024
2573205
For now need coffea2023 branch of topcoffea
kmohrman Jan 4, 2024
9a7f7d3
Clean up
kmohrman Jan 4, 2024
2cfacfb
Enable sr bdt evaluation
kmohrman Jan 4, 2024
6d28fd1
Dont need xgb import
kmohrman Jan 4, 2024
5f6bdb2
Dont need np here
kmohrman Jan 4, 2024
f94e217
clean up run script
kmohrman Jan 4, 2024
e752129
In progress updates for scaling up
kmohrman Jan 4, 2024
d056850
Updates for distributed client
kmohrman Jan 5, 2024
0952ffc
Clean up run script a bit
kmohrman Jan 5, 2024
d8765cd
Tmp add run script and json for reproducing error
kmohrman Jan 7, 2024
192056a
Tmp make distributed client default in run script
kmohrman Jan 7, 2024
75ccf8b
Updates to run script
kmohrman Jan 8, 2024
4b70f10
Workaround from Lindsey to avoid overtouching
kmohrman Jan 11, 2024
b5411a0
No longer need workaround for None arrays, see ak issue 2768
kmohrman Jan 11, 2024
362a472
UAF 10 not working so switch to 8
kmohrman Jan 11, 2024
3be1cd9
Temp un-fix overtouching workaround
kmohrman Jan 11, 2024
a4a1856
Use events.nom for data for now, as weights object seems now to be No…
kmohrman Jan 12, 2024
1086d26
Remove this root file, it seems to be empty and causes error: files' …
kmohrman Jan 12, 2024
bdb9bd5
Add fsspec-xrootd to env file
kmohrman Jan 12, 2024
27da35d
Do not need to skip empty file anymore
kmohrman Jan 26, 2024
f8b185f
Merge remote-tracking branch 'origin/main' into coffea2023_systs
kmohrman Jan 28, 2024
b54550d
Deepcopy causes crash, probably dont even need copy.copy here
kmohrman Jan 28, 2024
25db1e1
Merge pull request #18 from cmstas/coffea2023_systs
kmohrman Jan 29, 2024
bcbfcdb
Fix weights for data
kmohrman Jan 30, 2024
26a0bb7
Try out parallelize_with_dask, temporary add exit after build task graph
kmohrman Feb 6, 2024
acbfc57
Clean up run script
kmohrman Mar 16, 2024
fc03300
Remove some old WQ stuff
kmohrman Mar 16, 2024
8920081
Using cache for masks
kmohrman Mar 17, 2024
43ae6b0
Remove unused import to make flake8 happy
kmohrman Mar 17, 2024
6d541cc
Adding jsons for hpg sample
kmohrman Mar 26, 2024
7f4cac4
Adding option to run with TaskVine
kmohrman Mar 26, 2024
0ed3830
Fix CI
kmohrman Mar 26, 2024
817f592
Fix some missing and duplicates, and add cfgs for the local jsons
kmohrman Apr 5, 2024
d3853cb
Update chunk size and just comment out the weird edge case problem fi…
kmohrman Jun 21, 2024
f91672e
Update README.md
kmohrman Jun 21, 2024
63abb06
Comment out samples for consistency, add run script
kmohrman Jun 22, 2024
4 changes: 2 additions & 2 deletions .github/workflows/main.yml
@@ -87,7 +87,7 @@ jobs:
run: |
mkdir dir_for_topcoffea
cd dir_for_topcoffea
git clone https://github.com/TopEFT/topcoffea.git
git clone -b coffea2023 https://github.com/TopEFT/topcoffea.git
cd topcoffea
conda run -n coffea-env pip install -e .
cd ../..
@@ -105,7 +105,7 @@

- name: Download root files
run: |
wget --no-verbose http://uaf-10.t2.ucsd.edu/~kmohrman/for_ci/for_wwz/WWZJetsTo4L2Nu_4F_TuneCP5_13TeV-amcatnlo-pythia8_RunIISummer20UL17NanoAODv9-106X_mc2017_realistic_v9-v2_NANOAODSIM_3LepTau_4Lep/output_1.root
wget --no-verbose http://uaf-8.t2.ucsd.edu/~kmohrman/for_ci/for_wwz/WWZJetsTo4L2Nu_4F_TuneCP5_13TeV-amcatnlo-pythia8_RunIISummer20UL17NanoAODv9-106X_mc2017_realistic_v9-v2_NANOAODSIM_3LepTau_4Lep/output_1.root

- name: Pytest setup
run: |
4 changes: 2 additions & 2 deletions README.md
@@ -5,7 +5,7 @@ This analysis repository contains scripts and tools for performing analyses asso

First, clone the repository and `cd` into the toplevel directory.
```
git clone https://github.com/kmohrman/ewkcoffea.git
git clone -b coffea2023 https://github.com/kmohrman/ewkcoffea.git
cd ewkcoffea
```
Next, create a `conda` environment and activate it.
@@ -25,7 +25,7 @@ pip install mt2
The `topcoffea` package upon which this analysis also depends is not yet available on `PyPI`, so we need to clone the `topcoffea` repo and install it ourselves.
```
cd /your/favorite/directory
git clone https://github.com/TopEFT/topcoffea.git
git clone -b coffea2023 https://github.com/TopEFT/topcoffea.git
cd topcoffea
pip install -e .
```
17 changes: 16 additions & 1 deletion analysis/wwz/get_wwz_counts.py
@@ -47,17 +47,32 @@ def get_counts(histos_dict):
return out_dict


# Restructure hists so samples are in category axis
def restructure_hist(in_dict):

dense_axes = list(list(in_dict.values())[0].keys())
restructured_hist = {}
for dense_axis in dense_axes:
for dataset_name in in_dict.keys():
if dense_axis not in restructured_hist:
restructured_hist[dense_axis] = in_dict[dataset_name][dense_axis]
else:
restructured_hist[dense_axis] += in_dict[dataset_name][dense_axis]
return restructured_hist


def main():

# Set up the command line parser
parser = argparse.ArgumentParser()
parser.add_argument("-f", "--pkl-file-path", default="histos/plotsTopEFT.pkl.gz", help = "The path to the pkl file")
parser.add_argument("pkl_file_path", help = "The path to the pkl file")
parser.add_argument("-o", "--output-path", default=".", help = "The path the output files should be saved to")
parser.add_argument("-n", "--output-name", default="counts_wwz_sync", help = "A name for the output directory")
args = parser.parse_args()

# Get the counts from the input histo
histo_dict = pickle.load(gzip.open(args.pkl_file_path))
histo_dict = restructure_hist(histo_dict)

counts_dict = get_counts(histo_dict)

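
For context on the `get_wwz_counts.py` change: the new coffea output is keyed first by dataset and then by histogram name, so `restructure_hist` sums the per-dataset histograms before the counts are extracted. Below is a minimal standalone sketch of that behaviour; the `hist` usage and the toy dataset/axis names are illustrative assumptions, not part of this PR.

```python
# Minimal sketch of restructure_hist (copied from the diff above), run on toy
# inputs. The dataset and axis names here are made up for illustration.
import hist

def restructure_hist(in_dict):
    dense_axes = list(list(in_dict.values())[0].keys())
    restructured_hist = {}
    for dense_axis in dense_axes:
        for dataset_name in in_dict.keys():
            if dense_axis not in restructured_hist:
                restructured_hist[dense_axis] = in_dict[dataset_name][dense_axis]
            else:
                restructured_hist[dense_axis] += in_dict[dataset_name][dense_axis]
    return restructured_hist

def toy_histo(njets_val):
    # One histogram with a single entry, standing in for a real processor output
    h = hist.Hist.new.Reg(10, 0, 10, name="njets").Double()
    h.fill(njets=[njets_val])
    return h

# Processor output keyed by dataset first, then by histogram (dense axis) name
per_dataset = {
    "WWZ_UL17": {"njets": toy_histo(2)},
    "ZZ_UL17":  {"njets": toy_histo(3)},
}

merged = restructure_hist(per_dataset)
print(list(merged.keys()))    # ['njets']
print(merged["njets"].sum())  # 2.0 -> one entry from each toy dataset
```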
8 changes: 8 additions & 0 deletions analysis/wwz/run_wrapper.sh
@@ -0,0 +1,8 @@
# Get the file the CI uses, and move it to the directory the JSON expects
printf "\nDownloading root file...\n"
wget -nc http://uaf-10.t2.ucsd.edu/~kmohrman/for_ci/for_wwz/WWZJetsTo4L2Nu_4F_TuneCP5_13TeV-amcatnlo-pythia8_RunIISummer20UL17NanoAODv9-106X_mc2017_realistic_v9-v2_NANOAODSIM_3LepTau_4Lep/output_1.root

# Run the processor via the run_wwz4l.py script
# Note: the -x executor argument no longer does anything, but keep it for now; without it the code tries to default to wq and starts packaging up the environment

time python run_wwz4l.py ../../input_samples/sample_jsons/test_samples/UL17_WWZJetsTo4L2Nu_forCI.json,../../input_samples/sample_jsons/test_samples/UL17_WWZJetsTo4L2Nu_forCI_extra.json -x iterative
238 changes: 119 additions & 119 deletions analysis/wwz/run_wwz4l.py
@@ -6,20 +6,24 @@
import cloudpickle
import gzip
import os
from coffea import processor
from coffea.nanoevents import NanoAODSchema
import dask
from distributed import Client

import topcoffea.modules.remote_environment as remote_environment
from coffea.dataset_tools import preprocess
from coffea.dataset_tools import apply_to_fileset
from coffea.dataset_tools import filter_files

from ndcctools.taskvine import DaskVine

import wwz4l

LST_OF_KNOWN_EXECUTORS = ["futures","work_queue","iterative"]
LST_OF_KNOWN_EXECUTORS = ["local","task_vine"]

if __name__ == '__main__':

parser = argparse.ArgumentParser(description='You can customize your run')
parser.add_argument('jsonFiles' , nargs='?', default='', help = 'Json file(s) containing files and metadata')
parser.add_argument('--executor','-x' , default='work_queue', help = 'Which executor to use', choices=LST_OF_KNOWN_EXECUTORS)
parser.add_argument('--executor','-x' , default='dask', help = 'Which executor to use', choices=LST_OF_KNOWN_EXECUTORS)
parser.add_argument('--prefix', '-r' , nargs='?', default='', help = 'Prefix or redirector to look for the files')
parser.add_argument('--test','-t' , action='store_true' , help = 'To perform a test, run over a few events in a couple of chunks')
parser.add_argument('--pretend' , action='store_true', help = 'Read json files but, not execute the analysis')
@@ -152,11 +156,11 @@ def LoadJsonToSampleName(jsonFile, prefix):
else:
LoadJsonToSampleName(l, prefix)

flist = {}
fdict = {}
nevts_total = 0
for sname in samplesdict.keys():
redirector = samplesdict[sname]['redirector']
flist[sname] = [(redirector+f) for f in samplesdict[sname]['files']]
fdict[sname] = [(redirector+f) for f in samplesdict[sname]['files']]
samplesdict[sname]['year'] = samplesdict[sname]['year']
samplesdict[sname]['xsec'] = float(samplesdict[sname]['xsec'])
samplesdict[sname]['nEvents'] = int(samplesdict[sname]['nEvents'])
@@ -190,133 +194,129 @@ def LoadJsonToSampleName(jsonFile, prefix):
print('pretending...')
exit()

# Extract the list of all WCs, as long as we haven't already specified one.
if len(wc_lst) == 0:
for k in samplesdict.keys():
for wc in samplesdict[k]['WCnames']:
if wc not in wc_lst:
wc_lst.append(wc)

if len(wc_lst) > 0:
# Yes, why not have the output be in correct English?
if len(wc_lst) == 1:
wc_print = wc_lst[0]
elif len(wc_lst) == 2:
wc_print = wc_lst[0] + ' and ' + wc_lst[1]
else:
wc_print = ', '.join(wc_lst[:-1]) + ', and ' + wc_lst[-1]
print('Wilson Coefficients: {}.'.format(wc_print))
else:
print('No Wilson coefficients specified')

processor_instance = wwz4l.AnalysisProcessor(samplesdict,wc_lst,hist_lst,ecut_threshold,do_errors,do_systs,split_lep_flavor,skip_sr,skip_cr)

if executor == "work_queue":
executor_args = {
'master_name': '{}-workqueue-coffea'.format(os.environ['USER']),

# find a port to run work queue in this range:
'port': port,

'debug_log': 'debug.log',
'transactions_log': 'tr.log',
'stats_log': 'stats.log',
'tasks_accum_log': 'tasks.log',

'environment_file': remote_environment.get_environment(
extra_pip=["mt2","xgboost"],
extra_pip_local = {"ewkcoffea": ["ewkcoffea", "setup.py"]},
),
'extra_input_files': ["wwz4l.py"],

'retries': 5,

# use mid-range compression for chunks results. 9 is the default for work
# queue in coffea. Valid values are 0 (minimum compression, less memory
# usage) to 16 (maximum compression, more memory usage).
'compression': 9,

# automatically find an adequate resource allocation for tasks.
# tasks are first tried using the maximum resources seen of previously ran
# tasks. on resource exhaustion, they are retried with the maximum resource
# values, if specified below. if a maximum is not specified, the task waits
# forever until a larger worker connects.
'resource_monitor': True,
'resources_mode': 'auto',

# this resource values may be omitted when using
# resources_mode: 'auto', but they do make the initial portion
# of a workflow run a little bit faster.
# Rather than using whole workers in the exploratory mode of
# resources_mode: auto, tasks are forever limited to a maximum
# of 8GB of mem and disk.
#
# NOTE: The very first tasks in the exploratory
# mode will use the values specified here, so workers need to be at least
# this large. If left unspecified, tasks will use whole workers in the
# exploratory mode.
# 'cores': 1,
# 'disk': 8000, #MB
# 'memory': 10000, #MB

# control the size of accumulation tasks. Results are
# accumulated in groups of size chunks_per_accum, keeping at
# most chunks_per_accum at the same time in memory per task.
'chunks_per_accum': 25,
'chunks_accum_in_mem': 2,

# terminate workers on which tasks have been running longer than average.
# This is useful for temporary conditions on worker nodes where a task will
# be finish faster is ran in another worker.
# the time limit is computed by multipliying the average runtime of tasks
# by the value of 'fast_terminate_workers'. Since some tasks can be
# legitimately slow, no task can trigger the termination of workers twice.
#
# warning: small values (e.g. close to 1) may cause the workflow to misbehave,
# as most tasks will be terminated.
#
# Less than 1 disables it.
'fast_terminate_workers': 0,

# print messages when tasks are submitted, finished, etc.,
# together with their resource allocation and usage. If a task
# fails, its standard output is also printed, so we can turn
# off print_stdout for all tasks.
'verbose': True,
'print_stdout': False,
}

# Run the processor and get the output
tstart = time.time()
t_start = time.time()


####################################
### coffea2023 ###

#fdict = {"UL17_WWZJetsTo4L2Nu_forCI": ["/home/k.mohrman/coffea_dir/migrate_to_coffea2023_repo/ewkcoffea/analysis/wwz/output_1.root"]}

# Get fileset
fileset = {}
for name, fpaths in fdict.items():
fileset[name] = {}
fileset[name]["files"] = {}
for fpath in fpaths:
fileset[name]["files"][fpath] = {"object_path": "Events"}
fileset[name]["metadata"] = {"dataset": name}
print(fileset)
print("Number of datasets:",len(fdict))

#### Run preprocess, build task graphs, compute ####

t_before_with_Client_as_client = time.time()
#with Client(n_workers=8, threads_per_worker=1) as client:
with Client() as client:

# Run preprocess
print("\nRunning preprocess...")
t_before_preprocess = time.time()
dataset_runnable, dataset_updated = preprocess(
fileset,
step_size=100_000,
align_clusters=False,
files_per_batch=1,
save_form=False,
)
dataset_runnable = filter_files(dataset_runnable)


# Dump to a json
#with gzip.open("dataset_runnable_test.json.gz", "wt") as fout:
# json.dump(dataset_runnable, fout)
#exit()
# Or load a json
#with gzip.open("dataset_runnable_test_mar16_full.json.gz", "r") as f:
# dataset_runnable = json.load(f)


t_before_applytofileset = time.time()
# Run apply_to_fileset
print("\nRunning apply_to_fileset...")
histos_to_compute, reports = apply_to_fileset(
processor_instance,
dataset_runnable,
uproot_options={"allow_read_errors_with_report": True},
#parallelize_with_dask=True,
)

# Does not work
#with gzip.open("histos_to_compute_full_mar25.json.gz", "wb") as fout:
# cloudpickle.dump(histos_to_compute, fout)

# Check columns to be read
#import dask_awkward as dak
#print("\nRunning necessary_columns...")
#columns_read = dak.necessary_columns(histos_to_compute[list(histos_to_compute.keys())[0]])
#print(columns_read)

# Compute
t_before_compute = time.time()
print("\nRunning compute...")

if executor == "task_vine":
print("Running with Task vine")
m = DaskVine(
[9123,9128],
name=f"coffea-vine-{os.environ['USER']}",
#run_info_path="/blue/p.chang/k.mohrman/vine-run-info",
)
proxy = m.declare_file(f"/tmp/x509up_u{os.getuid()}", cache=True)
coutputs, reports = dask.compute(
histos_to_compute,
reports,
scheduler=m.get,
resources={"cores": 1},
resources_mode=None,
lazy_transfers=True,
#submit_per_cycle=200,
#max_pending=1500,
#task_mode='function-calls',
extra_files={proxy: "proxy.pem"},
env_vars={"X509_USER_PROXY": "proxy.pem"},
)

if executor == "futures":
exec_instance = processor.FuturesExecutor(workers=nworkers)
runner = processor.Runner(exec_instance, schema=NanoAODSchema, chunksize=chunksize, maxchunks=nchunks)
elif executor == "iterative":
exec_instance = processor.IterativeExecutor()
runner = processor.Runner(exec_instance, schema=NanoAODSchema, chunksize=chunksize, maxchunks=nchunks)
elif executor == "work_queue":
executor = processor.WorkQueueExecutor(**executor_args)
runner = processor.Runner(executor, schema=NanoAODSchema, chunksize=chunksize, maxchunks=nchunks, skipbadfiles=False, xrootdtimeout=180)
else:
coutputs, creports = dask.compute(histos_to_compute, reports)

output = runner(flist, treename, processor_instance)

dt = time.time() - tstart

if executor == "work_queue":
print('Processed {} events in {} seconds ({:.2f} evts/sec).'.format(nevts_total,dt,nevts_total/dt))

#nbins = sum(sum(arr.size for arr in h._sumw.values()) for h in output.values() if isinstance(h, hist.Hist))
#nfilled = sum(sum(np.sum(arr > 0) for arr in h._sumw.values()) for h in output.values() if isinstance(h, hist.Hist))
#print("Filled %.0f bins, nonzero bins: %1.1f %%" % (nbins, 100*nfilled/nbins,))

if executor == "futures":
print("Processing time: %1.2f s with %i workers (%.2f s cpu overall)" % (dt, nworkers, dt*nworkers, ))
# Print timing info
t_end = time.time()
dt = t_end - t_start
time_for_with_Client_as_client = t_before_preprocess - t_before_with_Client_as_client
time_for_preprocess = t_before_applytofileset - t_before_preprocess
time_for_applytofset = t_before_compute - t_before_applytofileset
time_for_compute = t_end - t_before_compute
print("\nTiming info:")
print(f"\tTime for with Client() as client: {round(time_for_with_Client_as_client,3)}s , ({round(time_for_with_Client_as_client/60,3)}m)")
print(f"\tTime for preprocess : {round(time_for_preprocess,3)}s , ({round(time_for_preprocess/60,3)}m)")
print(f"\tTime for apply to fileset : {round(time_for_applytofset,3)}s , ({round(time_for_applytofset/60,3)}m)")
print(f"\tTime for compute : {round(time_for_compute,3)}s , ({round(time_for_compute/60,3)}m)")
#print(f"\tSanity check, these should equal: {round(dt,3)} , {round(time_for_with_Client_as_client+time_for_preprocess+time_for_applytofset+time_for_compute,3)}")

# Save the output
if not os.path.isdir(outpath): os.system("mkdir -p %s"%outpath)
out_pkl_file = os.path.join(outpath,outname+".pkl.gz")
print(f"\nSaving output in {out_pkl_file}...")
with gzip.open(out_pkl_file, "wb") as fout:
cloudpickle.dump(output, fout)
cloudpickle.dump(coutputs, fout)
print("Done!")
7 changes: 7 additions & 0 deletions analysis/wwz/runit.sh
@@ -0,0 +1,7 @@
# Example run commands

# Process the copy of the files at UF (not xrd accessible)
#time python run_wwz4l.py ../../input_samples/cfgs/wwz_analysis/mc_sig_bkg_samples_hpg.cfg,../../input_samples/cfgs/wwz_analysis/data_samples_hpg.cfg --hist-list njets -x task_vine

# Process the copy of the files at UCSD (xrd accessible)
time python run_wwz4l.py ../../input_samples/cfgs/wwz_analysis/mc_sig_bkg_samples.cfg,../../input_samples/cfgs/wwz_analysis/data_samples.cfg --hist-list njets -x task_vine