From 65db0d69db962b0505e3bb97bb26ca3eb40c3e15 Mon Sep 17 00:00:00 2001 From: Alexander Held <45009355+alexander-held@users.noreply.github.com> Date: Fri, 17 Nov 2023 11:27:12 +0100 Subject: [PATCH] feat: improve handling of optional dependencies (#213) * better handling of optional dependencies: ServiceX and cabinetry on worker nodes * fix indent * fix CI --- .../ttbar_analysis_pipeline.ipynb | 46 +++++++------------ .../ttbar_analysis_pipeline.py | 28 +++++++---- .../cms-open-data-ttbar/utils/__init__.py | 4 +- .../cms-open-data-ttbar/utils/file_input.py | 11 ++++- 4 files changed, 48 insertions(+), 41 deletions(-) diff --git a/analyses/cms-open-data-ttbar/ttbar_analysis_pipeline.ipynb b/analyses/cms-open-data-ttbar/ttbar_analysis_pipeline.ipynb index 7a9d95e6..b8e9a859 100644 --- a/analyses/cms-open-data-ttbar/ttbar_analysis_pipeline.ipynb +++ b/analyses/cms-open-data-ttbar/ttbar_analysis_pipeline.ipynb @@ -1,7 +1,6 @@ { "cells": [ { - "attachments": {}, "cell_type": "markdown", "id": "0dc37683", "metadata": {}, @@ -25,7 +24,6 @@ ] }, { - "attachments": {}, "cell_type": "markdown", "id": "404927fe", "metadata": {}, @@ -37,7 +35,6 @@ ] }, { - "attachments": {}, "cell_type": "markdown", "id": "bd72d323", "metadata": {}, @@ -65,8 +62,6 @@ "from coffea.nanoevents import NanoAODSchema\n", "from coffea.analysis_tools import PackedSelection\n", "import copy\n", - "from func_adl import ObjectStream\n", - "from func_adl_servicex import ServiceXSourceUpROOT\n", "import hist\n", "import matplotlib.pyplot as plt\n", "import numpy as np\n", @@ -78,7 +73,6 @@ ] }, { - "attachments": {}, "cell_type": "markdown", "id": "cd573bb1", "metadata": {}, @@ -135,7 +129,6 @@ ] }, { - "attachments": {}, "cell_type": "markdown", "id": "141d6520", "metadata": {}, @@ -407,7 +400,6 @@ ] }, { - "attachments": {}, "cell_type": "markdown", "id": "90dd4c9e", "metadata": {}, @@ -454,7 +446,6 @@ ] }, { - "attachments": {}, "cell_type": "markdown", "id": "b910d3d5", "metadata": {}, @@ -473,7 +464,7 @@ }, "outputs": [], "source": [ - "def get_query(source: ObjectStream) -> ObjectStream:\n", + "def get_query(source):\n", " \"\"\"Query for event / column selection: >=4j >=1b, ==1 lep with pT>30 GeV + additional cuts,\n", " return relevant columns\n", " *NOTE* jet pT cut is set lower to account for systematic variations to jet pT\n", @@ -554,7 +545,6 @@ ] }, { - "attachments": {}, "cell_type": "markdown", "id": "d8f08fc1", "metadata": {}, @@ -574,6 +564,12 @@ "outputs": [], "source": [ "if USE_SERVICEX:\n", + " try:\n", + " from func_adl_servicex import ServiceXSourceUpROOT\n", + " except ImportError:\n", + " print(\"cannot import func_adl_servicex, which is a required dependency when using ServiceX\")\n", + " raise\n", + "\n", " # dummy dataset on which to generate the query\n", " dummy_ds = ServiceXSourceUpROOT(\"cernopendata://dummy\", \"Events\", backend_name=\"uproot\")\n", "\n", @@ -597,7 +593,6 @@ ] }, { - "attachments": {}, "cell_type": "markdown", "id": "c28a9e49", "metadata": {}, @@ -649,10 +644,10 @@ " executor = processor.FuturesExecutor(workers=utils.config[\"benchmarking\"][\"NUM_CORES\"])\n", "\n", "run = processor.Runner(\n", - " executor=executor, \n", - " schema=NanoAODSchema, \n", - " savemetrics=True, \n", - " metadata_cache={}, \n", + " executor=executor,\n", + " schema=NanoAODSchema,\n", + " savemetrics=True,\n", + " metadata_cache={},\n", " chunksize=utils.config[\"benchmarking\"][\"CHUNKSIZE\"])\n", "\n", "if USE_SERVICEX:\n", @@ -670,8 +665,8 @@ "t0 = time.monotonic()\n", "# processing\n", "all_histograms, metrics = run(\n", - " fileset, \n", - " treename, \n", + " fileset,\n", + " treename,\n", " processor_instance=TtbarAnalysis(USE_INFERENCE, USE_TRITON)\n", ")\n", "exec_time = time.monotonic() - t0\n", @@ -701,7 +696,6 @@ ] }, { - "attachments": {}, "cell_type": "markdown", "id": "d7bb4428", "metadata": {}, @@ -732,6 +726,8 @@ } ], "source": [ + "import utils.plotting # noqa: E402\n", + "\n", "utils.plotting.set_style()\n", "\n", "all_histograms[\"hist_dict\"][\"4j1b\"][120j::hist.rebin(2), :, \"nominal\"].stack(\"process\")[::-1].plot(stack=True, histtype=\"fill\", linewidth=1, edgecolor=\"grey\")\n", @@ -767,7 +763,6 @@ ] }, { - "attachments": {}, "cell_type": "markdown", "id": "bed3df8b", "metadata": {}, @@ -884,7 +879,6 @@ ] }, { - "attachments": {}, "cell_type": "markdown", "id": "7c334dd3", "metadata": {}, @@ -917,7 +911,6 @@ ] }, { - "attachments": {}, "cell_type": "markdown", "id": "e904cd3c", "metadata": {}, @@ -941,6 +934,8 @@ }, "outputs": [], "source": [ + "import utils.rebinning # noqa: E402\n", + "\n", "cabinetry_config = cabinetry.configuration.load(\"cabinetry_config.yml\")\n", "\n", "# rebinning: lower edge 110 GeV, merge bins 2->1\n", @@ -952,7 +947,6 @@ ] }, { - "attachments": {}, "cell_type": "markdown", "id": "f36dc601", "metadata": {}, @@ -1000,7 +994,6 @@ ] }, { - "attachments": {}, "cell_type": "markdown", "id": "c74e4361", "metadata": {}, @@ -1038,7 +1031,6 @@ ] }, { - "attachments": {}, "cell_type": "markdown", "id": "bd480eec", "metadata": {}, @@ -1069,7 +1061,6 @@ ] }, { - "attachments": {}, "cell_type": "markdown", "id": "3a293479", "metadata": {}, @@ -1125,7 +1116,6 @@ ] }, { - "attachments": {}, "cell_type": "markdown", "id": "14dc4b23", "metadata": {}, @@ -1173,7 +1163,6 @@ ] }, { - "attachments": {}, "cell_type": "markdown", "id": "7f60c316", "metadata": {}, @@ -1317,7 +1306,6 @@ ] }, { - "attachments": {}, "cell_type": "markdown", "id": "a2ce2d14", "metadata": {}, diff --git a/analyses/cms-open-data-ttbar/ttbar_analysis_pipeline.py b/analyses/cms-open-data-ttbar/ttbar_analysis_pipeline.py index 21acf4f5..bb2acaa0 100644 --- a/analyses/cms-open-data-ttbar/ttbar_analysis_pipeline.py +++ b/analyses/cms-open-data-ttbar/ttbar_analysis_pipeline.py @@ -6,7 +6,7 @@ # extension: .py # format_name: percent # format_version: '1.3' -# jupytext_version: 1.14.7 +# jupytext_version: 1.15.2 # kernelspec: # display_name: Python 3 (ipykernel) # language: python @@ -52,8 +52,6 @@ from coffea.nanoevents import NanoAODSchema from coffea.analysis_tools import PackedSelection import copy -from func_adl import ObjectStream -from func_adl_servicex import ServiceXSourceUpROOT import hist import matplotlib.pyplot as plt import numpy as np @@ -388,7 +386,7 @@ def postprocess(self, accumulator): # Define the func_adl query to be used for the purpose of extracting columns and filtering. # %% -def get_query(source: ObjectStream) -> ObjectStream: +def get_query(source): """Query for event / column selection: >=4j >=1b, ==1 lep with pT>30 GeV + additional cuts, return relevant columns *NOTE* jet pT cut is set lower to account for systematic variations to jet pT @@ -475,6 +473,12 @@ def get_query(source: ObjectStream) -> ObjectStream: # %% if USE_SERVICEX: + try: + from func_adl_servicex import ServiceXSourceUpROOT + except ImportError: + print("cannot import func_adl_servicex, which is a required dependency when using ServiceX") + raise + # dummy dataset on which to generate the query dummy_ds = ServiceXSourceUpROOT("cernopendata://dummy", "Events", backend_name="uproot") @@ -512,10 +516,10 @@ def get_query(source: ObjectStream) -> ObjectStream: executor = processor.FuturesExecutor(workers=utils.config["benchmarking"]["NUM_CORES"]) run = processor.Runner( - executor=executor, - schema=NanoAODSchema, - savemetrics=True, - metadata_cache={}, + executor=executor, + schema=NanoAODSchema, + savemetrics=True, + metadata_cache={}, chunksize=utils.config["benchmarking"]["CHUNKSIZE"]) if USE_SERVICEX: @@ -533,8 +537,8 @@ def get_query(source: ObjectStream) -> ObjectStream: t0 = time.monotonic() # processing all_histograms, metrics = run( - fileset, - treename, + fileset, + treename, processor_instance=TtbarAnalysis(USE_INFERENCE, USE_TRITON) ) exec_time = time.monotonic() - t0 @@ -552,6 +556,8 @@ def get_query(source: ObjectStream) -> ObjectStream: # We built histograms in two phase space regions, for multiple physics processes and systematic variations. # %% +import utils.plotting # noqa: E402 + utils.plotting.set_style() all_histograms["hist_dict"]["4j1b"][120j::hist.rebin(2), :, "nominal"].stack("process")[::-1].plot(stack=True, histtype="fill", linewidth=1, edgecolor="grey") @@ -644,6 +650,8 @@ def get_query(source: ObjectStream) -> ObjectStream: # We will use `cabinetry` to combine all histograms into a `pyhf` workspace and fit the resulting statistical model to the pseudodata we built. # %% +import utils.rebinning # noqa: E402 + cabinetry_config = cabinetry.configuration.load("cabinetry_config.yml") # rebinning: lower edge 110 GeV, merge bins 2->1 diff --git a/analyses/cms-open-data-ttbar/utils/__init__.py b/analyses/cms-open-data-ttbar/utils/__init__.py index 047e28aa..4a2139ee 100644 --- a/analyses/cms-open-data-ttbar/utils/__init__.py +++ b/analyses/cms-open-data-ttbar/utils/__init__.py @@ -6,5 +6,7 @@ from . import metrics as metrics from . import ml as ml from . import plotting as plotting -from . import rebinning as rebinning from . import systematics as systematics + + +# to avoid issues: only import submodules if dependencies are present on worker nodes too diff --git a/analyses/cms-open-data-ttbar/utils/file_input.py b/analyses/cms-open-data-ttbar/utils/file_input.py index a8000bc1..02c379a1 100644 --- a/analyses/cms-open-data-ttbar/utils/file_input.py +++ b/analyses/cms-open-data-ttbar/utils/file_input.py @@ -2,10 +2,18 @@ import numpy as np import os from pathlib import Path -from servicex import ServiceXDataset import tqdm import urllib + +try: + from servicex import ServiceXDataset +except ImportError: + # if servicex is not available, ServiceXDatasetGroup cannot be used + # this is fine for worker nodes: only needed where main notebook is executed + pass + + # If local_data_cache is a writable path, this function will download any missing file into it and # then return file paths corresponding to these local copies. def construct_fileset(n_files_max_per_sample, use_xcache=False, af_name="", local_data_cache=None, input_from_eos=False, xcache_atlas_prefix=None): @@ -112,6 +120,7 @@ def download_file(url, out_file): with tqdm.tqdm(unit='B', unit_scale=True, unit_divisor=1024, miniters=1, desc=out_path.name) as t: urllib.request.urlretrieve(url, out_path.absolute(), reporthook=tqdm_urlretrieve_hook(t)) + class ServiceXDatasetGroup(): def __init__(self, fileset, backend_name="uproot", ignore_cache=False): self.fileset = fileset