diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 13045afc1..9d8cded08 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -9,10 +9,10 @@ on: - '**' -concurrency: - group: ${{ github.workflow }}-${{ github.ref }} - # don't cancel on main/master/default - cancel-in-progress: ${{ format('refs/heads/{0}', github.event.repository.default_branch) != github.ref }} +# concurrency: +# group: ${{ github.workflow }}-${{ github.ref }} +# # don't cancel on main/master/default +# cancel-in-progress: ${{ format('refs/heads/{0}', github.event.repository.default_branch) != github.ref }} env: @@ -23,7 +23,7 @@ env: SKYSCAN_CACHE_DIR: /home/runner/work/skymap_scanner/skymap_scanner/cache SKYSCAN_OUTPUT_DIR: /home/runner/work/skymap_scanner/skymap_scanner/output SKYSCAN_DEBUG_DIR: /home/runner/work/skymap_scanner/skymap_scanner/debug - EWMS_PILOT_DUMP_SUBPROC_OUTPUT: False # get logs in "reco-icetray logs" step instead + EWMS_PILOT_DUMP_SUBPROC_OUTPUT: False # get logs in "client/pilot subprocesses stderr & stdout" step instead # see source tests/env-vars.sh @@ -78,7 +78,7 @@ jobs: uses: actions/checkout@v3 with: token: ${{ secrets.PERSONAL_ACCESS_TOKEN }} - - uses: WIPACrepo/wipac-dev-py-setup-action@v2.5 + - uses: WIPACrepo/wipac-dev-py-setup-action@v2.6 if: github.actor != ${{ env.BOT_NAME }} # no auto-updates for bots # should match all 'git_committer_name' uses with: git_committer_name: ${{ env.BOT_NAME }} @@ -188,7 +188,7 @@ jobs: ./resources/launch_scripts/wait_for_file.sh ./startup.json $CLIENT_STARTER_WAIT_FOR_STARTUP_JSON # Launch Clients - nclients=$(( $_CLIENTS_PER_CPU * $(nproc) )) + nclients=$(( $_NCLIENTS * $(nproc) )) echo "Launching $nclients clients" mkdir $SKYSCAN_DEBUG_DIR export EWMS_PILOT_TASK_TIMEOUT=1800 # 30 mins @@ -231,8 +231,11 @@ jobs: 1.0, 0.65, ] + concurrency: [ + 'multitask', + "multiclient", + ] env: - _CLIENTS_PER_CPU: 1 # there isn't any improvement when >1 SKYSCAN_BROKER_ADDRESS: user1@localhost/test SKYSCAN_BROKER_AUTH: password # using this would override password in address services: @@ -272,10 +275,24 @@ jobs: export _EVENTS_FILE=$(realpath $REALTIME_EVENTS_DIR/hese_event_01.json) export _NSIDES="${{ matrix.nsides }}" export _PREDICTIVE_SCANNING_THRESHOLD=${{ matrix.predictive_scanning_threshold }} + export _NCLIENTS=$(nproc) + + if [[ ${{ matrix.concurrency }} == "multiclient" ]]; then + # if not using concurrent tasks, then use multiple clients + export _NCLIENTS=$(nproc) + export EWMS_PILOT_CONCURRENT_TASKS=0 + else + export _NCLIENTS=1 + export EWMS_PILOT_CONCURRENT_TASKS=$(nproc) + fi + + free # dump memory stats cd ./resources/launch_scripts ./local-scan.sh + free # dump memory stats + - name: check no nsides skipped run: | ls $SKYSCAN_OUTPUT_DIR @@ -291,7 +308,7 @@ jobs: assert len(pydict) == nsides.count(":") ' - - name: reco-icetray logs + - name: client/pilot subprocesses stderr & stdout if: always() run: | sudo apt install tree @@ -319,13 +336,15 @@ jobs: run00136766-evt000007637140-GOLD.pkl, run00136662-evt000035405932-BRONZE.pkl ] + concurrency: [ + 'multitask', + "multiclient", + ] exclude: # splinempe should not run on HESE - reco_algo: splinempe eventfile: hese_event_01.json - env: - _CLIENTS_PER_CPU: 1 # there isn't any improvement when >1 SKYSCAN_BROKER_ADDRESS: user1@localhost/test SKYSCAN_BROKER_AUTH: password # using this would override password in address services: @@ -372,9 +391,24 @@ jobs: export _EVENTS_FILE=$(realpath $REALTIME_EVENTS_DIR/${{ matrix.eventfile }}) export _NSIDES="1:0" + if [[ ${{ matrix.concurrency }} == "multiclient" ]]; then + # if not using concurrent tasks, then use multiple clients + export _NCLIENTS=$(nproc) + export EWMS_PILOT_CONCURRENT_TASKS=0 + else + export _NCLIENTS=1 + export EWMS_PILOT_CONCURRENT_TASKS=$(nproc) + fi + + ls /dev/shm + free # dump memory stats + cd ./resources/launch_scripts ./local-scan.sh + free # dump memory stats + ls /dev/shm + - name: test output against known result (.json) run: | ls $SKYSCAN_OUTPUT_DIR @@ -389,7 +423,7 @@ jobs: --assert \ || (cat $(ls *.diff.json) && false) - - name: reco-icetray logs + - name: client/pilot subprocesses stderr & stdout if: always() run: | sudo apt install tree @@ -488,6 +522,7 @@ jobs: icecube/skymap_scanner:local \ python -m skymap_scanner.client.reco_icetray \ --in-pkl /local/pkls/in.pkl \ + --reco-algo ${{ matrix.reco_algo }} \ --gcdqp-packet-json /local/pkls/GCDQp_packet.json \ --baseline-gcd-file $(jq -r '.baseline_GCD_file' tests/data/reco_pixel_pkls/${{ matrix.reco_algo }}/${{ matrix.dir }}/startup.json) \ --out-pkl /local/pkls/out.pkl diff --git a/dependencies-from-Dockerfile.log b/dependencies-from-Dockerfile.log index 4acffdff9..c4e8a88d7 100644 --- a/dependencies-from-Dockerfile.log +++ b/dependencies-from-Dockerfile.log @@ -26,13 +26,13 @@ decorator==4.4.2 defusedxml==0.7.1 docutils==0.20.1 et-xmlfile==1.0.1 -ewms-pilot==0.14.0 +ewms-pilot==0.15.1 fonttools==4.29.1 fs==2.4.12 gast==0.5.2 healpy==1.15.0 htchirp==2.0 -htcondor==10.9.0 +htcondor==23.0.0 html5lib==1.1 humanfriendly==10.0 hypothesis==6.36.0 @@ -126,7 +126,7 @@ typing_extensions==4.8.0 ufoLib2==0.13.1 unicodedata2==14.0.0 UNKNOWN @ file:///root/nuflux-2.0.4 -urllib3==2.0.5 +urllib3==2.0.6 urwid==2.1.2 wcwidth==0.2.5 webencodings==0.5.1 diff --git a/dependencies-from-Dockerfile_pulsar.log b/dependencies-from-Dockerfile_pulsar.log index bd3a86e5f..637b1a20f 100644 --- a/dependencies-from-Dockerfile_pulsar.log +++ b/dependencies-from-Dockerfile_pulsar.log @@ -26,7 +26,7 @@ decorator==4.4.2 defusedxml==0.7.1 docutils==0.20.1 et-xmlfile==1.0.1 -ewms-pilot==0.14.0 +ewms-pilot==0.15.1 fonttools==4.29.1 fs==2.4.12 gast==0.5.2 @@ -126,7 +126,7 @@ typing_extensions==4.8.0 ufoLib2==0.13.1 unicodedata2==14.0.0 UNKNOWN @ file:///root/nuflux-2.0.4 -urllib3==2.0.5 +urllib3==2.0.6 urwid==2.1.2 wcwidth==0.2.5 webencodings==0.5.1 diff --git a/resources/launch_scripts/docker/launch_client.sh b/resources/launch_scripts/docker/launch_client.sh index 0908e56b6..6160d557d 100755 --- a/resources/launch_scripts/docker/launch_client.sh +++ b/resources/launch_scripts/docker/launch_client.sh @@ -70,7 +70,8 @@ fi # Run docker run --network="host" $pull_policy --rm -i \ - --shm-size=6gb \ + --ipc="host" \ + --shm-size=32gb \ $DOCKERMOUNT_ARGS \ --env PY_COLORS=1 \ $(env | grep '^SKYSCAN_' | awk '$0="--env "$0') \ diff --git a/resources/launch_scripts/local-scan.sh b/resources/launch_scripts/local-scan.sh index e064d5121..853725414 100755 --- a/resources/launch_scripts/local-scan.sh +++ b/resources/launch_scripts/local-scan.sh @@ -48,11 +48,13 @@ fi # Launch Clients -clients_per_cpu=${_CLIENTS_PER_CPU:-"1"} -nclients=$(( $clients_per_cpu * $(nproc) )) +nclients=${_NCLIENTS:-"1"} echo "Launching $nclients clients" export EWMS_PILOT_TASK_TIMEOUT=1800 # 30 mins for i in $( seq 1 $nclients ); do + if [[ $i == "2" ]]; then + sleep 60 # sleep past race condition + fi ./docker/launch_client.sh \ --client-startup-json ./startup.json \ --debug-directory $SKYSCAN_DEBUG_DIR \ @@ -61,6 +63,9 @@ for i in $( seq 1 $nclients ); do done +free -c 6 -s 10 # dump memory stats for 1 min + + # Wait for Everyone wait -n # for server for i in $( seq 1 $nclients ); do diff --git a/skymap_scanner/client/__init__.py b/skymap_scanner/client/__init__.py index e69de29bb..a08a6e9ee 100644 --- a/skymap_scanner/client/__init__.py +++ b/skymap_scanner/client/__init__.py @@ -0,0 +1,5 @@ +"""Init.""" + +import logging + +LOGGER = logging.getLogger("skyscan.client") diff --git a/skymap_scanner/client/__main__.py b/skymap_scanner/client/__main__.py index 5ab089393..2346ac67a 100644 --- a/skymap_scanner/client/__main__.py +++ b/skymap_scanner/client/__main__.py @@ -1,6 +1,7 @@ """Entry-point to start up client service.""" -from . import client +from . import LOGGER, client if __name__ == "__main__": client.main() + LOGGER.info("Done.") diff --git a/skymap_scanner/client/client.py b/skymap_scanner/client/client.py index c0130f911..897bb7375 100644 --- a/skymap_scanner/client/client.py +++ b/skymap_scanner/client/client.py @@ -3,19 +3,20 @@ import argparse import asyncio import json -import logging +import os from pathlib import Path import ewms_pilot from wipac_dev_tools import argparse_tools, logging_tools from .. import config as cfg - -LOGGER = logging.getLogger("skyscan.client") +from . import LOGGER def main() -> None: """Start up Client service.""" + LOGGER.critical(f'/dev/shm: {os.listdir("/dev/shm")}') + parser = argparse.ArgumentParser( description=( "Start up client daemon to perform reco scans on pixels " @@ -74,31 +75,37 @@ def main() -> None: " --out-pkl {{OUTFILE}}" # ^^^ " --gcdqp-packet-json GCDQp_packet.json" f" --baseline-gcd-file {startup_json_dict['baseline_GCD_file']}" + f" --reco-algo {startup_json_dict['reco_algo']}" + ) + + init_cmd = ( + "python -m skymap_scanner.client.prepare " + f" --reco-algo {startup_json_dict['reco_algo']}" ) # go! LOGGER.info( f"Starting up a Skymap Scanner client for event: {startup_json_dict['mq_basename']=}" ) - asyncio.run( - ewms_pilot.consume_and_reply( - cmd=cmd, - broker_client=cfg.ENV.SKYSCAN_BROKER_CLIENT, - broker_address=cfg.ENV.SKYSCAN_BROKER_ADDRESS, - auth_token=cfg.ENV.SKYSCAN_BROKER_AUTH, - queue_incoming=f"to-clients-{startup_json_dict['mq_basename']}", - queue_outgoing=f"from-clients-{startup_json_dict['mq_basename']}", - ftype_to_subproc=".pkl", - ftype_from_subproc=".pkl", - timeout_incoming=cfg.ENV.SKYSCAN_MQ_TIMEOUT_TO_CLIENTS, - timeout_outgoing=cfg.ENV.SKYSCAN_MQ_TIMEOUT_FROM_CLIENTS, - timeout_wait_for_first_message=cfg.ENV.SKYSCAN_MQ_CLIENT_TIMEOUT_WAIT_FOR_FIRST_MESSAGE, - debug_dir=args.debug_directory, - task_timeout=cfg.ENV.EWMS_PILOT_TASK_TIMEOUT, + try: + asyncio.run( + ewms_pilot.consume_and_reply( + cmd=cmd, + init_cmd=init_cmd, + broker_client=cfg.ENV.SKYSCAN_BROKER_CLIENT, + broker_address=cfg.ENV.SKYSCAN_BROKER_ADDRESS, + auth_token=cfg.ENV.SKYSCAN_BROKER_AUTH, + queue_incoming=f"to-clients-{startup_json_dict['mq_basename']}", + queue_outgoing=f"from-clients-{startup_json_dict['mq_basename']}", + ftype_to_subproc=".pkl", + ftype_from_subproc=".pkl", + timeout_incoming=cfg.ENV.SKYSCAN_MQ_TIMEOUT_TO_CLIENTS, + timeout_outgoing=cfg.ENV.SKYSCAN_MQ_TIMEOUT_FROM_CLIENTS, + timeout_wait_for_first_message=cfg.ENV.SKYSCAN_MQ_CLIENT_TIMEOUT_WAIT_FOR_FIRST_MESSAGE, + debug_dir=args.debug_directory, + task_timeout=cfg.ENV.EWMS_PILOT_TASK_TIMEOUT, + ) ) - ) - LOGGER.info("Done.") - - -if __name__ == "__main__": - main() + except: + LOGGER.critical(f'/dev/shm: {os.listdir("/dev/shm")}') + raise diff --git a/skymap_scanner/client/prepare.py b/skymap_scanner/client/prepare.py new file mode 100644 index 000000000..9c419dc0e --- /dev/null +++ b/skymap_scanner/client/prepare.py @@ -0,0 +1,51 @@ +"""Setup common resources for future IceTray instances of a specified +reconstruction.""" + + +import argparse +import logging +import os + +from wipac_dev_tools import logging_tools + +from .. import config as cfg +from .. import recos + +LOGGER = logging.getLogger("skyscan.client.prepare") + + +def main() -> None: + """Reco a single pixel.""" + parser = argparse.ArgumentParser( + description="Setup common resources for future IceTray instances of a specified reconstruction.", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + + # "physics" args + parser.add_argument( + "--reco-algo", + required=True, + choices=recos.get_all_reco_algos(), + help="The reconstruction algorithm prepare for", + ) + + args = parser.parse_args() + logging_tools.set_level( + cfg.ENV.SKYSCAN_LOG, # type: ignore[arg-type] + first_party_loggers="skyscan", + third_party_level=cfg.ENV.SKYSCAN_LOG_THIRD_PARTY, # type: ignore[arg-type] + use_coloredlogs=True, + ) + logging_tools.log_argparse_args(args, logger=LOGGER, level="WARNING") + + # go! + RecoAlgo = recos.get_reco_interface_object(args.reco_algo) + reco = RecoAlgo() + reco.setup_reco(LOGGER) + + LOGGER.critical(f'/dev/shm: {os.listdir("/dev/shm")}') + + +if __name__ == "__main__": + main() + LOGGER.info("Done.") diff --git a/skymap_scanner/client/reco_icetray.py b/skymap_scanner/client/reco_icetray.py index 60048f22c..d26967950 100644 --- a/skymap_scanner/client/reco_icetray.py +++ b/skymap_scanner/client/reco_icetray.py @@ -11,7 +11,6 @@ from pathlib import Path from typing import Any, List, Union -from icecube.icetray import I3Tray # type: ignore[import] from icecube import ( # type: ignore[import] # noqa: F401 dataio, frame_object_diff, @@ -20,6 +19,7 @@ photonics_service, ) from icecube.frame_object_diff.segments import uncompress # type: ignore[import] +from icecube.icetray import I3Tray # type: ignore[import] from wipac_dev_tools import argparse_tools, logging_tools from .. import config as cfg @@ -146,7 +146,7 @@ def notifyStart(frame): # create instance of reco_algo object RecoAlgo = recos.get_reco_interface_object(reco_algo) reco = RecoAlgo() - reco.setup_reco() + reco.setup_reco(LOGGER) # perform fit tray.AddSegment( @@ -241,7 +241,13 @@ def main() -> None: ), ) - # extra "physics" args + # "physics" args + parser.add_argument( + "--reco-algo", + required=True, + choices=recos.get_all_reco_algos(), + help="The reconstruction algorithm to use", + ) parser.add_argument( "--gcdqp-packet-json", dest="GCDQp_packet_json", @@ -274,10 +280,11 @@ def main() -> None: ) logging_tools.log_argparse_args(args, logger=LOGGER, level="WARNING") + LOGGER.critical(f'/dev/shm: {os.listdir("/dev/shm")}') + # get PFrame with open(args.in_pkl, "rb") as f: msg = pickle.load(f) - reco_algo = msg[cfg.MSG_KEY_RECO_ALGO] pframe = msg[cfg.MSG_KEY_PFRAME] # get GCDQp_packet @@ -288,14 +295,14 @@ def main() -> None: # go! reco_pixel( - reco_algo, + args.reco_algo, pframe, GCDQp_packet, str(args.baseline_GCD_file), args.out_pkl, ) - LOGGER.info("Done reco'ing pixel.") if __name__ == "__main__": main() + LOGGER.info("Done.") diff --git a/skymap_scanner/config.py b/skymap_scanner/config.py index 236c3e483..c8fe5420c 100644 --- a/skymap_scanner/config.py +++ b/skymap_scanner/config.py @@ -44,7 +44,6 @@ STATEDICT_BASELINE_GCD_FILE: Final = "baseline_GCD_file" STATEDICT_NSIDES: Final = "nsides" # -MSG_KEY_RECO_ALGO: Final = "reco_algo" MSG_KEY_PFRAME: Final = "pframe" BASELINE_GCD_FILENAME = "base_GCD_for_diff.i3" diff --git a/skymap_scanner/recos/__init__.py b/skymap_scanner/recos/__init__.py index 447fc3bdc..bb145fd69 100644 --- a/skymap_scanner/recos/__init__.py +++ b/skymap_scanner/recos/__init__.py @@ -67,10 +67,13 @@ def prepare_frames(self, tray, name, **kwargs) -> None: pass @abstractmethod - def setup_reco(self): - """Performs the necessary operations to prepare the execution of the reconstruction traysegment. + def setup_reco(self, logger): + """Performs the necessary operations to prepare the execution of the + reconstruction traysegment. - This method is expected to perform "expensive" operations such as fetching spline data and initializing IceTray spline services. + This method is expected to perform "expensive" operations such + as fetching spline data and initializing IceTray spline + services. """ pass diff --git a/skymap_scanner/recos/dummy.py b/skymap_scanner/recos/dummy.py index a753ffc09..9277b38ce 100644 --- a/skymap_scanner/recos/dummy.py +++ b/skymap_scanner/recos/dummy.py @@ -34,8 +34,8 @@ def __init__(self): self.refine_time = True self.add_fallback_position = False - def setup_reco(self): - pass + def setup_reco(self, logger): + logger.info("Dummy reco has nothing to setup :)") @staticmethod def get_vertex_variations() -> List[dataclasses.I3Position]: diff --git a/skymap_scanner/recos/millipede_original.py b/skymap_scanner/recos/millipede_original.py index 2ef5cb34f..3f3b859ab 100644 --- a/skymap_scanner/recos/millipede_original.py +++ b/skymap_scanner/recos/millipede_original.py @@ -97,7 +97,7 @@ def __init__(self): self.refine_time = False self.add_fallback_position = False - def setup_reco(self): + def setup_reco(self, logger): datastager = self.get_datastager() datastager.stage_files(self.SPLINE_REQUIREMENTS) @@ -105,7 +105,9 @@ def setup_reco(self): abs_spline = datastager.get_filepath(self.MIE_ABS_SPLINE) prob_spline = datastager.get_filepath(self.MIE_PROB_SPLINE) + logger.info("Starting I3PhotoSplineService...") self.cascade_service = photonics_service.I3PhotoSplineService(abs_spline, prob_spline, timingSigma=0.0) + logger.info("Started I3PhotoSplineService.") self.cascade_service.SetEfficiencies(self.SPEScale) diff --git a/skymap_scanner/recos/millipede_wilks.py b/skymap_scanner/recos/millipede_wilks.py index ee3a970fd..6f7d901c9 100644 --- a/skymap_scanner/recos/millipede_wilks.py +++ b/skymap_scanner/recos/millipede_wilks.py @@ -55,11 +55,11 @@ def __init__(self): @staticmethod def get_vertex_variations() -> List[dataclasses.I3Position]: - """Returns a list of vectors referenced to the origin that will be used to generate the vertex position variations. - """ + """Returns a list of vectors referenced to the origin that will be used + to generate the vertex position variations.""" return VertexGenerator.point() - def setup_reco(self): + def setup_reco(self, logger): datastager = self.get_datastager() datastager.stage_files(self.SPLINE_REQUIREMENTS) @@ -70,6 +70,7 @@ def setup_reco(self): effp_spline: str = datastager.get_filepath(self.FTP_EFFP_SPLINE) tmod_spline: str = datastager.get_filepath(self.FTP_TMOD_SPLINE) + logger.info("Starting I3PhotoSplineService...") self.cascade_service = photonics_service.I3PhotoSplineService( abs_spline, prob_spline, timingSigma=0.0, effectivedistancetable = effd_spline, @@ -78,6 +79,7 @@ def setup_reco(self): effectivedistancetableprob = effp_spline, effectivedistancetabletmod = tmod_spline ) + logger.info("Started I3PhotoSplineService.") self.muon_service = None diff --git a/skymap_scanner/recos/splinempe.py b/skymap_scanner/recos/splinempe.py index 96fda512c..db5d9a824 100644 --- a/skymap_scanner/recos/splinempe.py +++ b/skymap_scanner/recos/splinempe.py @@ -264,7 +264,7 @@ def extract_seed(frame): def get_vertex_variations(self): return VertexGenerator.cylinder() - def setup_reco(self): + def setup_reco(self, logger): datastager = self.get_datastager() datastager.stage_files(self.SPLINE_REQUIREMENTS) @@ -274,6 +274,7 @@ def setup_reco(self): StochTimingSpline: str = datastager.get_filepath(self.MIE_STOCH_PROB) StochAmplitudeSpline: str = datastager.get_filepath(self.MIE_STOCH_ABS) + logger.info("Starting I3PhotoSplineServices...") self.bare_mu_spline = I3PhotoSplineService( BareMuAmplitudeSpline, BareMuTimingSpline, @@ -289,10 +290,11 @@ def setup_reco(self): BareMuTimingSpline, timingSigma=1000, ) + logger.info("Started I3PhotoSplineServices.") @traysegment def traysegment(self, tray, name, logger, **kwargs): - """SplineMPE reco""" + """SplineMPE reco.""" def checkName(frame: I3Frame, name: str) -> None: if name not in frame: diff --git a/skymap_scanner/server/__main__.py b/skymap_scanner/server/__main__.py index 714033891..297223460 100644 --- a/skymap_scanner/server/__main__.py +++ b/skymap_scanner/server/__main__.py @@ -1,6 +1,7 @@ """Entry-point to start up server.""" -from . import start_scan +from . import LOGGER, start_scan if __name__ == "__main__": start_scan.main() + LOGGER.info("Done.") diff --git a/skymap_scanner/server/start_scan.py b/skymap_scanner/server/start_scan.py index 67aff6278..ca0d8a807 100644 --- a/skymap_scanner/server/start_scan.py +++ b/skymap_scanner/server/start_scan.py @@ -15,14 +15,13 @@ import healpy # type: ignore[import] import mqclient as mq import numpy -from icecube.icetray import I3Units # type: ignore[import] from icecube import ( # type: ignore[import] astro, dataclasses, full_event_followup, icetray, ) -from rest_tools.client import RestClient +from icecube.icetray import I3Units # type: ignore[import] from skyreader import EventMetadata from wipac_dev_tools import argparse_tools, logging_tools @@ -358,7 +357,6 @@ async def scan( total_n_pixfin = await _serve_and_collect( to_clients_queue, from_clients_queue, - reco_algo, nsides_dict, pixeler, reporter, @@ -378,7 +376,6 @@ async def scan( async def _send_pixels( to_clients_queue: mq.Queue, - reco_algo: str, pixeler: PixelsToReco, already_sent_pixvars: Set[SentPixelVariation], nside_subprogression: NSideProgression, @@ -394,7 +391,6 @@ async def _send_pixels( LOGGER.info(f"Sending message M#{i} {pframe_tuple(pframe)}...") await pub.send( { - cfg.MSG_KEY_RECO_ALGO: reco_algo, cfg.MSG_KEY_PFRAME: pframe, } ) @@ -411,7 +407,6 @@ async def _send_pixels( async def _serve_and_collect( to_clients_queue: mq.Queue, from_clients_queue: mq.Queue, - reco_algo: str, nsides_dict: NSidesDict, pixeler: PixelsToReco, reporter: Reporter, @@ -440,7 +435,6 @@ async def _serve_and_collect( # sent_pixvars = await _send_pixels( to_clients_queue, - reco_algo, pixeler, collector.sent_pixvars, # we want to open re-refinement for all nsides <= max_nside_thresholded @@ -508,6 +502,7 @@ def write_startup_json( nside_progression: NSideProgression, baseline_GCD_file: str, GCDQp_packet: List[icetray.I3Frame], + reco_algo: str, ) -> str: """Write startup JSON file for client-spawning. @@ -531,6 +526,7 @@ def write_startup_json( GCDQp_packet, pnf_framing=False ) ), + "reco_algo": reco_algo, } with open(client_startup_json, "w") as f: @@ -711,6 +707,7 @@ def _nside_and_pixelextension(val: str) -> Tuple[int, int]: args.nside_progression, state_dict[cfg.STATEDICT_BASELINE_GCD_FILE], state_dict[cfg.STATEDICT_GCDQP_PACKET], + args.reco_algo, ) # make mq connections @@ -746,8 +743,3 @@ def _nside_and_pixelextension(val: str) -> Tuple[int, int]: predictive_scanning_threshold=args.predictive_scanning_threshold, ) ) - LOGGER.info("Done.") - - -if __name__ == "__main__": - main() diff --git a/skymap_scanner/utils/data_handling.py b/skymap_scanner/utils/data_handling.py index f44f0e972..c3b1dda69 100644 --- a/skymap_scanner/utils/data_handling.py +++ b/skymap_scanner/utils/data_handling.py @@ -1,14 +1,15 @@ -from .. import config as cfg # type: ignore[import] -from pathlib import Path import subprocess +from pathlib import Path from typing import Dict, List, Union +from .. import config as cfg # type: ignore[import] from . import LOGGER class DataStager: - """ - Class to manage the staging of (spline) data from different sources (in-container, mountpoint, CVMFS, http). + """Class to manage the staging of (spline) data from different sources (in- + container, mountpoint, CVMFS, http). + Some similarity in the paths is assumed. """ @@ -20,27 +21,28 @@ def __init__(self, local_paths: List[Path], local_subdir: str, remote_path: str) self.staging_path.mkdir(exist_ok=True) def stage_files(self, file_list: List[str]): - """Checks local availability for filenames in a list, and retrieves the missing ones from the HTTP source. + """Checks local availability for filenames in a list, and retrieves the + missing ones from the HTTP source. Args: file_list (List[str]): list of file filenames to look up / retrieve. """ - LOGGER.debug(f"Staging files in filelist: {file_list}") + LOGGER.info(f"Staging files in filelist: {file_list}") for basename in file_list: try: filepath: str = self.get_local_filepath(basename) except FileNotFoundError: - LOGGER.debug( - f"File {basename} is not available on default local paths." - ) + LOGGER.info(f"File {basename} is not available on default local paths.") if (self.staging_path / basename).is_file(): - LOGGER.debug("File is available on staging path.") + LOGGER.info("File is available on staging path.") else: - LOGGER.debug("Staging from HTTP source.") + LOGGER.info("Staging from HTTP source.") self.stage_file(basename) else: - LOGGER.debug(f"File {basename} is available at {filepath}.") + LOGGER.info(f"File {basename} is available at {filepath}.") + + LOGGER.info(f"Finished staging files in filelist: {file_list}") def stage_file(self, basename: str): """Retrieves a file from the HTTP source. @@ -72,7 +74,8 @@ def stage_file(self, basename: str): ) def get_filepath(self, filename: str) -> str: - """Look up basename under the local paths and the staging path and returns the first valid filename. + """Look up basename under the local paths and the staging path and + returns the first valid filename. Args: basename (str): file basename to look up. @@ -86,7 +89,7 @@ def get_filepath(self, filename: str) -> str: except FileNotFoundError: filepath = self.staging_path / filename if filepath.is_file(): - LOGGER.info("File {filename} available at {filepath}.") + LOGGER.info(f"File {filename} available at {filepath}.") return str(filepath) else: raise FileNotFoundError( @@ -94,7 +97,8 @@ def get_filepath(self, filename: str) -> str: ) def get_local_filepath(self, filename: str) -> str: - """Look up filename on local paths and return the first matching filename. + """Look up filename on local paths and return the first matching + filename. Args: filename (str): the filename of the file to look up. @@ -108,10 +112,9 @@ def get_local_filepath(self, filename: str) -> str: filepath = subdir / filename LOGGER.debug(f"Trying to read {filepath}...") if filepath.is_file(): - LOGGER.debug(f"-> success.") - filename = str(filepath) - return filename - else: - LOGGER.debug(f"-> fail.") - # File was not found in local paths. - raise FileNotFoundError(f"File {filename} is not available on any local path.") + LOGGER.debug(f"File found {filename}.") + return str(filepath) + + raise FileNotFoundError( + f"File {filename} is not available on any local path: {self.local_paths}." + ) diff --git a/skymap_scanner/utils/load_scan_state.py b/skymap_scanner/utils/load_scan_state.py index f873ea7de..f8a13adff 100644 --- a/skymap_scanner/utils/load_scan_state.py +++ b/skymap_scanner/utils/load_scan_state.py @@ -47,10 +47,10 @@ def get_baseline_gcd_frames(baseline_GCD_file, GCDQp_packet) -> List[icetray.I3F try: baseline_GCD_frames = load_framepacket_from_file(baseline_GCD_file) except: - LOGGER.debug(" -> failed") + LOGGER.debug(f"Failed to read GCD from {baseline_GCD_file}") raise RuntimeError("Unable to read baseline GCD. In the current design, this is unexpected. Possibly a bug or data corruption!") if baseline_GCD_frames is not None: - LOGGER.debug(" -> success") + LOGGER.debug("Retrieved GCD from file") # NOTE: Legacy code used to scan a list of GCD_BASE_DIRS. # It is now assumed that assume that the passed `baseline_GCD_file` is a valid path to a baseline GCD file. @@ -155,7 +155,7 @@ def load_GCDQp_state(event_metadata: EventMetadata, cache_dir="./cache/") -> dic LOGGER.debug("load_GCDQp_state => reading source baseline GCD from {0}".format(read_path)) source_baseline_GCD_framepacket = load_framepacket_from_file(read_path) except: - LOGGER.debug(" -> failed") + LOGGER.debug(f"Failed to load source baseline GCD from {read_path}") source_baseline_GCD_framepacket = None if source_baseline_GCD_framepacket is None: raise RuntimeError(f"load_GCDQp_state => Could not read the source GCD file \"{source_baseline_GCD_metadata}\"") diff --git a/tests/data/reco_pixel_pkls/millipede_original/BRONZE/startup.json b/tests/data/reco_pixel_pkls/millipede_original/BRONZE/startup.json index 1064b57f0..2c6bf1028 100644 --- a/tests/data/reco_pixel_pkls/millipede_original/BRONZE/startup.json +++ b/tests/data/reco_pixel_pkls/millipede_original/BRONZE/startup.json @@ -1,4 +1,5 @@ { + "reco_algo": "millipede_original", "mq_basename": "run00136662.evt000035405932.HESE-1-1", "baseline_GCD_file": "/opt/i3-data/baseline_gcds/baseline_gcd_135417.i3", "GCDQp_packet": { diff --git a/tests/data/reco_pixel_pkls/millipede_original/GOLD/startup.json b/tests/data/reco_pixel_pkls/millipede_original/GOLD/startup.json index cf860c045..cc21a3193 100644 --- a/tests/data/reco_pixel_pkls/millipede_original/GOLD/startup.json +++ b/tests/data/reco_pixel_pkls/millipede_original/GOLD/startup.json @@ -1,4 +1,5 @@ { + "reco_algo": "millipede_original", "mq_basename": "run00136766.evt000007637140.HESE-1-1", "baseline_GCD_file": "/opt/i3-data/baseline_gcds/baseline_gcd_135417.i3", "GCDQp_packet": { diff --git a/tests/data/reco_pixel_pkls/millipede_original/JSON/startup.json b/tests/data/reco_pixel_pkls/millipede_original/JSON/startup.json index 0bb64909e..715dcdcf9 100644 --- a/tests/data/reco_pixel_pkls/millipede_original/JSON/startup.json +++ b/tests/data/reco_pixel_pkls/millipede_original/JSON/startup.json @@ -1,4 +1,5 @@ { + "reco_algo": "millipede_original", "mq_basename": "run00127907.evt000020178442.HESE-1-1", "baseline_GCD_file": "/opt/i3-data/baseline_gcds/2016_01_08_Run127381.i3", "GCDQp_packet": { diff --git a/tests/data/reco_pixel_pkls/millipede_wilks/BRONZE/startup.json b/tests/data/reco_pixel_pkls/millipede_wilks/BRONZE/startup.json index 93a391290..592181a7d 100644 --- a/tests/data/reco_pixel_pkls/millipede_wilks/BRONZE/startup.json +++ b/tests/data/reco_pixel_pkls/millipede_wilks/BRONZE/startup.json @@ -1,4 +1,5 @@ { + "reco_algo": "millipede_wilks", "scan_id": "35405932-1:0-1684764148", "mq_basename": "35405932-1:0-1684764148", "baseline_GCD_file": "/opt/i3-data/baseline_gcds/baseline_gcd_135417.i3", diff --git a/tests/data/reco_pixel_pkls/millipede_wilks/GOLD/startup.json b/tests/data/reco_pixel_pkls/millipede_wilks/GOLD/startup.json index 40d57cb72..758df3e93 100644 --- a/tests/data/reco_pixel_pkls/millipede_wilks/GOLD/startup.json +++ b/tests/data/reco_pixel_pkls/millipede_wilks/GOLD/startup.json @@ -1,4 +1,5 @@ { + "reco_algo": "millipede_wilks", "scan_id": "7637140-1:0-1684845570", "mq_basename": "7637140-1:0-1684845570", "baseline_GCD_file": "/opt/i3-data/baseline_gcds/baseline_gcd_135417.i3", diff --git a/tests/data/reco_pixel_pkls/millipede_wilks/JSON/startup.json b/tests/data/reco_pixel_pkls/millipede_wilks/JSON/startup.json index a1564b058..5cb92db1c 100644 --- a/tests/data/reco_pixel_pkls/millipede_wilks/JSON/startup.json +++ b/tests/data/reco_pixel_pkls/millipede_wilks/JSON/startup.json @@ -1,4 +1,5 @@ { + "reco_algo": "millipede_wilks", "scan_id": "20178442-1:0-1684859396", "mq_basename": "20178442-1:0-1684859396", "baseline_GCD_file": "/opt/i3-data/baseline_gcds/2016_01_08_Run127381.i3", diff --git a/tests/data/reco_pixel_pkls/splinempe/BRONZE/startup.json b/tests/data/reco_pixel_pkls/splinempe/BRONZE/startup.json index 34a453a26..63ad583a1 100644 --- a/tests/data/reco_pixel_pkls/splinempe/BRONZE/startup.json +++ b/tests/data/reco_pixel_pkls/splinempe/BRONZE/startup.json @@ -1,4 +1,5 @@ { + "reco_algo": "splinempe", "scan_id": "35405932-1:0-1689867940", "mq_basename": "35405932-1:0-1689867940", "baseline_GCD_file": "/opt/i3-data/baseline_gcds/baseline_gcd_135417.i3", diff --git a/tests/data/reco_pixel_pkls/splinempe/GOLD/startup.json b/tests/data/reco_pixel_pkls/splinempe/GOLD/startup.json index 6446dcebb..4b073f4d7 100644 --- a/tests/data/reco_pixel_pkls/splinempe/GOLD/startup.json +++ b/tests/data/reco_pixel_pkls/splinempe/GOLD/startup.json @@ -1,4 +1,5 @@ { + "reco_algo": "splinempe", "scan_id": "7637140-1:0-1689846722", "mq_basename": "7637140-1:0-1689846722", "baseline_GCD_file": "/opt/i3-data/baseline_gcds/baseline_gcd_135417.i3", diff --git a/tests/env-vars.sh b/tests/env-vars.sh index 6150b0c99..976415e2a 100755 --- a/tests/env-vars.sh +++ b/tests/env-vars.sh @@ -20,6 +20,6 @@ export SKYSCAN_MINI_TEST=${SKYSCAN_MINI_TEST:-'yes'} export SKYSCAN_LOG=${SKYSCAN_LOG:-"DEBUG"} export SKYSCAN_LOG_THIRD_PARTY=${SKYSCAN_LOG_THIRD_PARTY:-"INFO"} -export CLIENT_STARTER_WAIT_FOR_STARTUP_JSON=${CLIENT_STARTER_WAIT_FOR_STARTUP_JSON:-12} +export CLIENT_STARTER_WAIT_FOR_STARTUP_JSON=${CLIENT_STARTER_WAIT_FOR_STARTUP_JSON:-120} set +ex # file is sourced so turn off \ No newline at end of file