From e564d3d912fa19d1574e4e3e4955fadb8702b310 Mon Sep 17 00:00:00 2001 From: fenos Date: Sun, 1 Oct 2023 23:34:34 +0100 Subject: [PATCH] fix: increase queue concurrency --- src/config.ts | 9 +++++++++ src/queue/events/object-admin-delete.ts | 11 +++++++++-- src/queue/events/webhook.ts | 10 +++++++++- 3 files changed, 27 insertions(+), 3 deletions(-) diff --git a/src/config.ts b/src/config.ts index 9b9cbafe..cc379c2d 100644 --- a/src/config.ts +++ b/src/config.ts @@ -42,6 +42,10 @@ type StorageConfigType = { webhookURL?: string webhookApiKey?: string webhookQueuePullInterval?: number + webhookQueueTeamSize?: number + webhookQueueConcurrency?: number + adminDeleteQueueTeamSize?: number + adminDeleteConcurrency?: number enableImageTransformation: boolean imgProxyURL?: string imgProxyRequestTimeout: number @@ -146,6 +150,11 @@ export function getConfig(): StorageConfigType { webhookQueuePullInterval: parseInt( getOptionalConfigFromEnv('WEBHOOK_QUEUE_PULL_INTERVAL') || '700' ), + webhookQueueTeamSize: parseInt('QUEUE_WEBHOOKS_TEAM_SIZE') || 50, + webhookQueueConcurrency: parseInt('QUEUE_WEBHOOK_CONCURRENCY') || 5, + adminDeleteQueueTeamSize: parseInt('QUEUE_ADMIN_DELETE_TEAM_SIZE') || 50, + adminDeleteConcurrency: parseInt('QUEUE_ADMIN_DELETE_CONCURRENCY') || 5, + enableImageTransformation: getOptionalConfigFromEnv('ENABLE_IMAGE_TRANSFORMATION') === 'true', imgProxyRequestTimeout: parseInt( getOptionalConfigFromEnv('IMGPROXY_REQUEST_TIMEOUT') || '15', diff --git a/src/queue/events/object-admin-delete.ts b/src/queue/events/object-admin-delete.ts index ad00d02f..6f635d5a 100644 --- a/src/queue/events/object-admin-delete.ts +++ b/src/queue/events/object-admin-delete.ts @@ -1,6 +1,6 @@ import { BaseEvent, BasePayload } from './base-event' import { getConfig } from '../../config' -import { Job } from 'pg-boss' +import { Job, WorkOptions } from 'pg-boss' import { withOptionalVersion } from '../../storage/backend' import { logger } from '../../monitoring' @@ -10,11 +10,18 @@ export interface ObjectDeleteEvent extends BasePayload { version?: string } -const { globalS3Bucket } = getConfig() +const { globalS3Bucket, adminDeleteQueueTeamSize, adminDeleteConcurrency } = getConfig() export class ObjectAdminDelete extends BaseEvent { static queueName = 'object:admin:delete' + static getWorkerOptions(): WorkOptions { + return { + teamSize: adminDeleteQueueTeamSize, + teamConcurrency: adminDeleteConcurrency, + } + } + static async handle(job: Job) { logger.info({ job: JSON.stringify(job) }, 'Handling ObjectAdminDelete') diff --git a/src/queue/events/webhook.ts b/src/queue/events/webhook.ts index 7ddd8e6c..05982428 100644 --- a/src/queue/events/webhook.ts +++ b/src/queue/events/webhook.ts @@ -4,7 +4,13 @@ import axios from 'axios' import { getConfig } from '../../config' import { logger } from '../../monitoring' -const { webhookURL, webhookApiKey, webhookQueuePullInterval } = getConfig() +const { + webhookURL, + webhookApiKey, + webhookQueuePullInterval, + webhookQueueTeamSize, + webhookQueueConcurrency, +} = getConfig() interface WebhookEvent { event: { @@ -26,6 +32,8 @@ export class Webhook extends BaseEvent { static getWorkerOptions(): WorkOptions { return { newJobCheckInterval: webhookQueuePullInterval, + teamSize: webhookQueueTeamSize, + teamConcurrency: webhookQueueConcurrency, } }