Skip to content

Commit

Permalink
Update celery config to correctly handle both large and small task qu…
Browse files Browse the repository at this point in the history
…eues.
  • Loading branch information
mgdaily committed Nov 11, 2024
1 parent c8f13f6 commit c4f12e2
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 1 deletion.
2 changes: 2 additions & 0 deletions banzai/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@ def configure_workers(**kwargs):
app.conf.update(broker_url=os.getenv('TASK_HOST', 'redis://localhost:6379/0'),
worker_hijack_root_logger=False)
celery_task_queue_name = os.getenv('CELERY_TASK_QUEUE_NAME', 'celery')
celery_large_task_queue_name = os.getenv('CELERY_LARGE_TASK_QUEUE_NAME', 'celery_large')

# Set up custom named celery task queue
# https://docs.celeryproject.org/en/stable/userguide/routing.html#manual-routing
app.conf.task_default_queue = celery_task_queue_name
app.conf.task_queues = (
Queue(celery_task_queue_name, routing_key=f'{celery_task_queue_name}.#'),
Queue(celery_large_task_queue_name, routing_key=f'{celery_large_task_queue_name}.#')
)
app.conf.task_default_exchange = 'tasks'
app.conf.task_default_exchange_type = 'topic'
Expand Down
3 changes: 2 additions & 1 deletion banzai/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ def on_message(self, body, message):
else:
queue_name = self.runtime_context.CELERY_TASK_QUEUE_NAME
process_image.apply_async(args=(body, vars(self.runtime_context)),
queue=queue_name)
queue=queue_name,
routing_key=f'{queue_name}#')
message.ack() # acknowledge to the sender we got this message (it can be popped)


Expand Down

0 comments on commit c4f12e2

Please sign in to comment.