[WIP] Coffea2023 #14

Open

wants to merge 50 commits into base: main

Changes from 27 commits

Commits (50)
af6cde6
First updates for coffea2023
kmohrman Dec 19, 2023
37c42eb
More coffea 2023
kmohrman Dec 19, 2023
ca7de05
Do not need columns thing here
kmohrman Jan 3, 2024
747630d
Pass dict of samples to processor
kmohrman Jan 3, 2024
ce8d019
Do not loop over datasets in processor
kmohrman Jan 3, 2024
de47ff1
We pass the info for each json one at a time again
kmohrman Jan 3, 2024
e20d047
Update ele id xgb for coffea 2023
kmohrman Jan 4, 2024
a0e0937
Update mu xgb stuff too
kmohrman Jan 4, 2024
4b643d2
Reorganize xgb to factor out common part
kmohrman Jan 4, 2024
f1875f7
Uncomment lep ID and SFs
kmohrman Jan 4, 2024
40c5087
Make flake8 happy
kmohrman Jan 4, 2024
70d45f2
Need dask in conda env?
kmohrman Jan 4, 2024
3b894de
Dont need dak in processor
kmohrman Jan 4, 2024
f1fedb8
Ahh dont pin coffea version
kmohrman Jan 4, 2024
2573205
For now need coffea2023 branch of topcoffea
kmohrman Jan 4, 2024
9a7f7d3
Clean up
kmohrman Jan 4, 2024
2cfacfb
Enable sr bdt evaluation
kmohrman Jan 4, 2024
6d28fd1
Dont need xgb import
kmohrman Jan 4, 2024
5f6bdb2
Dont need np here
kmohrman Jan 4, 2024
f94e217
clean up run script
kmohrman Jan 4, 2024
e752129
In progress updates for scaling up
kmohrman Jan 4, 2024
d056850
Updates for distributed client
kmohrman Jan 5, 2024
0952ffc
Clean up run script a bit
kmohrman Jan 5, 2024
d8765cd
Tmp add run script and json for reproducing error
kmohrman Jan 7, 2024
192056a
Tmp make distributed client default in run script
kmohrman Jan 7, 2024
75ccf8b
Updates to run script
kmohrman Jan 8, 2024
4b70f10
Workaround from Lindsey to avoid overtouching
kmohrman Jan 11, 2024
b5411a0
No longer need workaround for None arrays, see ak issue 2768
kmohrman Jan 11, 2024
362a472
UAF 10 not working so switch to 8
kmohrman Jan 11, 2024
3be1cd9
Temp un-fix overtouching workaround
kmohrman Jan 11, 2024
a4a1856
Use events.nom for data for now, as weights object seems now to be No…
kmohrman Jan 12, 2024
1086d26
Remove this root file, it seems to be empty and causes error: files' …
kmohrman Jan 12, 2024
bdb9bd5
Add fsspec-xrootd to env file
kmohrman Jan 12, 2024
27da35d
Do not need to skip empty file anymore
kmohrman Jan 26, 2024
f8b185f
Merge remote-tracking branch 'origin/main' into coffea2023_systs
kmohrman Jan 28, 2024
b54550d
Deepcopy causes crash, probably dont even need copy.copy here
kmohrman Jan 28, 2024
25db1e1
Merge pull request #18 from cmstas/coffea2023_systs
kmohrman Jan 29, 2024
bcbfcdb
Fix weights for data
kmohrman Jan 30, 2024
26a0bb7
Try out parallelize_with_dask, temporary add exit after build task graph
kmohrman Feb 6, 2024
acbfc57
Clean up run script
kmohrman Mar 16, 2024
fc03300
Remove some old WQ stuff
kmohrman Mar 16, 2024
8920081
Using cache for masks
kmohrman Mar 17, 2024
43ae6b0
Remove unused import to make flake8 happy
kmohrman Mar 17, 2024
6d541cc
Adding jsons for hpg sample
kmohrman Mar 26, 2024
7f4cac4
Adding option to run with TaskVine
kmohrman Mar 26, 2024
0ed3830
Fix CI
kmohrman Mar 26, 2024
817f592
Fix some missing and duplicates, and add cfgs for the local jsons
kmohrman Apr 5, 2024
d3853cb
Update chunk size and just comment out the weird edge case problem fi…
kmohrman Jun 21, 2024
f91672e
Update README.md
kmohrman Jun 21, 2024
63abb06
Comment out samples for consistency, add run script
kmohrman Jun 22, 2024
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
@@ -87,7 +87,7 @@ jobs:
run: |
mkdir dir_for_topcoffea
cd dir_for_topcoffea
git clone https://github.com/TopEFT/topcoffea.git
git clone -b coffea2023 https://github.com/TopEFT/topcoffea.git
cd topcoffea
conda run -n coffea-env pip install -e .
cd ../..
18 changes: 16 additions & 2 deletions analysis/wwz/get_wwz_counts.py
@@ -47,18 +47,32 @@ def get_counts(histos_dict):
    return out_dict


# Restructure hists so samples are in category axis
def restructure_hist(in_dict):

    dense_axes = list(list(in_dict.values())[0].keys())
    restructured_hist = {}
    for dense_axis in dense_axes:
        for dataset_name in in_dict.keys():
            if dense_axis not in restructured_hist:
                restructured_hist[dense_axis] = in_dict[dataset_name][dense_axis]
            else:
                restructured_hist[dense_axis] += in_dict[dataset_name][dense_axis]
    return restructured_hist
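For orientation (an illustrative aside, not part of this diff): each dataset's processor output is a `{variable: hist}` dictionary, and restructure_hist merges those dictionaries into a single histogram per variable; in the real output each histogram also carries the sample name on its "process" StrCategory axis, so summing across datasets keeps the samples distinguishable. A minimal sketch with made-up toy inputs, assuming the restructure_hist from the diff above is in scope:

```python
# Toy stand-ins for the per-dataset processor outputs (illustrative only)
import hist

def toy_output(values):
    h = hist.Hist(hist.axis.Regular(5, 0, 500, name="met"))
    h.fill(met=values)
    return {"met": h}

in_dict = {
    "UL17_WWZ": toy_output([100.0, 150.0]),
    "UL17_ZH":  toy_output([200.0]),
}

merged = restructure_hist(in_dict)  # one hist per variable, summed over datasets
print(merged["met"].sum())          # 3.0 entries total
```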


def main():

    # Set up the command line parser
    parser = argparse.ArgumentParser()
    parser.add_argument("-f", "--pkl-file-path", default="histos/plotsTopEFT.pkl.gz", help = "The path to the pkl file")
    parser.add_argument("pkl_file_path", help = "The path to the pkl file")
    parser.add_argument("-o", "--output-path", default=".", help = "The path the output files should be saved to")
    parser.add_argument("-n", "--output-name", default="counts_wwz_sync", help = "A name for the output directory")
    args = parser.parse_args()

    # Get the counts from the input histo
    histo_dict = pickle.load(gzip.open(args.pkl_file_path))

    histo_dict = restructure_hist(histo_dict)

    counts_dict = get_counts(histo_dict)

8 changes: 8 additions & 0 deletions analysis/wwz/run_wrapper.sh
@@ -0,0 +1,8 @@
# Get the file the CI uses, and move it to the directory the JSON expects
printf "\nDownloading root file...\n"
wget -nc http://uaf-10.t2.ucsd.edu/~kmohrman/for_ci/for_wwz/WWZJetsTo4L2Nu_4F_TuneCP5_13TeV-amcatnlo-pythia8_RunIISummer20UL17NanoAODv9-106X_mc2017_realistic_v9-v2_NANOAODSIM_3LepTau_4Lep/output_1.root

# Run the processor via the run_wwz4l.py script
# Note: the -x executor argument no longer does anything, but keep it for now; without it the code tries to default to work_queue and starts packaging up the environment

time python run_wwz4l.py ../../input_samples/sample_jsons/test_samples/UL17_WWZJetsTo4L2Nu_forCI.json,../../input_samples/sample_jsons/test_samples/UL17_WWZJetsTo4L2Nu_forCI_extra.json -x iterative
119 changes: 95 additions & 24 deletions analysis/wwz/run_wwz4l.py
@@ -6,11 +6,20 @@
import cloudpickle
import gzip
import os
from coffea import processor
import dask
import dask_awkward as dak
from distributed import Client

from coffea.nanoevents import NanoAODSchema
from coffea.nanoevents import NanoEventsFactory

from coffea.dataset_tools import preprocess
from coffea.dataset_tools import apply_to_fileset

import topcoffea.modules.remote_environment as remote_environment

from ndcctools.taskvine import DaskVine

import wwz4l

LST_OF_KNOWN_EXECUTORS = ["futures","work_queue","iterative"]
@@ -165,11 +174,11 @@ def LoadJsonToSampleName(jsonFile, prefix):
        else:
            LoadJsonToSampleName(l, prefix)

    flist = {}
    fdict = {}
    nevts_total = 0
    for sname in samplesdict.keys():
        redirector = samplesdict[sname]['redirector']
        flist[sname] = [(redirector+f) for f in samplesdict[sname]['files']]
        fdict[sname] = [(redirector+f) for f in samplesdict[sname]['files']]
        samplesdict[sname]['year'] = samplesdict[sname]['year']
        samplesdict[sname]['xsec'] = float(samplesdict[sname]['xsec'])
        samplesdict[sname]['nEvents'] = int(samplesdict[sname]['nEvents'])
@@ -306,34 +315,96 @@ def LoadJsonToSampleName(jsonFile, prefix):
    # Run the processor and get the output
    tstart = time.time()

    if executor == "futures":
        exec_instance = processor.FuturesExecutor(workers=nworkers)
        runner = processor.Runner(exec_instance, schema=NanoAODSchema, chunksize=chunksize, maxchunks=nchunks)
    elif executor == "iterative":
        exec_instance = processor.IterativeExecutor()
        runner = processor.Runner(exec_instance, schema=NanoAODSchema, chunksize=chunksize, maxchunks=nchunks)
    elif executor == "work_queue":
        executor = processor.WorkQueueExecutor(**executor_args)
        runner = processor.Runner(executor, schema=NanoAODSchema, chunksize=chunksize, maxchunks=nchunks, skipbadfiles=False, xrootdtimeout=180)
    ####################################
    ### coffea2023 ###

    # Get fileset
    fileset = {}
    for name, fpaths in fdict.items():
        fileset[name] = {}
        fileset[name]["files"] = {}
        for fpath in fpaths:
            fileset[name]["files"][fpath] = {"object_path": "Events"}
        fileset[name]["metadata"] = {"dataset": name}
    print(fileset)
    print("Number of datasets:",len(fdict))


    #### Try with distributed Client ####

    #with dask.config.set({"scheduler": "sync"}): # Single thread
    #with Client() as _: # distributed Client scheduler
    with Client() as client:

        # Run preprocess
        print("\nRunning preprocess...")
        dataset_runnable, dataset_updated = preprocess(
            fileset,
            maybe_step_size=50_000,
            align_clusters=False,
            files_per_batch=1,
            #skip_bad_files=True,
            #calculate_form=True,
        )

        # Run apply_to_fileset
        print("\nRunning apply_to_fileset...")
        histos_to_compute, reports = apply_to_fileset(
            processor_instance,
            dataset_runnable,
            uproot_options={"allow_read_errors_with_report": True}
        )

        # Check columns to be read
        print("\nRunning necessary_columns...")
        columns_read = dak.necessary_columns(histos_to_compute[list(histos_to_compute.keys())[0]])
        print(columns_read)

        # Compute
        print("\nRunning compute...")
        output_futures, report_futures = {}, {}
        for key in histos_to_compute:
            output_futures[key], report_futures[key] = client.compute((histos_to_compute[key], reports[key],)) # , scheduler=taskvine

        coutputs, creports = client.gather((output_futures, report_futures,))



    ### Task vine testing ###
    do_tv = 0
    if do_tv:

        fdict = {"UL17_WWZJetsTo4L2Nu_forCI": ["/home/k.mohrman/coffea_dir/migrate_to_coffea2023_repo/ewkcoffea/analysis/wwz/output_1.root"]}

        # Create dict of events objects
        print("Number of datasets:",len(fdict))
        events_dict = {}
        for name, fpaths in fdict.items():
            events_dict[name] = NanoEventsFactory.from_root(
                {fpath: "/Events" for fpath in fpaths},
                schemaclass=NanoAODSchema,
                metadata={"dataset": name},
            ).events()

        # Get and compute the histograms
        histos_to_compute = {}
        for json_name in fdict.keys():
            print(f"Getting histos for {json_name}")
            histos_to_compute[json_name] = processor_instance.process(events_dict[json_name])

        m = DaskVine([9123,9128], name=f"coffea-vine-{os.environ['USER']}")

        print("Compute histos")
        #output = dask.compute(histos_to_compute)[0] # Output of dask.compute is a tuple
        coutputs = dask.compute(histos_to_compute, scheduler=m.get, resources={"cores": 1}, resources_mode=None, lazy_transfers=True)

    output = runner(flist, treename, processor_instance)

    dt = time.time() - tstart

    if executor == "work_queue":
        print('Processed {} events in {} seconds ({:.2f} evts/sec).'.format(nevts_total,dt,nevts_total/dt))

    #nbins = sum(sum(arr.size for arr in h._sumw.values()) for h in output.values() if isinstance(h, hist.Hist))
    #nfilled = sum(sum(np.sum(arr > 0) for arr in h._sumw.values()) for h in output.values() if isinstance(h, hist.Hist))
    #print("Filled %.0f bins, nonzero bins: %1.1f %%" % (nbins, 100*nfilled/nbins,))

    if executor == "futures":
        print("Processing time: %1.2f s with %i workers (%.2f s cpu overall)" % (dt, nworkers, dt*nworkers, ))

    # Save the output
    if not os.path.isdir(outpath): os.system("mkdir -p %s"%outpath)
    out_pkl_file = os.path.join(outpath,outname+".pkl.gz")
    print(f"\nSaving output in {out_pkl_file}...")
    with gzip.open(out_pkl_file, "wb") as fout:
        cloudpickle.dump(output, fout)
        cloudpickle.dump(coutputs, fout)
print("Done!")
90 changes: 51 additions & 39 deletions analysis/wwz/wwz4l.py
@@ -26,6 +26,8 @@
get_tc_param = GetParam(topcoffea_path("params/params.json"))
get_ec_param = GetParam(ewkcoffea_path("params/params.json"))

import hist.dask as hda



class AnalysisProcessor(processor.ProcessorABC):
@@ -130,16 +132,25 @@ def __init__(self, samples, wc_names_lst=[], hist_lst=None, ecut_threshold=None,
        self._skip_control_regions = skip_control_regions # Whether to skip the CR categories


    @property
    def columns(self):
        return self._columns

    # Main function: run on a given dataset
    def process(self, events):

        # Loop over samples and fill histos
        #hout = {}
        #for events in events_dict.values():

        # Dataset parameters
        dataset = events.metadata["dataset"]

        # If we pass the root files instead of events object
        #from coffea.nanoevents import NanoEventsFactory
        #from coffea.nanoevents import NanoAODSchema
        #events = NanoEventsFactory.from_root(
        #    {fpath: "/Events" for fpath in fpaths},
        #    schemaclass=NanoAODSchema,
        #    metadata={"dataset": dataset},
        #).events()

        isData = self._samples[dataset]["isData"]
        histAxisName = self._samples[dataset]["histAxisName"]
        year = self._samples[dataset]["year"]
@@ -305,7 +316,7 @@ def process(self, events):
        # We only calculate these values if not isData
        # Note: add() will generally modify up/down weights, so if these are needed for any reason after this point, we should instead pass copies to add()
        # Note: Here we will add to the weights object the SFs that do not depend on any of the forthcoming loops
        weights_obj_base = coffea.analysis_tools.Weights(len(events),storeIndividual=True)
        weights_obj_base = coffea.analysis_tools.Weights(None,storeIndividual=True)
        if not isData:
            genw = events["genWeight"]
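An aside on the Weights(len(events)) to Weights(None) change above (illustrative, not part of this diff): with dask_awkward inputs the event count is not known while the task graph is being built, so the coffea Weights helper is constructed with size=None and picks up the lazy length from the first weight added to it. A minimal sketch with a toy array standing in for events["genWeight"]:

```python
import awkward as ak
import dask_awkward as dak
from coffea.analysis_tools import Weights

genw = dak.from_awkward(ak.Array([1.0, 1.0, -1.0]), npartitions=1)  # toy genWeight

weights = Weights(None, storeIndividual=True)  # length unknown until compute time
weights.add("genWeight", genw)
print(weights.weight().compute())  # [1., 1., -1.]
```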

@@ -328,7 +339,7 @@ def process(self, events):
        for syst_var in syst_var_list:
            # Make a copy of the base weights object, so that each time through the loop we do not double count systs
            # In this loop over systs that impact kinematics, we will add to the weights objects the SFs that depend on the object kinematics
            weights_obj_base_for_kinematic_syst = copy.deepcopy(weights_obj_base)
            weights_obj_base_for_kinematic_syst = copy.copy(weights_obj_base) # TODO do we need copy here?


            #################### Jets ####################
@@ -396,8 +407,8 @@ def process(self, events):
            ######### Apply SFs #########

            if not isData:
                weights_obj_base_for_kinematic_syst.add("lepSF_muon", events.sf_4l_muon, copy.deepcopy(events.sf_4l_hi_muon), copy.deepcopy(events.sf_4l_lo_muon))
                weights_obj_base_for_kinematic_syst.add("lepSF_elec", events.sf_4l_elec, copy.deepcopy(events.sf_4l_hi_elec), copy.deepcopy(events.sf_4l_lo_elec))
                weights_obj_base_for_kinematic_syst.add("lepSF_muon", events.sf_4l_muon, copy.copy(events.sf_4l_hi_muon), copy.copy(events.sf_4l_lo_muon)) # TODO do we still need to copy in dask version
                weights_obj_base_for_kinematic_syst.add("lepSF_elec", events.sf_4l_elec, copy.copy(events.sf_4l_hi_elec), copy.copy(events.sf_4l_lo_elec))

            ### OLD implementation from TOP-22-006
            ## Btag SF following 1a) in https://twiki.cern.ch/twiki/bin/viewauth/CMS/BTagSFMethods
@@ -556,34 +567,36 @@ def process(self, events):

            # lb pairs (i.e. always one lep, one bjet)
            bjets = goodJets[isBtagJetsLoose]
            lb_pairs = ak.cartesian({"l":l_wwz_t,"j":bjets})
            mlb_min = ak.min((lb_pairs["l"] + lb_pairs["j"]).mass,axis=-1)
            mlb_max = ak.max((lb_pairs["l"] + lb_pairs["j"]).mass,axis=-1)
            #lb_pairs = ak.cartesian({"l":l_wwz_t,"j":bjets})
            #mlb_min = ak.min((lb_pairs["l"] + lb_pairs["j"]).mass,axis=-1)
            #mlb_max = ak.max((lb_pairs["l"] + lb_pairs["j"]).mass,axis=-1)

            # Get BDT values
            bdt_vars = [
                ak.fill_none(mll_wl0_wl1,-9999),
                ak.fill_none(dphi_4l_met,-9999),
                ak.fill_none(dphi_zleps_met,-9999),
                ak.fill_none(dphi_wleps_met,-9999),
                ak.fill_none(dr_wl0_wl1,-9999),
                ak.fill_none(dr_zl0_zl1,-9999),
                ak.fill_none(dr_wleps_zleps,-9999),
                ak.fill_none(met.pt,-9999),
                ak.fill_none(mt2_val,-9999),
                ak.fill_none(ptl4,-9999),
                ak.fill_none(scalarptsum_lepmet,-9999),
                ak.fill_none(scalarptsum_lepmetjet,-9999),
                ak.fill_none(z_lep0.pt,-9999),
                ak.fill_none(z_lep1.pt,-9999),
                ak.fill_none(w_lep0.pt,-9999),
                ak.fill_none(w_lep1.pt,-9999),
            ]

            bdt_of_wwz_raw = es_ec.eval_sig_bdt(events,bdt_vars,ewkcoffea_path("data/wwz_zh_bdt/of_WWZ.json"))
            bdt_sf_wwz_raw = es_ec.eval_sig_bdt(events,bdt_vars,ewkcoffea_path("data/wwz_zh_bdt/sf_WWZ.json"))
            bdt_of_zh_raw = es_ec.eval_sig_bdt(events,bdt_vars,ewkcoffea_path("data/wwz_zh_bdt/of_ZH.json"))
            bdt_sf_zh_raw = es_ec.eval_sig_bdt(events,bdt_vars,ewkcoffea_path("data/wwz_zh_bdt/sf_ZH.json"))
            bdt_feat_lst = [ "m_ll", "dPhi_4Lep_MET", "dPhi_Zcand_MET", "dPhi_WW_MET", "dR_Wcands", "dR_Zcands", "dR_WW_Z", "MET", "MT2", "Pt4l", "STLepHad", "STLep", "leading_Zcand_pt", "subleading_Zcand_pt", "leading_Wcand_pt", "subleading_Wcand_pt"]
            bdt_var_dict = {
                "m_ll"               : ak.fill_none(mll_wl0_wl1,-9999),
                "dPhi_4Lep_MET"      : ak.fill_none(dphi_4l_met,-9999),
                "dPhi_Zcand_MET"     : ak.fill_none(dphi_zleps_met,-9999),
                "dPhi_WW_MET"        : ak.fill_none(dphi_wleps_met,-9999),
                "dR_Wcands"          : ak.fill_none(dr_wl0_wl1,-9999),
                "dR_Zcands"          : ak.fill_none(dr_zl0_zl1,-9999),
                "dR_WW_Z"            : ak.fill_none(dr_wleps_zleps,-9999),
                "MET"                : ak.fill_none(met.pt,-9999),
                "MT2"                : ak.fill_none(mt2_val,-9999),
                "Pt4l"               : ak.fill_none(ptl4,-9999),
                "STLepHad"           : ak.fill_none(scalarptsum_lepmet,-9999),
                "STLep"              : ak.fill_none(scalarptsum_lepmetjet,-9999),
                "leading_Zcand_pt"   : ak.fill_none(z_lep0.pt,-9999),
                "subleading_Zcand_pt": ak.fill_none(z_lep1.pt,-9999),
                "leading_Wcand_pt"   : ak.fill_none(w_lep0.pt,-9999),
                "subleading_Wcand_pt": ak.fill_none(w_lep1.pt,-9999),
            }

            bdt_of_wwz_raw = os_ec.xgb_eval_wrapper(bdt_feat_lst,bdt_var_dict,ewkcoffea_path("data/wwz_zh_bdt/of_WWZ.json"))
            bdt_sf_wwz_raw = os_ec.xgb_eval_wrapper(bdt_feat_lst,bdt_var_dict,ewkcoffea_path("data/wwz_zh_bdt/sf_WWZ.json"))
            bdt_of_zh_raw = os_ec.xgb_eval_wrapper(bdt_feat_lst,bdt_var_dict,ewkcoffea_path("data/wwz_zh_bdt/of_ZH.json"))
            bdt_sf_zh_raw = os_ec.xgb_eval_wrapper(bdt_feat_lst,bdt_var_dict,ewkcoffea_path("data/wwz_zh_bdt/sf_ZH.json"))

            # Match TMVA's scaling https://root.cern.ch/doc/v606/MethodBDT_8cxx_source.html
            bdt_of_wwz = (2.0*((1.0+math.e**(-2*bdt_of_wwz_raw))**(-1))) - 1.0
            bdt_sf_wwz = (2.0*((1.0+math.e**(-2*bdt_sf_wwz_raw))**(-1))) - 1.0
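For context on the xgb_eval_wrapper calls (a hedged sketch, not the actual ewkcoffea implementation): a wrapper like this typically stacks the features in the order given by bdt_feat_lst, loads the model JSON, and returns the raw (pre-sigmoid) score; the scaling just above then maps that raw score to the TMVA convention, 2/(1+e^(-2x)) - 1, which is just tanh(x). A rough numpy-based sketch, assuming flat arrays (the real wrapper also has to handle awkward/dask inputs):

```python
import numpy as np
import xgboost as xgb

def xgb_eval_sketch(feat_names, feat_dict, model_json_path):
    # Stack the features column-wise in the order the model was trained with
    x = np.stack([np.asarray(feat_dict[name]) for name in feat_names], axis=1)
    booster = xgb.Booster()
    booster.load_model(model_json_path)
    dmat = xgb.DMatrix(x, feature_names=feat_names)
    # output_margin=True returns the raw score, matching the *_raw values above
    return booster.predict(dmat, output_margin=True)
```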
@@ -593,8 +606,6 @@ def process(self, events):

            ######### Fill histos #########

            hout = {}

            dense_variables_dict = {
                "mt2" : mt2_val,
                "met" : met.pt,
@@ -652,8 +663,8 @@ def process(self, events):
"mll_min_afos" : mll_min_afos,
"mll_min_sfos" : mll_min_sfos,

"mlb_min" : mlb_min,
"mlb_max" : mlb_max,
#"mlb_min" : mlb_min,
#"mlb_max" : mlb_max,

"bdt_of_wwz_raw": bdt_of_wwz_raw,
"bdt_sf_wwz_raw": bdt_sf_wwz_raw,
@@ -726,12 +737,13 @@ def process(self, events):


            # Loop over the hists we want to fill
            hout = {}
            for dense_axis_name, dense_axis_vals in dense_variables_dict.items():
                #print("\ndense_axis_name,vals",dense_axis_name)
                #print("dense_axis_name,vals",dense_axis_vals)

                # Create the hist for this dense axis variable
                hout[dense_axis_name] = hist.Hist(
                hout[dense_axis_name] = hda.Hist(
                    hist.axis.StrCategory([], growth=True, name="process", label="process"),
                    hist.axis.StrCategory([], growth=True, name="category", label="category"),
                    self._dense_axes_dict[dense_axis_name],
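A note on the hist.Hist to hda.Hist switch in this hunk (illustrative, not part of this diff): hist.dask histograms are filled lazily with dask_awkward arrays and only turn into regular hist.Hist objects when the graph is computed. A toy example with made-up values:

```python
import awkward as ak
import dask_awkward as dak
import hist
import hist.dask as hda

vals = dak.from_awkward(ak.Array([110.0, 250.0, 75.0]), npartitions=1)  # toy met values

h = hda.Hist(
    hist.axis.StrCategory([], growth=True, name="process", label="process"),
    hist.axis.Regular(10, 0, 500, name="met", label="met"),
    storage="weight",
)
h.fill(process="WWZ", met=vals, weight=1.0)  # only builds a task graph here

result = h.compute()  # materializes a regular hist.Hist
print(result.sum())   # weighted sum over all filled bins
```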
2 changes: 1 addition & 1 deletion environment.yml
@@ -4,7 +4,7 @@ channels:
dependencies:
- conda-forge::python=3.9
- numpy=1.23.5
- coffea==0.7.22
- coffea
- ndcctools
- conda-pack
- dill