From a7ddf92c77031cd745fc9d92b43a23da2381fe11 Mon Sep 17 00:00:00 2001 From: Massimiliano Lincetto Date: Fri, 5 May 2023 12:02:20 +0200 Subject: [PATCH] Staging of (spline) data (#174) Add dynamic staging of spline data. --- .github/workflows/tests.yml | 20 ++++ skymap_scanner/config.py | 15 ++- skymap_scanner/recos/__init__.py | 17 +++ skymap_scanner/recos/dummy.py | 1 + skymap_scanner/recos/millipede_original.py | 30 ++++-- skymap_scanner/recos/millipede_wilks.py | 29 +++-- skymap_scanner/utils/data_handling.py | 117 +++++++++++++++++++++ tests/file_staging.py | 60 +++++++++++ 8 files changed, 269 insertions(+), 20 deletions(-) create mode 100644 skymap_scanner/utils/data_handling.py create mode 100644 tests/file_staging.py diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 14e4459bf..c98a31125 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -363,6 +363,26 @@ jobs: run: | docker logs rabbitmq + test-file-staging: + needs: test-build-docker + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: docker/setup-buildx-action@v2 + - uses: docker/build-push-action@v3 + with: + context: . + cache-from: type=gha + # cache-to: type=gha,mode=min + file: Dockerfile + tags: icecube/skymap_scanner:local + load: true + - name: run + run: | + docker run --rm -i \ + icecube/skymap_scanner:local \ + python tests/file_staging.py + test-run-single-pixel: needs: test-build-docker runs-on: ubuntu-latest diff --git a/skymap_scanner/config.py b/skymap_scanner/config.py index 6fd2a1626..b32716004 100644 --- a/skymap_scanner/config.py +++ b/skymap_scanner/config.py @@ -3,7 +3,7 @@ import dataclasses as dc import enum from pathlib import Path -from typing import Final +from typing import List, Final from wipac_dev_tools import from_environment_as_dataclass @@ -15,6 +15,19 @@ DEFAULT_GCD_DIR: Path = Path("/opt/i3-data/baseline_gcds") +# Local data sources. These are assumed to be filesystem paths and are expected to have the same directory structure. +LOCAL_DATA_SOURCES: Final[List[Path]] = [ + Path("/opt/i3-data"), + Path("/cvmfs/icecube.opensciencegrid.org/data"), +] +# Directory path under a local data source to fetch spline data from. +LOCAL_SPLINE_SUBDIR: Final[str] = "photon-tables/splines" + +# HTTP source to download data from. +REMOTE_DATA_SOURCE: Final[str] = "http://prod-exe.icecube.wisc.edu" +REMOTE_SPLINE_SUBDIR: Final[str] = "spline-tables" + +LOCAL_DATA_CACHE: Final[Path] = Path("./data-staging-cache") # physics strings INPUT_PULSES_NAME: Final = "SplitUncleanedInIcePulses" diff --git a/skymap_scanner/recos/__init__.py b/skymap_scanner/recos/__init__.py index e05050650..ca71c0b4f 100644 --- a/skymap_scanner/recos/__init__.py +++ b/skymap_scanner/recos/__init__.py @@ -27,6 +27,10 @@ def __init__(self, reco_algo: str): class RecoInterface: """An abstract class encapsulating reco-specific logic.""" + # List of spline file basenames required by the class. + # The spline files will be looked up in pre-defined local paths or fetched from a remote data store. + SPLINE_REQUIREMENTS: List[str] = list() + @staticmethod def traysegment(tray, name, logger, **kwargs: Any) -> None: raise NotImplementedError() @@ -48,10 +52,23 @@ def get_all_reco_algos() -> List[str]: def get_reco_interface_object(name: str) -> RecoInterface: """Dynamically import the reco sub-module's class.""" try: + # Fetch module module = importlib.import_module(f"{__name__}.{name.lower()}") + # Build the class name (i.e. reco_algo -> RecoAlgo). return getattr(module, "".join(x.capitalize() for x in name.split("_"))) except ModuleNotFoundError as e: if name not in get_all_reco_algos(): # checking this in 'except' allows us to use 'from e' raise UnsupportedRecoAlgoException(name) from e raise # something when wrong AFTER accessing sub-module + + +def get_reco_spline_requirements(name: str) -> List[str]: + try: + module = importlib.import_module(f"{__name__}.{name.lower()}") + return getattr(module, "spline_requirements") + except ModuleNotFoundError as e: + if name not in get_all_reco_algos(): + # checking this in 'except' allows us to use 'from e' + raise UnsupportedRecoAlgoException(name) from e + raise # something when wrong AFTER accessing sub-module diff --git a/skymap_scanner/recos/dummy.py b/skymap_scanner/recos/dummy.py index 9ba63c8f3..1b24aa9ea 100644 --- a/skymap_scanner/recos/dummy.py +++ b/skymap_scanner/recos/dummy.py @@ -4,6 +4,7 @@ import datetime import random import time +from typing import List from I3Tray import I3Units # type: ignore[import] from icecube import ( # type: ignore[import] # noqa: F401 diff --git a/skymap_scanner/recos/millipede_original.py b/skymap_scanner/recos/millipede_original.py index c83ff5d2d..7366395a9 100644 --- a/skymap_scanner/recos/millipede_original.py +++ b/skymap_scanner/recos/millipede_original.py @@ -10,10 +10,12 @@ from typing import Tuple import numpy + from I3Tray import I3Units from icecube import ( # noqa: F401 VHESelfVeto, dataclasses, + dataio, frame_object_diff, gulliver, gulliver_modules, @@ -27,34 +29,40 @@ from icecube.icetray import I3Frame from .. import config as cfg +from ..utils.data_handling import DataStager from ..utils.pixel_classes import RecoPixelVariation from . import RecoInterface - class MillipedeOriginal(RecoInterface): """Reco logic for millipede.""" - # Constants ######################################################## + # Spline requirements + MIE_ABS_SPLINE = "ems_mie_z20_a10.abs.fits" + MIE_PROB_SPLINE = "ems_mie_z20_a10.prob.fits" + + SPLINE_REQUIREMENTS = [ MIE_ABS_SPLINE, MIE_PROB_SPLINE ] + # Constants ######################################################## pulsesName = cfg.INPUT_PULSES_NAME pulsesName_cleaned = pulsesName+'LatePulseCleaned' SPEScale = 0.99 # Load Data ######################################################## - # At HESE energies, deposited light is dominated by the stochastic losses # (muon part emits so little light in comparison) # This is why we can use cascade tables - _splinedir = os.path.expandvars("$I3_DATA/photon-tables/splines") - _base = os.path.join(_splinedir, "ems_mie_z20_a10.%s.fits") - for fname in [_base % "abs", _base % "prob"]: - if not os.path.exists(fname): - raise FileNotFoundError(fname) - cascade_service = photonics_service.I3PhotoSplineService( - _base % "abs", _base % "prob", timingSigma=0.0 + datastager = DataStager( + local_paths=cfg.LOCAL_DATA_SOURCES, + local_subdir=cfg.LOCAL_SPLINE_SUBDIR, + remote_path=f"{cfg.REMOTE_DATA_SOURCE}/{cfg.REMOTE_SPLINE_SUBDIR}", ) + datastager.stage_files(SPLINE_REQUIREMENTS) + abs_spline: str = datastager.get_filepath(MIE_ABS_SPLINE) + prob_spline: str = datastager.get_filepath(MIE_PROB_SPLINE) + + cascade_service = photonics_service.I3PhotoSplineService(abs_spline, prob_spline, timingSigma=0.0) cascade_service.SetEfficiencies(SPEScale) muon_service = None - + def makeSurePulsesExist(frame, pulsesName) -> None: if pulsesName not in frame: raise RuntimeError("{0} not in frame".format(pulsesName)) diff --git a/skymap_scanner/recos/millipede_wilks.py b/skymap_scanner/recos/millipede_wilks.py index 7d39435c4..54207b6e0 100644 --- a/skymap_scanner/recos/millipede_wilks.py +++ b/skymap_scanner/recos/millipede_wilks.py @@ -27,12 +27,20 @@ from icecube.icetray import I3Frame from .. import config as cfg +from ..utils.data_handling import DataStager from ..utils.pixel_classes import RecoPixelVariation from . import RecoInterface + class MillipedeWilks(RecoInterface): """Reco logic for millipede.""" + # Spline requirements ############################################## + FTP_ABS_SPLINE = "cascade_single_spice_ftp-v1_flat_z20_a5.abs.fits" + FTP_PROB_SPLINE = "cascade_single_spice_ftp-v1_flat_z20_a5.prob.fits" + FTP_EFFD_SPLINE = "cascade_effectivedistance_spice_ftp-v1_z20.eff.fits" + + SPLINE_REQUIREMENTS = [FTP_ABS_SPLINE, FTP_PROB_SPLINE, FTP_EFFD_SPLINE] # Constants ######################################################## pulsesName_orig = cfg.INPUT_PULSES_NAME @@ -44,16 +52,21 @@ class MillipedeWilks(RecoInterface): # At HESE energies, deposited light is dominated by the stochastic losses # (muon part emits so little light in comparison) # This is why we can use cascade tables - _splinedir = os.path.expandvars("$I3_DATA/photon-tables/splines") - _base = os.path.join(_splinedir, "cascade_single_spice_ftp-v1_flat_z20_a5.%s.fits") - _effd = os.path.join(_splinedir, "cascade_effectivedistance_spice_ftp-v1_z20.eff.fits") - for fname in [_base % "abs", _base % "prob", _effd]: - if not os.path.exists(fname): - raise FileNotFoundError(fname) + datastager = DataStager( + local_paths=cfg.LOCAL_DATA_SOURCES, + local_subdir=cfg.LOCAL_SPLINE_SUBDIR, + remote_path=f"{cfg.REMOTE_DATA_SOURCE}/{cfg.REMOTE_SPLINE_SUBDIR}", + ) + + datastager.stage_files(SPLINE_REQUIREMENTS) + + abs_spline: str = datastager.get_filepath(FTP_ABS_SPLINE) + prob_spline: str = datastager.get_filepath(FTP_PROB_SPLINE) + effd_spline: str = datastager.get_filepath(FTP_EFFD_SPLINE) cascade_service = photonics_service.I3PhotoSplineService( - _base % "abs", _base % "prob", timingSigma=0.0, - effectivedistancetable = _effd, + abs_spline, prob_spline, timingSigma=0.0, + effectivedistancetable = effd_spline, tiltTableDir = os.path.expandvars('$I3_BUILD/ice-models/resources/models/ICEMODEL/spice_ftp-v1/'), quantileEpsilon=1 ) diff --git a/skymap_scanner/utils/data_handling.py b/skymap_scanner/utils/data_handling.py new file mode 100644 index 000000000..f44f0e972 --- /dev/null +++ b/skymap_scanner/utils/data_handling.py @@ -0,0 +1,117 @@ +from .. import config as cfg # type: ignore[import] +from pathlib import Path +import subprocess +from typing import Dict, List, Union + +from . import LOGGER + + +class DataStager: + """ + Class to manage the staging of (spline) data from different sources (in-container, mountpoint, CVMFS, http). + Some similarity in the paths is assumed. + """ + + def __init__(self, local_paths: List[Path], local_subdir: str, remote_path: str): + self.local_paths = local_paths + self.local_subdir = local_subdir + self.remote_path = remote_path + self.staging_path: Path = cfg.LOCAL_DATA_CACHE + self.staging_path.mkdir(exist_ok=True) + + def stage_files(self, file_list: List[str]): + """Checks local availability for filenames in a list, and retrieves the missing ones from the HTTP source. + + Args: + file_list (List[str]): list of file filenames to look up / retrieve. + """ + LOGGER.debug(f"Staging files in filelist: {file_list}") + for basename in file_list: + try: + filepath: str = self.get_local_filepath(basename) + except FileNotFoundError: + LOGGER.debug( + f"File {basename} is not available on default local paths." + ) + if (self.staging_path / basename).is_file(): + LOGGER.debug("File is available on staging path.") + else: + LOGGER.debug("Staging from HTTP source.") + self.stage_file(basename) + + else: + LOGGER.debug(f"File {basename} is available at {filepath}.") + + def stage_file(self, basename: str): + """Retrieves a file from the HTTP source. + + Args: + basename (str): the basename of the file. + + Raises: + RuntimeError: if the file retrieval fails. + """ + local_destination_path = self.staging_path / basename + http_source_path = f"{self.remote_path}/{basename}" + # not sure why we use the -O pattern here + cmd = [ + "wget", + "-nv", + "-t", + "5", + "-O", + str(local_destination_path), + http_source_path, + ] + + subprocess.run(cmd, check=True) + + if not local_destination_path.is_file(): + raise RuntimeError( + f"Subprocess `wget` succeeded but the resulting file is invalid:\n-> {cmd}" + ) + + def get_filepath(self, filename: str) -> str: + """Look up basename under the local paths and the staging path and returns the first valid filename. + + Args: + basename (str): file basename to look up. + + Returns: + str: valid filename. + """ + try: + local_filepath = self.get_local_filepath(filename) + return local_filepath + except FileNotFoundError: + filepath = self.staging_path / filename + if filepath.is_file(): + LOGGER.info("File {filename} available at {filepath}.") + return str(filepath) + else: + raise FileNotFoundError( + f"File {filename} is not available in any local or staging path." + ) + + def get_local_filepath(self, filename: str) -> str: + """Look up filename on local paths and return the first matching filename. + + Args: + filename (str): the filename of the file to look up. + + Returns: + str: the file path of the file if available + """ + LOGGER.debug(f"Look up file {filename}.") + for source in self.local_paths: + subdir = source / self.local_subdir + filepath = subdir / filename + LOGGER.debug(f"Trying to read {filepath}...") + if filepath.is_file(): + LOGGER.debug(f"-> success.") + filename = str(filepath) + return filename + else: + LOGGER.debug(f"-> fail.") + # File was not found in local paths. + raise FileNotFoundError(f"File {filename} is not available on any local path.") diff --git a/tests/file_staging.py b/tests/file_staging.py new file mode 100644 index 000000000..072ebd09a --- /dev/null +++ b/tests/file_staging.py @@ -0,0 +1,60 @@ +import logging +import subprocess +from typing import Dict + + +from skymap_scanner.utils.data_handling import DataStager +from skymap_scanner import config as cfg + +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) + + +# Build list of all local files. +local_file_list = [] +for path in cfg.LOCAL_DATA_SOURCES: + subpath = path / cfg.LOCAL_SPLINE_SUBDIR + directory_content = subpath.glob("*") + for path in directory_content: + if path.is_file(): # skip directories + local_file_list.append(path.name) # store filename without path + +# Declare at least one filename only expected to be available remotely. +remote_file_list = ["README"] + +# Declare at least one filename that does not exist. +invalid_file_list = ["NONEXISTENT_FILE"] + +datastager = DataStager( + local_paths=cfg.LOCAL_DATA_SOURCES, + local_subdir=cfg.LOCAL_SPLINE_SUBDIR, + remote_path=f"{cfg.REMOTE_DATA_SOURCE}/{cfg.REMOTE_SPLINE_SUBDIR}", +) + +for file_list in [local_file_list, remote_file_list, invalid_file_list]: + try: + datastager.stage_files(file_list) + except subprocess.CalledProcessError: + logger.debug(f"Staging failed as expected for invalid file.") + + +# ensure that filepaths can be retrieved for all local files +local_filepaths: Dict[str, str] = dict() +for filename in local_file_list: + logger.debug(f"Testing local file: {filename}.") + local_filepaths[filename] = datastager.get_local_filepath(filename) + assert local_filepaths[filename] == datastager.get_filepath(filename) + logger.debug(f"File available at {local_filepaths[filename]}.") + +for filename in remote_file_list: + logger.debug(f"Testing staging of remote file: {filename}") + filepath: str = datastager.get_filepath(filename) + logger.debug(f"File available at {filepath}.") + + +for filename in invalid_file_list: + logger.debug(f"Testing staging of remote file: {filename}") + try: + filepath = datastager.get_filepath(filename) + except FileNotFoundError: + logger.debug(f"File not available as expected.")