diff --git a/banzai/celery.py b/banzai/celery.py index ed24675d..5e1c9998 100644 --- a/banzai/celery.py +++ b/banzai/celery.py @@ -116,7 +116,8 @@ def schedule_calibration_stacking(site: str, runtime_context: dict, min_date=Non 'instrument': instrument.camera, 'frame_type': frame_type}) stack_calibrations.apply_async(args=(stacking_min_date, stacking_max_date, instrument.id, frame_type, vars(runtime_context), blocks_for_calibration), - countdown=message_delay_in_seconds) + countdown=message_delay_in_seconds, + queue=runtime_context.CELERY_STACK_QUEUE_NAME) else: logger.warning('No scheduled calibration blocks found.', extra_tags={'site': site, 'min_date': min_date, 'max_date': max_date, diff --git a/banzai/settings.py b/banzai/settings.py index 348194ad..327c3fae 100644 --- a/banzai/settings.py +++ b/banzai/settings.py @@ -143,3 +143,5 @@ LOSSLESS_EXTENSIONS = [] CELERY_TASK_QUEUE_NAME = os.getenv('CELERY_TASK_QUEUE_NAME', 'celery') + +CELERY_STACK_QUEUE_NAME = os.getenv('CELERY_STACK_QUEUE_NAME', 'celery') diff --git a/banzai/tests/test_celery.py b/banzai/tests/test_celery.py index 5f2471dd..7bd91802 100644 --- a/banzai/tests/test_celery.py +++ b/banzai/tests/test_celery.py @@ -180,7 +180,8 @@ def setup(self): self.max_date = '2019-02-20T09:55:09' self.context = Context({'db_address': 'db_address', 'CALIBRATION_IMAGE_TYPES': ['BIAS'], 'CALIBRATION_STACK_DELAYS': {'BIAS': 300}, - 'CALIBRATION_STACKER_STAGES': {'BIAS': ['banzai.bias.BiasMaker']}}) + 'CALIBRATION_STACKER_STAGES': {'BIAS': ['banzai.bias.BiasMaker']}, + 'CELERY_STACK_QUEUE_NAME': 'celery-stack'}) self.frame_type = 'BIAS' self.fake_blocks_response_json = fake_blocks_response_json self.fake_inst = FakeInstrument(site='coj', camera='2m0-SciCam-Spectral', enclosure='clma', telescope='2m0a') @@ -198,7 +199,7 @@ def test_submit_stacking_tasks_to_queue_no_delay(self, mock_filter_blocks, mock_ mock_stack_calibrations.assert_called_with(args=(self.min_date, self.max_date, self.fake_inst.id, self.frame_type, vars(self.context), mock_filter_blocks.return_value), - countdown=0) + countdown=0, queue='celery-stack') @mock.patch('banzai.celery.stack_calibrations.apply_async') @mock.patch('banzai.celery.dbs.get_instruments_at_site') @@ -215,7 +216,8 @@ def test_submit_stacking_tasks_to_queue_with_delay(self, mock_filter_blocks, moc mock_stack_calibrations.assert_called_with(args=(self.min_date, self.max_date, self.fake_inst.id, self.frame_type, vars(self.context), mock_filter_blocks.return_value), - countdown=(60+CALIBRATION_STACK_DELAYS['BIAS'])) + countdown=(60+CALIBRATION_STACK_DELAYS['BIAS']), + queue='celery-stack') @mock.patch('banzai.calibrations.make_master_calibrations') @mock.patch('banzai.celery.dbs.get_individual_cal_frames') diff --git a/helm-chart/banzai/templates/_helpers.tpl b/helm-chart/banzai/templates/_helpers.tpl index 40693ef5..1923f4c5 100644 --- a/helm-chart/banzai/templates/_helpers.tpl +++ b/helm-chart/banzai/templates/_helpers.tpl @@ -175,6 +175,8 @@ Celery task queue configuration value: {{ .Values.banzai.queueName | quote }} - name: CELERY_TASK_QUEUE_NAME value: {{ .Values.banzai.celeryTaskQueueName | quote }} +- name: CELERY_STACK_QUEUE_NAME + value: {{ .Values.banzai.celeryStackQueueName | quote }} - name: BANZAI_WORKER_LOGLEVEL value: {{ .Values.banzai.banzaiWorkerLogLevel | quote }} {{- end -}} diff --git a/helm-chart/banzai/templates/stacker.yaml b/helm-chart/banzai/templates/stacker.yaml new file mode 100644 index 00000000..19691860 --- /dev/null +++ b/helm-chart/banzai/templates/stacker.yaml @@ -0,0 +1,70 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ include "banzai.fullname" . -}} -workers + labels: +{{ include "banzai.labels" . | indent 4 }} +spec: + selector: + matchLabels: + app.kubernetes.io/name: {{ include "banzai.name" . }} + app.kubernetes.io/instance: {{ .Release.Name }} + template: + metadata: + labels: + app.kubernetes.io/name: {{ include "banzai.name" . }} + app.kubernetes.io/instance: "{{ .Release.Name }}" + spec: + {{- with .Values.imagePullSecrets }} + imagePullSecrets: + {{- toYaml . | nindent 8 }} + {{- end }} + securityContext: + {{- toYaml .Values.podSecurityContext | nindent 8 }} + + containers: + - name: {{ .Chart.Name }} + image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}" + imagePullPolicy: {{ .Values.image.pullPolicy }} + securityContext: + {{- toYaml .Values.securityContext | nindent 12 }} + command: + - "celery" + - "-A" + - "banzai" + - "worker" + - "--concurrency" + - "1" + - "-l" + - "info" + - "-Q" + - "$(CELERY_STACK_QUEUE_NAME)" + env: + - name: OMP_NUM_THREADS + value: "8" + {{- include "banzai.Env" . | nindent 12 }} + resources: + requests: + cpu: "0.3" + memory: "1Gi" + limits: + cpu: "8" + memory: "16Gi" + volumeMounts: + - name: tmp + mountPath: /tmp + readOnly: false + volumes: + - name: tmp + emptyDir: + sizeLimit: 100Gi + {{- with .Values.nodeSelector }} + nodeSelector: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.affinity }} + affinity: + {{- toYaml . | nindent 8 }} + {{- end }} + tolerations: + - dedicated: "banzai-disk" diff --git a/helm-chart/banzai/values-dev.yaml b/helm-chart/banzai/values-dev.yaml index 3db77471..752fd94a 100644 --- a/helm-chart/banzai/values-dev.yaml +++ b/helm-chart/banzai/values-dev.yaml @@ -29,6 +29,7 @@ banzai: fitsExchange: archived_fits queueName: banzai_dev_pipeline celeryTaskQueueName: banzai_imaging + celeryStackQueueName: banzai_imaging_stacking # CronJob configuration to periodically update instrument table in BANZAI DB instrumentTableCronjob: diff --git a/helm-chart/banzai/values-prod.yaml b/helm-chart/banzai/values-prod.yaml index c65277e7..63634976 100644 --- a/helm-chart/banzai/values-prod.yaml +++ b/helm-chart/banzai/values-prod.yaml @@ -38,6 +38,7 @@ banzai: fitsExchange: archived_fits queueName: banzai_pipeline celeryTaskQueueName: banzai_imaging + celeryStackQueueName: banzai_imaging_stacking # CronJob configuration to periodically update instrument table in BANZAI DB instrumentTableCronjob: