Skip to content

Commit

Permalink
fix: increase queue concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
fenos committed Oct 2, 2023
1 parent f1076a0 commit e564d3d
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 3 deletions.
9 changes: 9 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ type StorageConfigType = {
webhookURL?: string
webhookApiKey?: string
webhookQueuePullInterval?: number
webhookQueueTeamSize?: number
webhookQueueConcurrency?: number
adminDeleteQueueTeamSize?: number
adminDeleteConcurrency?: number
enableImageTransformation: boolean
imgProxyURL?: string
imgProxyRequestTimeout: number
Expand Down Expand Up @@ -146,6 +150,11 @@ export function getConfig(): StorageConfigType {
webhookQueuePullInterval: parseInt(
getOptionalConfigFromEnv('WEBHOOK_QUEUE_PULL_INTERVAL') || '700'
),
webhookQueueTeamSize: parseInt('QUEUE_WEBHOOKS_TEAM_SIZE') || 50,
webhookQueueConcurrency: parseInt('QUEUE_WEBHOOK_CONCURRENCY') || 5,
adminDeleteQueueTeamSize: parseInt('QUEUE_ADMIN_DELETE_TEAM_SIZE') || 50,
adminDeleteConcurrency: parseInt('QUEUE_ADMIN_DELETE_CONCURRENCY') || 5,

enableImageTransformation: getOptionalConfigFromEnv('ENABLE_IMAGE_TRANSFORMATION') === 'true',
imgProxyRequestTimeout: parseInt(
getOptionalConfigFromEnv('IMGPROXY_REQUEST_TIMEOUT') || '15',
Expand Down
11 changes: 9 additions & 2 deletions src/queue/events/object-admin-delete.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { BaseEvent, BasePayload } from './base-event'
import { getConfig } from '../../config'
import { Job } from 'pg-boss'
import { Job, WorkOptions } from 'pg-boss'
import { withOptionalVersion } from '../../storage/backend'
import { logger } from '../../monitoring'

Expand All @@ -10,11 +10,18 @@ export interface ObjectDeleteEvent extends BasePayload {
version?: string
}

const { globalS3Bucket } = getConfig()
const { globalS3Bucket, adminDeleteQueueTeamSize, adminDeleteConcurrency } = getConfig()

export class ObjectAdminDelete extends BaseEvent<ObjectDeleteEvent> {
static queueName = 'object:admin:delete'

static getWorkerOptions(): WorkOptions {
return {
teamSize: adminDeleteQueueTeamSize,
teamConcurrency: adminDeleteConcurrency,
}
}

static async handle(job: Job<ObjectDeleteEvent>) {
logger.info({ job: JSON.stringify(job) }, 'Handling ObjectAdminDelete')

Expand Down
10 changes: 9 additions & 1 deletion src/queue/events/webhook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@ import axios from 'axios'
import { getConfig } from '../../config'
import { logger } from '../../monitoring'

const { webhookURL, webhookApiKey, webhookQueuePullInterval } = getConfig()
const {
webhookURL,
webhookApiKey,
webhookQueuePullInterval,
webhookQueueTeamSize,
webhookQueueConcurrency,
} = getConfig()

interface WebhookEvent {
event: {
Expand All @@ -26,6 +32,8 @@ export class Webhook extends BaseEvent<WebhookEvent> {
static getWorkerOptions(): WorkOptions {
return {
newJobCheckInterval: webhookQueuePullInterval,
teamSize: webhookQueueTeamSize,
teamConcurrency: webhookQueueConcurrency,
}
}

Expand Down

0 comments on commit e564d3d

Please sign in to comment.