diff --git a/banzai/celery.py b/banzai/celery.py index 28637130..4654f6e9 100644 --- a/banzai/celery.py +++ b/banzai/celery.py @@ -37,12 +37,14 @@ def configure_workers(**kwargs): app.conf.update(broker_url=os.getenv('TASK_HOST', 'redis://localhost:6379/0'), worker_hijack_root_logger=False) celery_task_queue_name = os.getenv('CELERY_TASK_QUEUE_NAME', 'celery') +celery_large_task_queue_name = os.getenv('CELERY_LARGE_TASK_QUEUE_NAME', 'celery_large') # Set up custom named celery task queue # https://docs.celeryproject.org/en/stable/userguide/routing.html#manual-routing app.conf.task_default_queue = celery_task_queue_name app.conf.task_queues = ( Queue(celery_task_queue_name, routing_key=f'{celery_task_queue_name}.#'), + Queue(celery_large_task_queue_name, routing_key=f'{celery_large_task_queue_name}.#') ) app.conf.task_default_exchange = 'tasks' app.conf.task_default_exchange_type = 'topic' diff --git a/banzai/main.py b/banzai/main.py index bd1c1fd5..5279dd48 100755 --- a/banzai/main.py +++ b/banzai/main.py @@ -62,7 +62,8 @@ def on_message(self, body, message): else: queue_name = self.runtime_context.CELERY_TASK_QUEUE_NAME process_image.apply_async(args=(body, vars(self.runtime_context)), - queue=queue_name) + queue=queue_name, + routing_key=f'{queue_name}#') message.ack() # acknowledge to the sender we got this message (it can be popped)