diff --git a/skymap_scanner/server/start_scan.py b/skymap_scanner/server/start_scan.py index 0788d97e5..dc5e3c529 100644 --- a/skymap_scanner/server/start_scan.py +++ b/skymap_scanner/server/start_scan.py @@ -427,7 +427,7 @@ async def _serve_and_collect( max_nside_thresholded = None # -> generates first nside collected_all_sent = False - async with collector.finder_context(), from_clients_queue.open_sub() as sub: + async with collector.finder_context(): while True: # # SEND PIXELS -- the next logical round of pixels (not necessarily the next nside) @@ -456,27 +456,30 @@ async def _serve_and_collect( # LOGGER.info("Receiving recos from clients...") collected_all_sent = False - async for msg in sub: - if not isinstance(msg['reco_pixel_variation'], RecoPixelVariation): - raise ValueError( - f"Message not {RecoPixelVariation}: {type(msg['reco_pixel_variation'])}" - ) - try: - await collector.collect(msg['reco_pixel_variation'], msg['runtime']) - except ExtraRecoPixelVariationException as e: - logging.error(e) - - # if we've got enough pixfins, let's get a jump on the next round - if max_nside_thresholded := collector.get_max_nside_thresholded(): - collected_all_sent = collector.has_collected_all_sent() - # NOTE: POTENTIAL END-GAME SCENARIO - # nsides=[8,64,512]. 512 & 64 have been done for a long time. - # Now, 8 just thresholded. We don't know if a re-refinement - # is needed. (IOW were/are the most recent nside-8 pixels very - # important?) AND if they turn out to not warrant re-refinement, - # we need to know if we collected everything AKA we're done! - LOGGER.info(f"Threshold met (max={max_nside_thresholded})") - break + async with from_clients_queue.open_sub() as sub: # re-open to avoid inactivity timeout (applicable for rabbitmq) + async for msg in sub: + if not isinstance(msg['reco_pixel_variation'], RecoPixelVariation): + raise ValueError( + f"Message not {RecoPixelVariation}: {type(msg['reco_pixel_variation'])}" + ) + try: + await collector.collect( + msg['reco_pixel_variation'], msg['runtime'] + ) + except ExtraRecoPixelVariationException as e: + logging.error(e) + + # if we've got enough pixfins, let's get a jump on the next round + if max_nside_thresholded := collector.get_max_nside_thresholded(): + collected_all_sent = collector.has_collected_all_sent() + # NOTE: POTENTIAL END-GAME SCENARIO + # nsides=[8,64,512]. 512 & 64 have been done for a long time. + # Now, 8 just thresholded. We don't know if a re-refinement + # is needed. (IOW were/are the most recent nside-8 pixels very + # important?) AND if they turn out to not warrant re-refinement, + # we need to know if we collected everything AKA we're done! + LOGGER.info(f"Threshold met (max={max_nside_thresholded})") + break # do-while loop logic if max_nside_thresholded: