Skip to content

Commit

Permalink
Avoid Inactivty Timeout (#185)
Browse files Browse the repository at this point in the history
Co-authored-by: wipacdevbot <[email protected]>
  • Loading branch information
ric-evans and wipacdevbot authored May 4, 2023
1 parent f068de1 commit f05a82f
Showing 1 changed file with 25 additions and 22 deletions.
47 changes: 25 additions & 22 deletions skymap_scanner/server/start_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ async def _serve_and_collect(

max_nside_thresholded = None # -> generates first nside
collected_all_sent = False
async with collector.finder_context(), from_clients_queue.open_sub() as sub:
async with collector.finder_context():
while True:
#
# SEND PIXELS -- the next logical round of pixels (not necessarily the next nside)
Expand Down Expand Up @@ -456,27 +456,30 @@ async def _serve_and_collect(
#
LOGGER.info("Receiving recos from clients...")
collected_all_sent = False
async for msg in sub:
if not isinstance(msg['reco_pixel_variation'], RecoPixelVariation):
raise ValueError(
f"Message not {RecoPixelVariation}: {type(msg['reco_pixel_variation'])}"
)
try:
await collector.collect(msg['reco_pixel_variation'], msg['runtime'])
except ExtraRecoPixelVariationException as e:
logging.error(e)

# if we've got enough pixfins, let's get a jump on the next round
if max_nside_thresholded := collector.get_max_nside_thresholded():
collected_all_sent = collector.has_collected_all_sent()
# NOTE: POTENTIAL END-GAME SCENARIO
# nsides=[8,64,512]. 512 & 64 have been done for a long time.
# Now, 8 just thresholded. We don't know if a re-refinement
# is needed. (IOW were/are the most recent nside-8 pixels very
# important?) AND if they turn out to not warrant re-refinement,
# we need to know if we collected everything AKA we're done!
LOGGER.info(f"Threshold met (max={max_nside_thresholded})")
break
async with from_clients_queue.open_sub() as sub: # re-open to avoid inactivity timeout (applicable for rabbitmq)
async for msg in sub:
if not isinstance(msg['reco_pixel_variation'], RecoPixelVariation):
raise ValueError(
f"Message not {RecoPixelVariation}: {type(msg['reco_pixel_variation'])}"
)
try:
await collector.collect(
msg['reco_pixel_variation'], msg['runtime']
)
except ExtraRecoPixelVariationException as e:
logging.error(e)

# if we've got enough pixfins, let's get a jump on the next round
if max_nside_thresholded := collector.get_max_nside_thresholded():
collected_all_sent = collector.has_collected_all_sent()
# NOTE: POTENTIAL END-GAME SCENARIO
# nsides=[8,64,512]. 512 & 64 have been done for a long time.
# Now, 8 just thresholded. We don't know if a re-refinement
# is needed. (IOW were/are the most recent nside-8 pixels very
# important?) AND if they turn out to not warrant re-refinement,
# we need to know if we collected everything AKA we're done!
LOGGER.info(f"Threshold met (max={max_nside_thresholded})")
break

# do-while loop logic
if max_nside_thresholded:
Expand Down

0 comments on commit f05a82f

Please sign in to comment.