Skip to content

Commit

Permalink
Reorganize looper and helper class
Browse files Browse the repository at this point in the history
  • Loading branch information
sam-may committed Feb 4, 2021
1 parent bbed521 commit d512d21
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 112 deletions.
202 changes: 116 additions & 86 deletions Preselection/helpers/loop_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,6 @@ def __init__(self, **kwargs):
self.fast = kwargs.get("fast")
self.dry_run = kwargs.get("dry_run")

self.do_plots = kwargs.get("do_plots")
self.do_tables = kwargs.get("do_tables")
self.do_ntuple = kwargs.get("do_ntuple")

self.outputs = []

if self.debug > 0:
Expand All @@ -65,53 +61,6 @@ def __init__(self, **kwargs):

self.load_samples()


def load_file(self, file, tree_name = "Events", data = False):
    """Open a ROOT file with uproot and load the configured branches as awkward arrays.

    :param file: path to the ROOT file to read
    :param tree_name: name of the TTree inside the file (default "Events")
    :param data: if True, read ``self.branches_data`` instead of ``self.branches``
                 (presumably the data list omits MC-only branches — verify)
    :return: awkward record array of events, or None if the file could not be opened
    """
    with uproot.open(file) as f:
        if not f:
            print("[LoopHelper] Problem opening file %s" % file)
            return None
        tree = f[tree_name]
        # Branch list depends on whether this is a data or an MC sample
        if data:
            branches = self.branches_data
        else:
            branches = self.branches
        events = tree.arrays(branches, library = "ak", how = "zip")
        #events = tree.arrays(branches, entry_start = 0, entry_stop = 10000, library = "ak", how = "zip")
        # library = "ak" to load arrays as awkward arrays for best performance
        # how = "zip" allows us to access arrays as records, e.g. events.Photon
        return events

def select_events(self, events):
    """Apply the common diphoton preselection, then the analysis-specific selection.

    All samples get the diphoton preselection and photon object selection.
    Depending on ``self.selections``, additional event-level preselection and
    per-object (electron/muon/tau) masks are applied.

    :param events: awkward record array of events
    :return: events surviving the selection, with object collections masked in place
    """
    # Dipho preselection
    events = photon_selections.diphoton_preselection(events, self.debug)
    events.Photon = events.Photon[photon_selections.select_photons(events, self.debug)]

    if self.selections == "HHggTauTau_InclusivePresel":
        # HH->ggTauTau: inclusive preselection plus electron/muon/tau object cuts
        events = analysis_selections.ggTauTau_inclusive_preselection(events, self.debug)
        events.Electron = events.Electron[lepton_selections.select_electrons(events, self.debug)]
        events.Muon = events.Muon[lepton_selections.select_muons(events, self.debug)]
        events.Tau = events.Tau[tau_selections.select_taus(events, self.debug)]

    elif self.selections == "ttH_LeptonicPresel":
        # ttH leptonic: preselection plus electron/muon object cuts (no taus)
        events = analysis_selections.tth_leptonic_preselection(events, self.debug)
        events.Electron = events.Electron[lepton_selections.select_electrons(events, self.debug)]
        events.Muon = events.Muon[lepton_selections.select_muons(events, self.debug)]

    # NOTE(review): an unrecognized self.selections value silently skips the
    # analysis-specific step — confirm this fall-through is intended.
    return events

def trim_events(self, events, data):
    """Flatten per-object fields into output columns and keep only the branches to save.

    :param events: awkward record array of selected events
    :param data: if True, keep ``self.save_branches_data``; otherwise ``self.save_branches``
    :return: events restricted to the chosen output branches
    """
    # Each set_* helper derives the flat columns for its object collection
    events = photon_selections.set_photons(events, self.debug)
    events = lepton_selections.set_electrons(events, self.debug)
    events = lepton_selections.set_muons(events, self.debug)
    events = tau_selections.set_taus(events, self.debug)
    if data:
        branches = self.save_branches_data
    else:
        branches = self.save_branches
    trimmed_events = events[branches]
    return trimmed_events

def load_samples(self):
with open(self.samples, "r") as f_in:
self.samples_dict = json.load(f_in)
Expand All @@ -120,13 +69,32 @@ def load_samples(self):
print("[LoopHelper] Running over the following samples:")
print("\n".join(["{0}={1!r}".format(a, b) for a, b in self.samples_dict.items()]))

def chunks(self, files, fpo):
    """Yield successive batches of files, each holding at most fpo entries.

    :param files: sequence of input file paths
    :param fpo: files-per-output, i.e. maximum batch size (must be > 0)
    """
    total = len(files)
    for offset in range(0, total, fpo):
        yield files[offset : offset + fpo]
##########################
### Main function: run ###
##########################

def run(self):
    """Top-level driver: prepare jobs, submit them, merge their outputs, write a summary."""
    # NOTE(review): lumi_map appears unused inside run() — the same map is also
    # built in prepare_jobs; confirm this line is diff leftover and can be removed.
    lumi_map = { "2016" : 35.9, "2017" : 41.5, "2018" : 59 } # FIXME: do in a more configurable way
    self.prepare_jobs() # split files for each job, prepare relevants inputs (scale1fb, isData, etc)

    # Time the job-submission stage
    start = time.time()
    self.submit_jobs() # actually submit the jobs (local, Dask, condor)
    elapsed_time = time.time() - start
    print("[LoopHelper] Total time to run %d jobs on %d cores: %.2f minutes" % (len(self.jobs_manager), self.nCores, elapsed_time/60.))

    # Time the merge stage
    start = time.time()
    self.merge_outputs() # merge individual pkl files into a single master pkl
    elapsed_time = time.time() - start
    print("[LoopHelper] Total time to merge %d outputs: %.2f minutes" % (len(self.outputs), elapsed_time/60.))

    self.write_summary() # write a json file containing run options

##################
### Core tasks ###
##################

def prepare_jobs(self):
lumi_map = { "2016" : 35.9, "2017" : 41.5, "2018" : 59 } # FIXME: do in a more configurable way

self.jobs_manager = []

for sample, info in self.samples_dict.items():
Expand All @@ -144,7 +112,7 @@ def run(self):
if len(files) == 0:
continue

