Pull request #391: Calendar Coffea processor and remove futures executor
Status: Open · wants to merge 15 commits into base: coffea2023
.github/workflows/main.yml (2 changes: 1 addition & 1 deletion)
@@ -87,7 +87,7 @@ jobs:
run: |
mkdir dir_for_topcoffea
cd dir_for_topcoffea
-git clone https://github.com/TopEFT/topcoffea.git
+git clone -b dask_hist https://github.com/TopEFT/topcoffea.git
cd topcoffea
conda run -n coffea-env pip install -e .
cd ../..
analysis/topeft_run2/analysis_processor.py (51 changes: 31 additions & 20 deletions)
@@ -4,6 +4,7 @@
import coffea
import numpy as np
import awkward as ak
+import dask_awkward as dak

import hist
from topcoffea.modules.histEFT import HistEFT
@@ -20,7 +21,8 @@

from topeft.modules.axes import info as axes_info
from topeft.modules.paths import topeft_path
-from topeft.modules.corrections import GetBTagSF, ApplyJetCorrections, GetBtagEff, AttachMuonSF, AttachElectronSF, AttachPerLeptonFR, GetPUSF, ApplyRochesterCorrections, ApplyJetSystematics, AttachPSWeights, AttachScaleWeights, GetTriggerSF
+from topeft.modules.corrections import GetBTagSF, ApplyJetCorrections, GetBtagEff, AttachMuonSF, AttachElectronSF, AttachPerLeptonFR, GetPUSF, ApplyJetSystematics, AttachPSWeights, AttachScaleWeights, GetTriggerSF
+#from topeft.modules.corrections import GetBTagSF, ApplyJetCorrections, GetBtagEff, AttachMuonSF, AttachElectronSF, AttachPerLeptonFR, GetPUSF, ApplyRochesterCorrections, ApplyJetSystematics, AttachPSWeights, AttachScaleWeights, GetTriggerSF
import topeft.modules.event_selection as te_es
import topeft.modules.object_selection as te_os

@@ -228,7 +230,7 @@ def process(self, events):

# Extract the EFT quadratic coefficients and optionally use them to calculate the coefficients on the w**2 quartic function
# eft_coeffs is never Jagged so convert immediately to numpy for ease of use.
-eft_coeffs = ak.to_numpy(events["EFTfitCoefficients"]) if hasattr(events, "EFTfitCoefficients") else None
+eft_coeffs = ak.flatten(events["EFTfitCoefficients"]) if hasattr(events, "EFTfitCoefficients") else None
if eft_coeffs is not None:
# Check to see if the ordering of WCs for this sample matches what we want
if self._samples[dataset]["WCnames"] != self._wc_names_lst:
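Context for the `ak.to_numpy` → `ak.flatten` change above: with dask-awkward the event columns are lazy task graphs, so an eager conversion to numpy cannot happen at graph-construction time; lazy awkward operations are used instead, and values only materialize at `compute()`. A toy sketch of that behavior (not the analysis inputs):

```python
import awkward as ak
import dask_awkward as dak

# Toy lazy collection; the real columns come from NanoAOD files.
lazy = dak.from_awkward(ak.Array([[1.0, 2.0], [3.0]]), npartitions=1)

flat = ak.flatten(lazy)         # stays lazy: ak.* dispatches to dask-awkward
print(flat.compute().tolist())  # [1.0, 2.0, 3.0] -- materialized only here
# ak.to_numpy(lazy) would need the values immediately, which a lazy
# collection cannot supply without computing the whole graph first.
```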
@@ -246,7 +248,8 @@

################### Muon selection ####################

mu["pt"] = ApplyRochesterCorrections(year, mu, isData) # Need to apply corrections before doing muon selection
#TODO Enable when rnd(shape) is sorted out
#mu["pt"] = ApplyRochesterCorrections(year, mu, isData) # Need to apply corrections before doing muon selection
mu["isPres"] = te_os.isPresMuon(mu.dxy, mu.dz, mu.sip3d, mu.eta, mu.pt, mu.miniPFRelIso_all)
mu["isLooseM"] = te_os.isLooseMuon(mu.miniPFRelIso_all,mu.sip3d,mu.looseId)
mu["isFO"] = te_os.isFOMuon(mu.pt, mu.conept, mu.btagDeepFlavB, mu.mvaTTHUL, mu.jetRelIso, year)
@@ -305,12 +308,12 @@ def process(self, events):
# We only calculate these values if not isData
# Note: add() will generally modify up/down weights, so if these are needed for any reason after this point, we should instead pass copies to add()
# Note: Here we will add to the weights object the SFs that do not depend on any of the forthcoming loops
-weights_obj_base = coffea.analysis_tools.Weights(len(events),storeIndividual=True)
+weights_obj_base = coffea.analysis_tools.Weights(None,storeIndividual=True)
if not isData:

# If this is not an eft sample, get the genWeight
if eft_coeffs is None: genw = events["genWeight"]
-else: genw= np.ones_like(events["event"])
+else: genw= ak.ones_like(events["event"])

# Normalize by (xsec/sow)*genw where genw is 1 for EFT samples
# Note that for theory systs, will need to multiply by sow/sow_wgtUP to get (xsec/sow_wgtUp)*genw and same for Down
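On the `Weights(None, ...)` change above: with dask-backed inputs the event count is not known when the task graph is built, so the container size is left as `None` and the lazy weight arrays define the length. A minimal sketch of the pattern (toy array; assumes a coffea release with dask-awkward support):

```python
import awkward as ak
import dask_awkward as dak
from coffea.analysis_tools import Weights

# Toy lazy stand-in for an events column.
pt = dak.from_awkward(ak.Array([10.0, 20.0, 30.0]), npartitions=1)

weights = Weights(None, storeIndividual=True)  # size=None: sized by the lazy arrays
weights.add("norm", 0.5 * ak.ones_like(pt))    # ak.* dispatches to dask-awkward
print(weights.weight().compute().tolist())     # [0.5, 0.5, 0.5]
```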
@@ -344,15 +347,17 @@ def process(self, events):
for syst_var in syst_var_list:
# Make a copy of the base weights object, so that each time through the loop we do not double count systs
# In this loop over systs that impact kinematics, we will add to the weights objects the SFs that depend on the object kinematics
-weights_obj_base_for_kinematic_syst = copy.deepcopy(weights_obj_base)
+weights_obj_base_for_kinematic_syst = copy.copy(weights_obj_base)

[Review comment by the contributor on this change]: Need to verify this didn't change the results.
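On the reviewer's point: `copy.copy` makes a shallow copy, so the new object shares its underlying mutable state with the original, whereas `copy.deepcopy` duplicated it; the shallow copy is only equivalent if neither object mutates that shared state afterwards. A minimal illustration with a plain Python object (not the actual coffea `Weights` internals):

```python
import copy

class Holder:
    """Toy stand-in for an object holding mutable weight arrays."""
    def __init__(self):
        self.weights = {"nom": [1.0, 1.0]}

base = Holder()
shallow = copy.copy(base)   # shallow.weights IS base.weights (shared dict)
deep = copy.deepcopy(base)  # deep.weights is an independent duplicate

shallow.weights["nom"][0] = 99.0
print(base.weights["nom"])  # [99.0, 1.0] -- the mutation leaks back to base
print(deep.weights["nom"])  # [1.0, 1.0]  -- the deep copy is unaffected
```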

#################### Jets ####################

# Jet cleaning, before any jet selection
#vetos_tocleanjets = ak.with_name( ak.concatenate([tau, l_fo], axis=1), "PtEtaPhiMCandidate")
vetos_tocleanjets = ak.with_name( l_fo, "PtEtaPhiMCandidate")
-tmp = ak.cartesian([ak.local_index(jets.pt), vetos_tocleanjets.jetIdx], nested=True)
-cleanedJets = jets[~ak.any(tmp.slot0 == tmp.slot1, axis=-1)] # this line should go before *any selection*, otherwise lep.jetIdx is not aligned with the jet index
+tmp = ak.local_index(jets.pt) == vetos_tocleanjets.jetIdx
+#cleanedJets = jets[~ak.any(tmp, axis=-1)] # this line should go before *any selection*, otherwise lep.jetIdx is not aligned with the jet index
+#TODO fix jet cleaning
+cleanedJets = jets

# Selecting jets and cleaning them
jetptname = "pt_nom" if hasattr(cleanedJets, "pt_nom") else "pt"
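On the jet-cleaning TODO above: the original idiom pairs every jet index with every lepton `jetIdx` per event and drops jets that match. A self-contained sketch of that idiom on toy jagged arrays, for reference while the dask-compatible version is worked out:

```python
import awkward as ak

# Toy events: per-event jet pts, and each lepton's matched jet index.
jets_pt = ak.Array([[50.0, 40.0, 30.0], [60.0, 20.0]])
lep_jetidx = ak.Array([[0], [1]])

# Pair every jet index with every lepton jetIdx within the event.
pairs = ak.cartesian([ak.local_index(jets_pt), lep_jetidx], nested=True)
overlap = ak.any(pairs.slot0 == pairs.slot1, axis=-1)

cleaned_pt = jets_pt[~overlap]
print(cleaned_pt.tolist())  # [[40.0, 30.0], [60.0]] -- matched jets removed
```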
@@ -363,11 +368,12 @@
cleanedJets["mass_raw"] = (1 - cleanedJets.rawFactor)*cleanedJets.mass
cleanedJets["pt_gen"] =ak.values_astype(ak.fill_none(cleanedJets.matched_gen.pt, 0), np.float32)
cleanedJets["rho"] = ak.broadcast_arrays(events.fixedGridRhoFastjetAll, cleanedJets.pt)[0]
-events_cache = events.caches[0]
-cleanedJets = ApplyJetCorrections(year, corr_type='jets').build(cleanedJets, lazy_cache=events_cache)
+#TODO implement or remove
+#events_cache = events.caches[0]
+cleanedJets = ApplyJetCorrections(year, corr_type='jets').build(cleanedJets)#, lazy_cache=events_cache)
# SYSTEMATICS
cleanedJets=ApplyJetSystematics(year,cleanedJets,syst_var)
-met=ApplyJetCorrections(year, corr_type='met').build(met_raw, cleanedJets, lazy_cache=events_cache)
+met=ApplyJetCorrections(year, corr_type='met').build(met_raw, cleanedJets)#, lazy_cache=events_cache)
cleanedJets["isGood"] = tc_os.is_tight_jet(getattr(cleanedJets, jetptname), cleanedJets.eta, cleanedJets.jetId, pt_cut=30., eta_cut=get_te_param("eta_j_cut"), id_cut=get_te_param("jet_id_cut"))
goodJets = cleanedJets[cleanedJets.isGood]

@@ -433,6 +439,7 @@ def process(self, events):
# Btag SF following 1a) in https://twiki.cern.ch/twiki/bin/viewauth/CMS/BTagSFMethods
isBtagJetsLooseNotMedium = (isBtagJetsLoose & isNotBtagJetsMedium)
bJetSF = [GetBTagSF(goodJets, year, 'LOOSE'),GetBTagSF(goodJets, year, 'MEDIUM')]
+#TODO fix SFs pkl
bJetEff = [GetBtagEff(goodJets, year, 'loose'),GetBtagEff(goodJets, year, 'medium')]
bJetEff_data = [bJetEff[0]*bJetSF[0],bJetEff[1]*bJetSF[1]]
pMC = ak.prod(bJetEff[1] [isBtagJetsMedium], axis=-1) * ak.prod((bJetEff[0] [isBtagJetsLooseNotMedium] - bJetEff[1] [isBtagJetsLooseNotMedium]), axis=-1) * ak.prod((1-bJetEff[0] [isNotBtagJetsLoose]), axis=-1)
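For reference, method 1a) from the linked TWiki defines the per-event b-tag weight as w = P(data)/P(MC), where each probability multiplies, over jets, the medium efficiency for medium-tagged jets, (loose − medium) for loose-not-medium jets, and (1 − loose) for untagged jets; in data the efficiencies are scaled by the data/MC SFs, mirroring how `pMC` above is built. A toy single-event sketch with hypothetical numbers:

```python
import numpy as np

# Hypothetical per-jet efficiencies and tag categories for one event.
eff_loose  = np.array([0.90, 0.85, 0.80])
eff_medium = np.array([0.70, 0.65, 0.60])
sf_loose, sf_medium = 0.95, 0.92              # hypothetical data/MC scale factors
is_medium   = np.array([True, False, False])
is_loose_nm = np.array([False, True, False])  # loose but not medium
is_untagged = ~(is_medium | is_loose_nm)

def tag_prob(el, em):
    """P(observed tag configuration) as a product over jets by category."""
    return (np.prod(em[is_medium])
            * np.prod(el[is_loose_nm] - em[is_loose_nm])
            * np.prod(1.0 - el[is_untagged]))

w = tag_prob(eff_loose * sf_loose, eff_medium * sf_medium) / tag_prob(eff_loose, eff_medium)
print(w)  # per-event b-tag weight applied to MC
```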
@@ -452,7 +459,8 @@

# Trigger SFs
GetTriggerSF(year,events,l0,l1)
-weights_obj_base_for_kinematic_syst.add(f"triggerSF_{year}", events.trigger_sf, copy.deepcopy(events.trigger_sfUp), copy.deepcopy(events.trigger_sfDown)) # In principle does not have to be in the lep cat loop
+#TODO Enable after trigger SFs are fixed
+#weights_obj_base_for_kinematic_syst.add(f"triggerSF_{year}", events.trigger_sf, copy.deepcopy(events.trigger_sfUp), copy.deepcopy(events.trigger_sfDown)) # In principle does not have to be in the lep cat loop


######### Event weights that do depend on the lep cat ###########
@@ -462,7 +470,9 @@
for ch_name in ["2l", "2l_4t", "3l", "4l", "2l_CR", "2l_CRflip", "3l_CR", "2los_CRtt", "2los_CRZ"]:

# For both data and MC
-weights_dict[ch_name] = copy.deepcopy(weights_obj_base_for_kinematic_syst)
+weights_dict[ch_name] = copy.copy(weights_obj_base_for_kinematic_syst)
+#TODO Enable after fake rates are fixed
+'''
if ch_name.startswith("2l"):
weights_dict[ch_name].add("FF", events.fakefactor_2l, copy.deepcopy(events.fakefactor_2l_up), copy.deepcopy(events.fakefactor_2l_down))
weights_dict[ch_name].add("FFpt", events.nom, copy.deepcopy(events.fakefactor_2l_pt1/events.fakefactor_2l), copy.deepcopy(events.fakefactor_2l_pt2/events.fakefactor_2l))
@@ -480,18 +490,19 @@
if isData:
if ch_name in ["2l","2l_4t","2l_CR","2l_CRflip"]:
weights_dict[ch_name].add("fliprate", events.flipfactor_2l)
+'''

# For MC only
if not isData:
if ch_name.startswith("2l"):
weights_dict[ch_name].add("lepSF_muon", events.sf_2l_muon, copy.deepcopy(events.sf_2l_hi_muon), copy.deepcopy(events.sf_2l_lo_muon))
weights_dict[ch_name].add("lepSF_elec", events.sf_2l_elec, copy.deepcopy(events.sf_2l_hi_elec), copy.deepcopy(events.sf_2l_lo_elec))
weights_dict[ch_name].add("lepSF_muon", events.sf_2l_muon, copy.copy(events.sf_2l_hi_muon), copy.copy(events.sf_2l_lo_muon))
weights_dict[ch_name].add("lepSF_elec", events.sf_2l_elec, copy.copy(events.sf_2l_hi_elec), copy.copy(events.sf_2l_lo_elec))
elif ch_name.startswith("3l"):
weights_dict[ch_name].add("lepSF_muon", events.sf_3l_muon, copy.deepcopy(events.sf_3l_hi_muon), copy.deepcopy(events.sf_3l_lo_muon))
weights_dict[ch_name].add("lepSF_elec", events.sf_3l_elec, copy.deepcopy(events.sf_3l_hi_elec), copy.deepcopy(events.sf_3l_lo_elec))
weights_dict[ch_name].add("lepSF_muon", events.sf_3l_muon, copy.copy(events.sf_3l_hi_muon), copy.copy(events.sf_3l_lo_muon))
weights_dict[ch_name].add("lepSF_elec", events.sf_3l_elec, copy.copy(events.sf_3l_hi_elec), copy.copy(events.sf_3l_lo_elec))
elif ch_name.startswith("4l"):
weights_dict[ch_name].add("lepSF_muon", events.sf_4l_muon, copy.deepcopy(events.sf_4l_hi_muon), copy.deepcopy(events.sf_4l_lo_muon))
weights_dict[ch_name].add("lepSF_elec", events.sf_4l_elec, copy.deepcopy(events.sf_4l_hi_elec), copy.deepcopy(events.sf_4l_lo_elec))
weights_dict[ch_name].add("lepSF_muon", events.sf_4l_muon, copy.copy(events.sf_4l_hi_muon), copy.copy(events.sf_4l_lo_muon))
weights_dict[ch_name].add("lepSF_elec", events.sf_4l_elec, copy.copy(events.sf_4l_hi_elec), copy.copy(events.sf_4l_lo_elec))
else:
raise Exception(f"Unknown channel name: {ch_name}")

@@ -635,7 +646,7 @@ def process(self, events):
ecut_mask = (ljptsum<self._ecut_threshold)

# Counts
-counts = np.ones_like(events['event'])
+counts = ak.ones_like(events['event'])

# Variables we will loop over when filling hists
varnames = {}
analysis/topeft_run2/run_analysis.py (75 changes: 73 additions & 2 deletions)
@@ -6,10 +6,17 @@
import cloudpickle
import gzip
import os
+import dask
+import dask_awkward as dak
+from distributed import Client

from coffea import processor
from coffea.nanoevents import NanoAODSchema
+from coffea.dataset_tools import (
+    apply_to_fileset,
+    #max_chunks,
+    preprocess,
+)
+#from coffea.dataset_tools import filter_files

import topcoffea.modules.utils as utils
import topcoffea.modules.remote_environment as remote_environment
@@ -317,11 +324,75 @@ def LoadJsonToSampleName(jsonFile, prefix):
'print_stdout': False,
}

+# Get fileset
+fileset = {}
+for name, fpaths in flist.items():
+    fileset[name] = {}
+    fileset[name]["files"] = {}
+    for fpath in fpaths:
+        fileset[name]["files"][fpath] = {"object_path": "Events"}
+    fileset[name]["metadata"] = {"dataset": name}
+print(f'{fileset=}')
+print("Number of datasets:",len(fileset))

# Run the processor and get the output
tstart = time.time()

if executor == "futures":
+    output = dask.compute(processor_instance)
+    '''
+    dataset_runnable, dataset_updated = preprocess(
+        fileset,
+        align_clusters=False,
+        #step_size=100_000,
+        files_per_batch=1,
+        skip_bad_files=True,
+        #save_form=False,
+    )
+    to_compute = apply_to_fileset(
+        processor_instance,
+        max_chunks(dataset_runnable, 300),
+        #schemaclass=BaseSchema,
+    )
+    (out,) = dask.compute(to_compute)
+    '''
+    with Client(n_workers=8, threads_per_worker=1) as client:
+
+        # Run preprocess
+        print("\nRunning preprocess...")
+        dataset_runnable, dataset_updated = preprocess(
+            fileset,
+            #step_size=50_000, # missing in current version?
+            align_clusters=False,
+            files_per_batch=1,
+            #save_form=True,
+        )
+        #dataset_runnable = filter_files(dataset_runnable)
+
+        t_beforeapplytofileset = time.time()
+        # Run apply_to_fileset
+        print("\nRunning apply_to_fileset...")
+        histos_to_compute, reports = apply_to_fileset(
+            processor_instance,
+            dataset_runnable,
+            uproot_options={"allow_read_errors_with_report": True},
+            #parallelize_with_dask=True,
+        )
+
+        print("DONE with apply to fileset")
+
+        # Check columns to be read
+        print("\nRunning necessary_columns...")
+        columns_read = dak.necessary_columns(histos_to_compute[list(histos_to_compute.keys())[0]])
+        print(columns_read)
+
+        t_beforecompute = time.time()
+        # Compute
+        print("\nRunning compute...")
+        output_futures, report_futures = {}, {}
+        for key in histos_to_compute:
+            output_futures[key], report_futures[key] = client.compute((histos_to_compute[key], reports[key],))
+
+        coutputs, creports = client.gather((output_futures, report_futures,))
elif executor == "work_queue":
    executor = processor.WorkQueueExecutor(**executor_args)
    runner = processor.Runner(executor, schema=NanoAODSchema, chunksize=chunksize, maxchunks=nchunks, skipbadfiles=False, xrootdtimeout=180)
Binary files modified:
- topeft/data/btagSF/UL/btagMCeff_2016.pkl.gz
- topeft/data/btagSF/UL/btagMCeff_2016APV.pkl.gz
- topeft/data/btagSF/UL/btagMCeff_2017.pkl.gz
- topeft/data/btagSF/UL/btagMCeff_2018.pkl.gz
- topeft/data/fliprates/flip_probs_topcoffea_UL16.pkl.gz
- topeft/data/fliprates/flip_probs_topcoffea_UL16APV.pkl.gz
- topeft/data/fliprates/flip_probs_topcoffea_UL17.pkl.gz
- topeft/data/fliprates/flip_probs_topcoffea_UL18.pkl.gz
- topeft/data/triggerSF/triggerSF_2016.pkl.gz
- topeft/data/triggerSF/triggerSF_2016APV.pkl.gz
- topeft/data/triggerSF/triggerSF_2017.pkl.gz
- topeft/data/triggerSF/triggerSF_2018.pkl.gz