Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/dropped messages #399

Merged
merged 8 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion .github/workflows/build-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ concurrency: ${{ github.workflow }}-${{ github.ref }}
on:
push:
branches:
- "*"
- "**"
tags:
- "*"
pull_request:
Expand Down Expand Up @@ -47,6 +47,11 @@ jobs:
uses: docker/metadata-action@v4
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
tags: |
type=sha
type=ref,event=branch
type=ref,event=pr
type=semver,pattern={{version}}

- name: Build and also push Dockerimage
id: build-and-push
Expand Down
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
1.19.1 (2024-11-05)
-------------------
- Added extra logging and try excepts to catch frames that bypass silently

1.19.0 (2024-10-16)
-------------------
- Added the ability to read fits files that are pre downloaded and are already in memory
Expand Down
1 change: 1 addition & 0 deletions banzai/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ def process_image(file_info: dict, runtime_context: dict):
:param file_info: Body of queue message: dict
:param runtime_context: Context object with runtime environment info
"""
logger.info('Processing frame', extra_tags={'filename': file_info.get('filename')})
runtime_context = Context(runtime_context)
try:
if realtime_utils.need_to_process_image(file_info, runtime_context):
Expand Down
9 changes: 8 additions & 1 deletion banzai/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import argparse
import os.path
import logging
import traceback

from kombu import Exchange, Connection, Queue
from kombu.mixins import ConsumerMixin
Expand Down Expand Up @@ -47,7 +48,13 @@ def get_consumers(self, Consumer, channel):
return [consumer]

def on_message(self, body, message):
instrument = LCOFrameFactory.get_instrument_from_header(body, self.runtime_context.db_address)
logger.info('Received message', extra_tags={'filename': body['filename']})
try:
instrument = LCOFrameFactory.get_instrument_from_header(body, self.runtime_context.db_address)
except Exception:
logger.error(f'Could not get instrument from header. {traceback.format_exc()}', extra_tags={'filename': body['filename']})
message.ack()
return
if instrument is None or instrument.nx is None:
queue_name = self.runtime_context.CELERY_TASK_QUEUE_NAME
elif instrument.nx * instrument.ny > self.runtime_context.LARGE_WORKER_THRESHOLD:
Expand Down
12 changes: 10 additions & 2 deletions banzai/utils/realtime_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os

from banzai import dbs
from banzai.utils import file_utils, import_utils, image_utils
from banzai.data import HeaderOnly
Expand Down Expand Up @@ -48,6 +49,7 @@ def need_to_process_image(file_info, context):

if 'frameid' in file_info:
if 'version_set' not in file_info:
logger.info("Version set not available in file_info", extra_tags={"filename": file_info['filename']})
return True
checksum = file_info['version_set'][0].get('md5')
filename = file_info['filename']
Expand All @@ -57,7 +59,7 @@ def need_to_process_image(file_info, context):

logger.info("Checking if file needs to be processed", extra_tags={"filename": filename})
if not (filename.endswith('.fits') or filename.endswith('.fits.fz')):
logger.debug("Filename does not have a .fits extension, stopping reduction",
logger.error("Filename does not have a .fits extension, stopping reduction",
extra_tags={"filename": filename})
return False

Expand All @@ -70,6 +72,7 @@ def need_to_process_image(file_info, context):
# Check the md5.
# Reset the number of tries if the file has changed on disk/in s3
if image.checksum != checksum:
logger.info('File has changed on disk. Resetting success flags and tries', extra_tags={'filename': filename})
need_to_process = True
image.checksum = checksum
image.tries = 0
Expand All @@ -78,6 +81,7 @@ def need_to_process_image(file_info, context):

# Check if we need to try again
elif image.tries < context.max_tries and not image.success:
logger.info('File has not been successfully processed yet. Trying again.', extra_tags={'filename': filename})
need_to_process = True
dbs.commit_processed_image(image, context.db_address)

Expand All @@ -88,7 +92,11 @@ def need_to_process_image(file_info, context):
factory = import_utils.import_attribute(context.FRAME_FACTORY)()
test_image = factory.observation_frame_class(hdu_list=[HeaderOnly(file_info, name='')],
file_path=file_info['filename'])
test_image.instrument = factory.get_instrument_from_header(file_info, db_address=context.db_address)
try:
test_image.instrument = factory.get_instrument_from_header(file_info, db_address=context.db_address)
except Exception:
logger.error(f'Issue getting instrument from header. {logs.format_exception()}', extra_tags={'filename': filename})
need_to_process = False
if image_utils.get_reduction_level(test_image.meta) != '00':
logger.error('Image has nonzero reduction level. Aborting.', extra_tags={'filename': filename})
need_to_process = False
Expand Down
Loading