job_info = {
job_info = {
"sample" : sample,
"process_id" : info["process_id"],
"year" : year,
Expand All @@ -164,16 +132,17 @@ def run(self):
output = self.output_dir + self.selections + "_" + self.output_tag + "_" + sample + "_" + year + "_" + str(job_id) + ".pkl"
self.jobs_manager.append({
"info" : job_info,
"output" : output,
"output" : output,
"files" : file_split
})
self.outputs.append(output)

return

def submit_jobs(self):
if self.batch == "local":
start = time.time()
if self.debug > 0:
print("[LoopHelper] Submitting %d jobs locally on %d cores" % (len(self.jobs_manager), self.nCores))

manager = multiprocessing.Manager()
running_procs = []
for job in self.jobs_manager:
Expand Down Expand Up @@ -206,10 +175,7 @@ def run(self):
except:
continue

if self.debug > 0:
elapsed_time = time.time() - start
print("[LoopHelper] Total time to run %d jobs on %d cores: %.2f minutes" % (len(self.jobs_manager), self.nCores, elapsed_time/60.))

return

elif self.batch == "dask":
return
Expand All @@ -218,15 +184,60 @@ def run(self):
return
#TODO

self.merge_outputs()
return
def merge_outputs(self):
master_file = self.output_dir + self.selections + "_" + self.output_tag + ".pkl"
master_df = pandas.DataFrame()
for file in self.outputs:
if self.debug > 0:
print("[LoopHelper] Loading file %s" % file)
if not os.path.exists(file):
continue
df = pandas.read_pickle(file)
master_df = pandas.concat([master_df, df], ignore_index=True)

def write_to_df(self, events, output_name):
    """Persist events as a pickled pandas DataFrame at output_name."""
    df = awkward.to_pandas(events)
    df.to_pickle(output_name)
    return
master_df.to_pickle(master_file)

def write_summary(self):
    """Dump this helper's full configuration/state to a JSON summary file.

    The summary is written to ``<output_dir><selections>_<output_tag>.json``
    so a run can later be reproduced from its recorded options.
    """
    summary_file = self.output_dir + self.selections + "_" + self.output_tag + ".json"
    summary = vars(self)
    with open(summary_file, "w") as f_out:
        # default=str keeps the dump from crashing on attributes that are not
        # natively JSON-serializable (they are recorded via their str() form).
        json.dump(summary, f_out, sort_keys = True, indent = 4, default = str)

################################
### Physics: selections, etc ###
################################

def select_events(self, events):
# Dipho preselection
events = photon_selections.diphoton_preselection(events, self.debug)
events.Photon = events.Photon[photon_selections.select_photons(events, self.debug)]

if self.selections == "HHggTauTau_InclusivePresel":
events = analysis_selections.ggTauTau_inclusive_preselection(events, self.debug)
events.Electron = events.Electron[lepton_selections.select_electrons(events, self.debug)]
events.Muon = events.Muon[lepton_selections.select_muons(events, self.debug)]
events.Tau = events.Tau[tau_selections.select_taus(events, self.debug)]

elif self.selections == "ttH_LeptonicPresel":
events = analysis_selections.tth_leptonic_preselection(events, self.debug)
events.Electron = events.Electron[lepton_selections.select_electrons(events, self.debug)]
events.Muon = events.Muon[lepton_selections.select_muons(events, self.debug)]

def loop_sample(self, job):
return events

def trim_events(self, events, data):
    """Flatten selected objects into output columns and keep only the branches to save.

    :param events: awkward record array of selected events
    :param data: if True, keep the data branch list; otherwise the MC branch list
    :return: events restricted to the chosen output branches
    """
    # Run each object "setter" in order to derive the flat output columns
    setters = (
        photon_selections.set_photons,
        lepton_selections.set_electrons,
        lepton_selections.set_muons,
        tau_selections.set_taus,
    )
    for apply_setter in setters:
        events = apply_setter(events, self.debug)

    keep_branches = self.save_branches_data if data else self.save_branches
    return events[keep_branches]

def loop_sample(self, job):
info = job["info"]
sample = info["sample"]
files = job["files"]
Expand All @@ -239,7 +250,7 @@ def loop_sample(self, job):
data = True
else:
data = False

sel_evts = []
process_id = info["process_id"]

Expand All @@ -251,8 +262,8 @@ def loop_sample(self, job):
if events is None:
self.outputs.pop(output)
return
events = self.select_events(events)
events = self.select_events(events)

events["process_id"] = numpy.ones(len(events)) * process_id
if data:
events["weight"] = numpy.ones(len(events))
Expand All @@ -266,16 +277,35 @@ def loop_sample(self, job):
self.write_to_df(events_full, output)
return

def merge_outputs(self):
master_file = self.output_dir + self.selections + "_" + self.output_tag + ".pkl"
master_df = pandas.DataFrame()
for file in self.outputs:
if self.debug > 0:
print("[LoopHelper] Loading file %s" % file)
if not os.path.exists(file):
continue
df = pandas.read_pickle(file)
master_df = pandas.concat([master_df, df], ignore_index=True)
########################
### Helper functions ###
########################

master_df.to_pickle(master_file)

def load_file(self, file, tree_name = "Events", data = False):
    """Open a ROOT file with uproot and load the configured branches as awkward arrays.

    :param file: path to the ROOT file to read
    :param tree_name: name of the TTree inside the file (default "Events")
    :param data: if True, read ``self.branches_data`` instead of ``self.branches``
    :return: awkward record array of events, or None if the file or tree is unusable
    """
    with uproot.open(file) as f:
        if not f:
            print("[LoopHelper] Problem opening file %s" % file)
            return None
        tree = f[tree_name]
        if not tree:
            # Fix: previously printed the same "Problem opening file" message as the
            # open failure above, hiding which step actually failed.
            print("[LoopHelper] Problem accessing tree '%s' in file %s" % (tree_name, file))
            return None

        # Branch list depends on whether this is a data or an MC sample
        if data:
            branches = self.branches_data
        else:
            branches = self.branches
        events = tree.arrays(branches, library = "ak", how = "zip")
        #events = tree.arrays(branches, entry_start = 0, entry_stop = 10000, library = "ak", how = "zip")
        # library = "ak" to load arrays as awkward arrays for best performance
        # how = "zip" allows us to access arrays as records, e.g. events.Photon
        return events

def chunks(self, files, fpo):
    """Yield fixed-size groups of input files; fpo = files per output job.

    The final group may be shorter than fpo when len(files) is not a multiple.
    """
    group_starts = range(0, len(files), fpo)
    for begin in group_starts:
        yield files[begin : begin + fpo]

def write_to_df(self, events, output_name):
    """Convert the awkward events record array to pandas and pickle it to output_name."""
    awkward.to_pandas(events).to_pickle(output_name)
    return
29 changes: 3 additions & 26 deletions Preselection/loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,8 @@
from helpers import loop_helper

"""
This script is an all-purpose looper. Can perform any subset of the
following tasks:
- loop over events and write to histograms/ntuple
- make data/MC plots from histograms/ntuple
- make yield tables from histograms/ntuple
This script is an all-purpose looper which performs a selection and writes
all events passing this selection to a pandas dataframe
"""

parser = argparse.ArgumentParser()
Expand Down Expand Up @@ -42,7 +39,7 @@
type = str,
default = "data/HH_ggTauTau_default.json"
)
parser.add_argument(
parser.add_argument( #TODO
"--systematics",
help = "include systematics variations",
action = "store_true"
Expand All @@ -62,8 +59,6 @@
default = "output/"
)



# Technical
parser.add_argument(
"--batch",
Expand Down Expand Up @@ -94,24 +89,6 @@
action = "store_true"
)


# Output options
parser.add_argument(
"--do_plots",
help = "make data/MC plots",
action = "store_true"
)
parser.add_argument(
"--do_tables",
help = "make yield tables",
action = "store_true"
)
parser.add_argument(
"--do_ntuple",
help = "make single ntuple with all events",
action = "store_true"
)

args = parser.parse_args()

looper = loop_helper.LoopHelper(**vars(args))
Expand Down

0 comments on commit d512d21

Please sign in to comment.