feat: improve handling of optional dependencies (#213)
* better handling of optional dependencies: ServiceX and cabinetry on worker nodes

* fix indent

* fix CI
alexander-held authored Nov 17, 2023
1 parent 382e512 · commit 65db0d6
Showing 4 changed files with 48 additions and 41 deletions.
46 changes: 17 additions & 29 deletions analyses/cms-open-data-ttbar/ttbar_analysis_pipeline.ipynb
@@ -1,7 +1,6 @@
{
"cells": [
{
"attachments": {},
"cell_type": "markdown",
"id": "0dc37683",
"metadata": {},
@@ -25,7 +24,6 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "404927fe",
"metadata": {},
@@ -37,7 +35,6 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "bd72d323",
"metadata": {},
@@ -65,8 +62,6 @@
"from coffea.nanoevents import NanoAODSchema\n",
"from coffea.analysis_tools import PackedSelection\n",
"import copy\n",
"from func_adl import ObjectStream\n",
"from func_adl_servicex import ServiceXSourceUpROOT\n",
"import hist\n",
"import matplotlib.pyplot as plt\n",
"import numpy as np\n",
@@ -78,7 +73,6 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "cd573bb1",
"metadata": {},
@@ -135,7 +129,6 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "141d6520",
"metadata": {},
@@ -407,7 +400,6 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "90dd4c9e",
"metadata": {},
@@ -454,7 +446,6 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "b910d3d5",
"metadata": {},
@@ -473,7 +464,7 @@
},
"outputs": [],
"source": [
"def get_query(source: ObjectStream) -> ObjectStream:\n",
"def get_query(source):\n",
" \"\"\"Query for event / column selection: >=4j >=1b, ==1 lep with pT>30 GeV + additional cuts,\n",
" return relevant columns\n",
" *NOTE* jet pT cut is set lower to account for systematic variations to jet pT\n",
@@ -554,7 +545,6 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "d8f08fc1",
"metadata": {},
@@ -574,6 +564,12 @@
"outputs": [],
"source": [
"if USE_SERVICEX:\n",
" try:\n",
" from func_adl_servicex import ServiceXSourceUpROOT\n",
" except ImportError:\n",
" print(\"cannot import func_adl_servicex, which is a required dependency when using ServiceX\")\n",
" raise\n",
"\n",
" # dummy dataset on which to generate the query\n",
" dummy_ds = ServiceXSourceUpROOT(\"cernopendata://dummy\", \"Events\", backend_name=\"uproot\")\n",
"\n",
@@ -597,7 +593,6 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "c28a9e49",
"metadata": {},
@@ -649,10 +644,10 @@
" executor = processor.FuturesExecutor(workers=utils.config[\"benchmarking\"][\"NUM_CORES\"])\n",
"\n",
"run = processor.Runner(\n",
" executor=executor, \n",
" schema=NanoAODSchema, \n",
" savemetrics=True, \n",
" metadata_cache={}, \n",
" executor=executor,\n",
" schema=NanoAODSchema,\n",
" savemetrics=True,\n",
" metadata_cache={},\n",
" chunksize=utils.config[\"benchmarking\"][\"CHUNKSIZE\"])\n",
"\n",
"if USE_SERVICEX:\n",
@@ -670,8 +665,8 @@
"t0 = time.monotonic()\n",
"# processing\n",
"all_histograms, metrics = run(\n",
" fileset, \n",
" treename, \n",
" fileset,\n",
" treename,\n",
" processor_instance=TtbarAnalysis(USE_INFERENCE, USE_TRITON)\n",
")\n",
"exec_time = time.monotonic() - t0\n",
@@ -701,7 +696,6 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "d7bb4428",
"metadata": {},
@@ -732,6 +726,8 @@
}
],
"source": [
"import utils.plotting # noqa: E402\n",
"\n",
"utils.plotting.set_style()\n",
"\n",
"all_histograms[\"hist_dict\"][\"4j1b\"][120j::hist.rebin(2), :, \"nominal\"].stack(\"process\")[::-1].plot(stack=True, histtype=\"fill\", linewidth=1, edgecolor=\"grey\")\n",
@@ -767,7 +763,6 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "bed3df8b",
"metadata": {},
@@ -884,7 +879,6 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "7c334dd3",
"metadata": {},
@@ -917,7 +911,6 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "e904cd3c",
"metadata": {},
@@ -941,6 +934,8 @@
},
"outputs": [],
"source": [
"import utils.rebinning # noqa: E402\n",
"\n",
"cabinetry_config = cabinetry.configuration.load(\"cabinetry_config.yml\")\n",
"\n",
"# rebinning: lower edge 110 GeV, merge bins 2->1\n",
@@ -952,7 +947,6 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "f36dc601",
"metadata": {},
@@ -1000,7 +994,6 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "c74e4361",
"metadata": {},
@@ -1038,7 +1031,6 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "bd480eec",
"metadata": {},
@@ -1069,7 +1061,6 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "3a293479",
"metadata": {},
@@ -1125,7 +1116,6 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "14dc4b23",
"metadata": {},
@@ -1173,7 +1163,6 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "7f60c316",
"metadata": {},
@@ -1317,7 +1306,6 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "a2ce2d14",
"metadata": {},
28 changes: 18 additions & 10 deletions analyses/cms-open-data-ttbar/ttbar_analysis_pipeline.py
@@ -6,7 +6,7 @@
# extension: .py
# format_name: percent
# format_version: '1.3'
- # jupytext_version: 1.14.7
+ # jupytext_version: 1.15.2
# kernelspec:
# display_name: Python 3 (ipykernel)
# language: python
@@ -52,8 +52,6 @@
from coffea.nanoevents import NanoAODSchema
from coffea.analysis_tools import PackedSelection
import copy
- from func_adl import ObjectStream
- from func_adl_servicex import ServiceXSourceUpROOT
import hist
import matplotlib.pyplot as plt
import numpy as np
@@ -388,7 +386,7 @@ def postprocess(self, accumulator):
# Define the func_adl query to be used for the purpose of extracting columns and filtering.

# %%
- def get_query(source: ObjectStream) -> ObjectStream:
+ def get_query(source):
"""Query for event / column selection: >=4j >=1b, ==1 lep with pT>30 GeV + additional cuts,
return relevant columns
*NOTE* jet pT cut is set lower to account for systematic variations to jet pT
@@ -475,6 +473,12 @@ def get_query(source: ObjectStream) -> ObjectStream:

# %%
if USE_SERVICEX:
+ try:
+     from func_adl_servicex import ServiceXSourceUpROOT
+ except ImportError:
+     print("cannot import func_adl_servicex, which is a required dependency when using ServiceX")
+     raise
+
# dummy dataset on which to generate the query
dummy_ds = ServiceXSourceUpROOT("cernopendata://dummy", "Events", backend_name="uproot")

@@ -512,10 +516,10 @@ def get_query(source: ObjectStream) -> ObjectStream:
executor = processor.FuturesExecutor(workers=utils.config["benchmarking"]["NUM_CORES"])

run = processor.Runner(
-     executor=executor, 
-     schema=NanoAODSchema, 
-     savemetrics=True, 
-     metadata_cache={}, 
+     executor=executor,
+     schema=NanoAODSchema,
+     savemetrics=True,
+     metadata_cache={},
chunksize=utils.config["benchmarking"]["CHUNKSIZE"])

if USE_SERVICEX:
@@ -533,8 +537,8 @@ def get_query(source: ObjectStream) -> ObjectStream:
t0 = time.monotonic()
# processing
all_histograms, metrics = run(
-     fileset, 
-     treename, 
+     fileset,
+     treename,
processor_instance=TtbarAnalysis(USE_INFERENCE, USE_TRITON)
)
exec_time = time.monotonic() - t0
@@ -552,6 +556,8 @@ def get_query(source: ObjectStream) -> ObjectStream:
# We built histograms in two phase space regions, for multiple physics processes and systematic variations.

# %%
+ import utils.plotting # noqa: E402
+
utils.plotting.set_style()

all_histograms["hist_dict"]["4j1b"][120j::hist.rebin(2), :, "nominal"].stack("process")[::-1].plot(stack=True, histtype="fill", linewidth=1, edgecolor="grey")
@@ -644,6 +650,8 @@ def get_query(source: ObjectStream) -> ObjectStream:
# We will use `cabinetry` to combine all histograms into a `pyhf` workspace and fit the resulting statistical model to the pseudodata we built.

# %%
+ import utils.rebinning # noqa: E402
+
cabinetry_config = cabinetry.configuration.load("cabinetry_config.yml")

# rebinning: lower edge 110 GeV, merge bins 2->1
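Both the notebook and the script now handle ServiceX with the same guarded-import pattern: the optional dependency is imported only inside the code path that actually needs it, a hint is printed, and the original ImportError is re-raised so the failure stays visible. A minimal standalone sketch of the pattern (the helper name and the flag value here are illustrative, not part of this commit):

    USE_SERVICEX = True  # assumption: feature flag, as defined earlier in the pipeline

    def make_dummy_source():
        # guarded import: the optional dependency is only required on this code path
        try:
            from func_adl_servicex import ServiceXSourceUpROOT
        except ImportError:
            # print a hint, then surface the original ImportError
            print("cannot import func_adl_servicex, which is a required dependency when using ServiceX")
            raise
        # same call as in the diff: a dummy dataset on which to generate the query
        return ServiceXSourceUpROOT("cernopendata://dummy", "Events", backend_name="uproot")

    if USE_SERVICEX:
        dummy_ds = make_dummy_source()

Worker nodes that never take the ServiceX branch can import and run the module without func_adl_servicex installed.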
4 changes: 3 additions & 1 deletion analyses/cms-open-data-ttbar/utils/__init__.py
@@ -6,5 +6,7 @@
from . import metrics as metrics
from . import ml as ml
from . import plotting as plotting
- from . import rebinning as rebinning
from . import systematics as systematics
+
+
+ # to avoid issues: only import submodules if dependencies are present on worker nodes too
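The rebinning helpers rely on cabinetry, one of the optional dependencies named in the commit message, so importing them eagerly here made a bare import utils fail on worker nodes; the notebook now imports utils.rebinning (and utils.plotting, explicitly) at point of use instead. A hypothetical alternative, not used by this commit, would be lazy submodule lookup via a module-level __getattr__ (PEP 562):

    # hypothetical utils/__init__.py variant: import heavier submodules lazily,
    # only on first attribute access, so a bare `import utils` stays dependency-free
    import importlib

    _LAZY_SUBMODULES = {"plotting", "rebinning"}

    def __getattr__(name):
        if name in _LAZY_SUBMODULES:
            return importlib.import_module(f".{name}", __name__)
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

Point-of-use imports achieve the same effect with less indirection and keep any ImportError in the notebook cell that actually needs the dependency.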
11 changes: 10 additions & 1 deletion analyses/cms-open-data-ttbar/utils/file_input.py
@@ -2,10 +2,18 @@
import numpy as np
import os
from pathlib import Path
- from servicex import ServiceXDataset
import tqdm
import urllib


+ try:
+     from servicex import ServiceXDataset
+ except ImportError:
+     # if servicex is not available, ServiceXDatasetGroup cannot be used
+     # this is fine for worker nodes: only needed where main notebook is executed
+     pass
+
+
# If local_data_cache is a writable path, this function will download any missing file into it and
# then return file paths corresponding to these local copies.
def construct_fileset(n_files_max_per_sample, use_xcache=False, af_name="", local_data_cache=None, input_from_eos=False, xcache_atlas_prefix=None):
@@ -112,6 +120,7 @@ def download_file(url, out_file):
with tqdm.tqdm(unit='B', unit_scale=True, unit_divisor=1024, miniters=1, desc=out_path.name) as t:
urllib.request.urlretrieve(url, out_path.absolute(), reporthook=tqdm_urlretrieve_hook(t))


class ServiceXDatasetGroup():
def __init__(self, fileset, backend_name="uproot", ignore_cache=False):
self.fileset = fileset
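With the bare pass above, the name ServiceXDataset simply stays undefined on nodes without servicex, so accidental use of ServiceXDatasetGroup there surfaces as a NameError. A stricter variant (hypothetical, not what the commit does) keeps the module importable but fails with an explicit message:

    try:
        from servicex import ServiceXDataset
    except ImportError:
        ServiceXDataset = None  # sentinel: servicex is not installed on this node

    class ServiceXDatasetGroup():
        def __init__(self, fileset, backend_name="uproot", ignore_cache=False):
            if ServiceXDataset is None:
                raise ImportError(
                    "servicex is required for ServiceXDatasetGroup; "
                    "it only needs to be installed where the main notebook runs"
                )
            self.fileset = fileset  # remainder of __init__ as in the diff above

Either way the trade-off only affects the error message; worker nodes can import utils.file_input without servicex installed.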
