Skip to content

Commit

Permalink
Staging of (spline) data (#174)
Browse files Browse the repository at this point in the history
Add dynamic staging of spline data.
  • Loading branch information
mlincett authored May 5, 2023
1 parent 5a31787 commit a7ddf92
Show file tree
Hide file tree
Showing 8 changed files with 269 additions and 20 deletions.
20 changes: 20 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,26 @@ jobs:
run: |
docker logs rabbitmq
# Job: build the scanner image (reusing the GitHub Actions layer cache) and
# run the file-staging test script inside a container started from that image.
test-file-staging:
needs: test-build-docker
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: docker/setup-buildx-action@v2
- uses: docker/build-push-action@v3
with:
context: .
cache-from: type=gha
# cache-to: type=gha,mode=min
file: Dockerfile
tags: icecube/skymap_scanner:local
load: true
# Run the staging test in a throwaway container (`--rm`) from the image tagged above.
- name: run
run: |
docker run --rm -i \
icecube/skymap_scanner:local \
python tests/file_staging.py
test-run-single-pixel:
needs: test-build-docker
runs-on: ubuntu-latest
Expand Down
15 changes: 14 additions & 1 deletion skymap_scanner/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import dataclasses as dc
import enum
from pathlib import Path
from typing import Final
from typing import List, Final

from wipac_dev_tools import from_environment_as_dataclass

Expand All @@ -15,6 +15,19 @@

DEFAULT_GCD_DIR: Path = Path("/opt/i3-data/baseline_gcds")

# Local data sources. These are assumed to be filesystem paths and are expected to have the same directory structure.
# The sources are meant to be tried in the order listed here.
LOCAL_DATA_SOURCES: Final[List[Path]] = [
Path("/opt/i3-data"),
Path("/cvmfs/icecube.opensciencegrid.org/data"),
]
# Directory path under a local data source to fetch spline data from.
LOCAL_SPLINE_SUBDIR: Final[str] = "photon-tables/splines"

# HTTP source to download data from.
REMOTE_DATA_SOURCE: Final[str] = "http://prod-exe.icecube.wisc.edu"
# Directory path under the HTTP source to download spline data from.
REMOTE_SPLINE_SUBDIR: Final[str] = "spline-tables"

# Directory where files downloaded from the HTTP source are staged.
# NOTE: this is a relative path, so it resolves against the current working directory.
LOCAL_DATA_CACHE: Final[Path] = Path("./data-staging-cache")

# physics strings
INPUT_PULSES_NAME: Final = "SplitUncleanedInIcePulses"
Expand Down
17 changes: 17 additions & 0 deletions skymap_scanner/recos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ def __init__(self, reco_algo: str):
class RecoInterface:
"""An abstract class encapsulating reco-specific logic."""

# List of spline file basenames required by the class.
# The spline files will be looked up in pre-defined local paths or fetched from a remote data store.
SPLINE_REQUIREMENTS: List[str] = list()

@staticmethod
def traysegment(tray, name, logger, **kwargs: Any) -> None:
raise NotImplementedError()
Expand All @@ -48,10 +52,23 @@ def get_all_reco_algos() -> List[str]:
def get_reco_interface_object(name: str) -> RecoInterface:
    """Dynamically import the reco sub-module and return its reco class.

    The sub-module `<this package>.<name in lowercase>` is imported and the
    attribute named after the CamelCase form of `name`
    (i.e. reco_algo -> RecoAlgo) is returned.

    Raises:
        UnsupportedRecoAlgoException: the sub-module could not be imported and
            `name` is not listed by `get_all_reco_algos()`.
        ModuleNotFoundError: the import failed although `name` is listed by
            `get_all_reco_algos()`.
    """
    module_path = f"{__name__}.{name.lower()}"
    # Build the class name (i.e. reco_algo -> RecoAlgo).
    class_name = "".join(part.capitalize() for part in name.split("_"))

    try:
        module = importlib.import_module(module_path)
    except ModuleNotFoundError as e:
        # checking this in 'except' allows us to use 'from e'
        if name in get_all_reco_algos():
            raise  # something went wrong AFTER accessing sub-module
        raise UnsupportedRecoAlgoException(name) from e

    return getattr(module, class_name)


def get_reco_spline_requirements(name: str) -> List[str]:
    """Return the spline file basenames required by the reco algorithm `name`.

    The requirements are declared on the reco class as the class attribute
    `SPLINE_REQUIREMENTS` (empty by default on `RecoInterface`), so the class
    is resolved with `get_reco_interface_object()` and the attribute is read
    from it. Reading a module-level `spline_requirements` attribute instead
    would raise `AttributeError` for modules that do not define it.

    Args:
        name (str): name of the reco algorithm (i.e. of the reco sub-module).

    Returns:
        List[str]: a copy of the class' list of required spline basenames.

    Raises:
        UnsupportedRecoAlgoException: `name` is not a supported reco algorithm.
        ModuleNotFoundError: the sub-module exists but one of its own imports
            could not be resolved.
    """
    reco_class = get_reco_interface_object(name)
    # Hand out a copy so that callers cannot alter the class-level list.
    return list(reco_class.SPLINE_REQUIREMENTS)
1 change: 1 addition & 0 deletions skymap_scanner/recos/dummy.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import datetime
import random
import time
from typing import List

from I3Tray import I3Units # type: ignore[import]
from icecube import ( # type: ignore[import] # noqa: F401
Expand Down
30 changes: 19 additions & 11 deletions skymap_scanner/recos/millipede_original.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
from typing import Tuple

import numpy

from I3Tray import I3Units
from icecube import ( # noqa: F401
VHESelfVeto,
dataclasses,
dataio,
frame_object_diff,
gulliver,
gulliver_modules,
Expand All @@ -27,34 +29,40 @@
from icecube.icetray import I3Frame

from .. import config as cfg
from ..utils.data_handling import DataStager
from ..utils.pixel_classes import RecoPixelVariation
from . import RecoInterface


class MillipedeOriginal(RecoInterface):
"""Reco logic for millipede."""
# Constants ########################################################
# Spline requirements
MIE_ABS_SPLINE = "ems_mie_z20_a10.abs.fits"
MIE_PROB_SPLINE = "ems_mie_z20_a10.prob.fits"

SPLINE_REQUIREMENTS = [ MIE_ABS_SPLINE, MIE_PROB_SPLINE ]

# Constants ########################################################
pulsesName = cfg.INPUT_PULSES_NAME
pulsesName_cleaned = pulsesName+'LatePulseCleaned'
SPEScale = 0.99

# Load Data ########################################################

# At HESE energies, deposited light is dominated by the stochastic losses
# (muon part emits so little light in comparison)
# This is why we can use cascade tables
_splinedir = os.path.expandvars("$I3_DATA/photon-tables/splines")
_base = os.path.join(_splinedir, "ems_mie_z20_a10.%s.fits")
for fname in [_base % "abs", _base % "prob"]:
if not os.path.exists(fname):
raise FileNotFoundError(fname)
cascade_service = photonics_service.I3PhotoSplineService(
_base % "abs", _base % "prob", timingSigma=0.0
datastager = DataStager(
local_paths=cfg.LOCAL_DATA_SOURCES,
local_subdir=cfg.LOCAL_SPLINE_SUBDIR,
remote_path=f"{cfg.REMOTE_DATA_SOURCE}/{cfg.REMOTE_SPLINE_SUBDIR}",
)
datastager.stage_files(SPLINE_REQUIREMENTS)
abs_spline: str = datastager.get_filepath(MIE_ABS_SPLINE)
prob_spline: str = datastager.get_filepath(MIE_PROB_SPLINE)

cascade_service = photonics_service.I3PhotoSplineService(abs_spline, prob_spline, timingSigma=0.0)
cascade_service.SetEfficiencies(SPEScale)
muon_service = None

def makeSurePulsesExist(frame, pulsesName) -> None:
if pulsesName not in frame:
raise RuntimeError("{0} not in frame".format(pulsesName))
Expand Down
29 changes: 21 additions & 8 deletions skymap_scanner/recos/millipede_wilks.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,20 @@
from icecube.icetray import I3Frame

from .. import config as cfg
from ..utils.data_handling import DataStager
from ..utils.pixel_classes import RecoPixelVariation
from . import RecoInterface



class MillipedeWilks(RecoInterface):
"""Reco logic for millipede."""
# Spline requirements ##############################################
FTP_ABS_SPLINE = "cascade_single_spice_ftp-v1_flat_z20_a5.abs.fits"
FTP_PROB_SPLINE = "cascade_single_spice_ftp-v1_flat_z20_a5.prob.fits"
FTP_EFFD_SPLINE = "cascade_effectivedistance_spice_ftp-v1_z20.eff.fits"

SPLINE_REQUIREMENTS = [FTP_ABS_SPLINE, FTP_PROB_SPLINE, FTP_EFFD_SPLINE]
# Constants ########################################################

pulsesName_orig = cfg.INPUT_PULSES_NAME
Expand All @@ -44,16 +52,21 @@ class MillipedeWilks(RecoInterface):
# At HESE energies, deposited light is dominated by the stochastic losses
# (muon part emits so little light in comparison)
# This is why we can use cascade tables
_splinedir = os.path.expandvars("$I3_DATA/photon-tables/splines")
_base = os.path.join(_splinedir, "cascade_single_spice_ftp-v1_flat_z20_a5.%s.fits")
_effd = os.path.join(_splinedir, "cascade_effectivedistance_spice_ftp-v1_z20.eff.fits")
for fname in [_base % "abs", _base % "prob", _effd]:
if not os.path.exists(fname):
raise FileNotFoundError(fname)
datastager = DataStager(
local_paths=cfg.LOCAL_DATA_SOURCES,
local_subdir=cfg.LOCAL_SPLINE_SUBDIR,
remote_path=f"{cfg.REMOTE_DATA_SOURCE}/{cfg.REMOTE_SPLINE_SUBDIR}",
)

datastager.stage_files(SPLINE_REQUIREMENTS)

abs_spline: str = datastager.get_filepath(FTP_ABS_SPLINE)
prob_spline: str = datastager.get_filepath(FTP_PROB_SPLINE)
effd_spline: str = datastager.get_filepath(FTP_EFFD_SPLINE)

cascade_service = photonics_service.I3PhotoSplineService(
_base % "abs", _base % "prob", timingSigma=0.0,
effectivedistancetable = _effd,
abs_spline, prob_spline, timingSigma=0.0,
effectivedistancetable = effd_spline,
tiltTableDir = os.path.expandvars('$I3_BUILD/ice-models/resources/models/ICEMODEL/spice_ftp-v1/'),
quantileEpsilon=1
)
Expand Down
117 changes: 117 additions & 0 deletions skymap_scanner/utils/data_handling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
from .. import config as cfg # type: ignore[import]
from pathlib import Path
import subprocess
from typing import Dict, List, Union

from . import LOGGER


class DataStager:
    """Manage the staging of (spline) data files from different sources.

    A file is looked up, in order, under each local source directory
    (in-container, mountpoint, CVMFS). When it is found on none of them, it is
    looked up in the staging directory (`cfg.LOCAL_DATA_CACHE`) and, as a last
    resort, downloaded there from the HTTP source.
    Some similarity in the paths is assumed: all local sources are expected to
    hold the files under the same sub-directory.
    """

    def __init__(self, local_paths: List[Path], local_subdir: str, remote_path: str):
        """Store the data sources and make sure the staging directory exists.

        Args:
            local_paths (List[Path]): local base directories, searched in order.
            local_subdir (str): sub-directory of each base directory holding the files.
            remote_path (str): base URL the missing files are downloaded from.
        """
        self.local_paths = local_paths
        self.local_subdir = local_subdir
        self.remote_path = remote_path
        self.staging_path: Path = cfg.LOCAL_DATA_CACHE
        # `parents=True` keeps this working if a nested cache path is configured.
        self.staging_path.mkdir(parents=True, exist_ok=True)

    def stage_files(self, file_list: List[str]):
        """Check local availability of the listed files and retrieve the missing ones from the HTTP source.

        Args:
            file_list (List[str]): list of file basenames to look up / retrieve.

        Raises:
            subprocess.CalledProcessError: if the download of a missing file fails.
            RuntimeError: if a download reports success but produces no file.
        """
        LOGGER.debug(f"Staging files in filelist: {file_list}")
        for basename in file_list:
            try:
                filepath: str = self.get_local_filepath(basename)
            except FileNotFoundError:
                LOGGER.debug(
                    f"File {basename} is not available on default local paths."
                )
                if (self.staging_path / basename).is_file():
                    LOGGER.debug("File is available on staging path.")
                else:
                    LOGGER.debug("Staging from HTTP source.")
                    self.stage_file(basename)
            else:
                LOGGER.debug(f"File {basename} is available at {filepath}.")

    def stage_file(self, basename: str):
        """Retrieve a file from the HTTP source into the staging directory.

        The download is written to a temporary `<basename>.part` file that is
        moved to its final name only on success. `wget -O` creates its output
        file even when the download fails, so writing straight to the final
        name would leave an empty or truncated file that later look-ups would
        mistake for a successfully staged one.

        Args:
            basename (str): the basename of the file.

        Raises:
            subprocess.CalledProcessError: if `wget` exits with an error.
            RuntimeError: if `wget` succeeds but no file has been written.
        """
        local_destination_path = self.staging_path / basename
        partial_path = self.staging_path / f"{basename}.part"
        http_source_path = f"{self.remote_path}/{basename}"
        # `-O` makes wget write to the given path rather than to a file named
        # after the URL in the current working directory.
        cmd = [
            "wget",
            "-nv",
            "-t",
            "5",
            "-O",
            str(partial_path),
            http_source_path,
        ]

        try:
            subprocess.run(cmd, check=True)

            if not partial_path.is_file():
                raise RuntimeError(
                    f"Subprocess `wget` succeeded but the resulting file is invalid:\n-> {cmd}"
                )

            partial_path.replace(local_destination_path)
        finally:
            # No-op after a successful move; removes the leftover otherwise.
            if partial_path.exists():
                partial_path.unlink()

    def get_filepath(self, filename: str) -> str:
        """Look up a file under the local paths first and the staging path next.

        Args:
            filename (str): file basename to look up.

        Returns:
            str: path of the first existing match.

        Raises:
            FileNotFoundError: if the file is neither on a local path nor on the staging path.
        """
        try:
            return self.get_local_filepath(filename)
        except FileNotFoundError:
            filepath = self.staging_path / filename
            if filepath.is_file():
                LOGGER.info(f"File {filename} available at {filepath}.")
                return str(filepath)
            raise FileNotFoundError(
                f"File {filename} is not available in any local or staging path."
            )

    def get_local_filepath(self, filename: str) -> str:
        """Look up a file on the local paths and return the first match.

        Args:
            filename (str): the basename of the file to look up.

        Returns:
            str: the path of the file under the first local source holding it.

        Raises:
            FileNotFoundError: if no local source holds the file.
        """
        LOGGER.debug(f"Look up file {filename}.")
        for source in self.local_paths:
            filepath = source / self.local_subdir / filename
            LOGGER.debug(f"Trying to read {filepath}...")
            if filepath.is_file():
                LOGGER.debug("-> success.")
                return str(filepath)
            LOGGER.debug("-> fail.")
        # File was not found in local paths.
        raise FileNotFoundError(f"File {filename} is not available on any local path.")
60 changes: 60 additions & 0 deletions tests/file_staging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import logging
import subprocess
from typing import Dict


from skymap_scanner.utils.data_handling import DataStager
from skymap_scanner import config as cfg

# Test script for DataStager: stages files that are available locally, a file
# that is only available remotely and a file that does not exist, then checks
# that the file paths can (or cannot) be retrieved accordingly.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


# Build list of all local files.
local_file_list = []
for source in cfg.LOCAL_DATA_SOURCES:
    spline_dir = source / cfg.LOCAL_SPLINE_SUBDIR
    for entry in spline_dir.glob("*"):
        if entry.is_file():  # skip directories
            local_file_list.append(entry.name)  # store filename without path

# Declare at least one filename only expected to be available remotely.
remote_file_list = ["README"]

# Declare at least one filename that does not exist.
invalid_file_list = ["NONEXISTENT_FILE"]

datastager = DataStager(
    local_paths=cfg.LOCAL_DATA_SOURCES,
    local_subdir=cfg.LOCAL_SPLINE_SUBDIR,
    remote_path=f"{cfg.REMOTE_DATA_SOURCE}/{cfg.REMOTE_SPLINE_SUBDIR}",
)

# Staging of local and remote files must succeed: a download error is a real
# failure here and is deliberately left to propagate.
datastager.stage_files(local_file_list)
datastager.stage_files(remote_file_list)

# Only the invalid file is expected to fail the download.
try:
    datastager.stage_files(invalid_file_list)
except subprocess.CalledProcessError:
    logger.debug("Staging failed as expected for invalid file.")


# ensure that filepaths can be retrieved for all local files
local_filepaths: Dict[str, str] = dict()
for filename in local_file_list:
    logger.debug(f"Testing local file: {filename}.")
    local_filepaths[filename] = datastager.get_local_filepath(filename)
    assert local_filepaths[filename] == datastager.get_filepath(filename)
    logger.debug(f"File available at {local_filepaths[filename]}.")

for filename in remote_file_list:
    logger.debug(f"Testing staging of remote file: {filename}")
    filepath: str = datastager.get_filepath(filename)
    logger.debug(f"File available at {filepath}.")


for filename in invalid_file_list:
    logger.debug(f"Testing lookup of invalid file: {filename}")
    try:
        filepath = datastager.get_filepath(filename)
    except FileNotFoundError:
        logger.debug("File not available as expected.")
    else:
        # NOTE(review): this is only reported, not raised; consider making it a
        # hard failure once a failed download is guaranteed not to leave a file
        # behind in the staging directory.
        logger.error(f"File {filename} is unexpectedly available at {filepath}.")

0 comments on commit a7ddf92

Please sign in to comment.