From 3529ae9a50d93b36000e3e7b6af4199617c72c8d Mon Sep 17 00:00:00 2001 From: fenos Date: Fri, 21 Jun 2024 12:05:04 +0200 Subject: [PATCH] feat: allow to disable queue workers --- src/config.ts | 2 ++ src/queue/queue.ts | 5 ++++- src/worker.ts | 31 +++++++++++++++++++++++++++++++ 3 files changed, 37 insertions(+), 1 deletion(-) create mode 100644 src/worker.ts diff --git a/src/config.ts b/src/config.ts index 016f4182..3df35300 100644 --- a/src/config.ts +++ b/src/config.ts @@ -59,6 +59,7 @@ type StorageConfigType = { logflareApiKey?: string logflareSourceToken?: string pgQueueEnable: boolean + pgQueueEnableWorkers?: boolean pgQueueConnectionURL?: string pgQueueDeleteAfterDays?: number pgQueueArchiveCompletedAfterSeconds?: number @@ -303,6 +304,7 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType { // Queue pgQueueEnable: getOptionalConfigFromEnv('PG_QUEUE_ENABLE', 'ENABLE_QUEUE_EVENTS') === 'true', + pgQueueEnableWorkers: getOptionalConfigFromEnv('PG_QUEUE_WORKERS_ENABLE') !== 'false', pgQueueConnectionURL: getOptionalConfigFromEnv('PG_QUEUE_CONNECTION_URL'), pgQueueDeleteAfterDays: parseInt( getOptionalConfigFromEnv('PG_QUEUE_DELETE_AFTER_DAYS') || '2', diff --git a/src/queue/queue.ts b/src/queue/queue.ts index c8778869..6710a64b 100644 --- a/src/queue/queue.ts +++ b/src/queue/queue.ts @@ -27,6 +27,7 @@ export abstract class Queue { pgQueueDeleteAfterDays, pgQueueArchiveCompletedAfterSeconds, pgQueueRetentionDays, + pgQueueEnableWorkers, } = getConfig() let url = pgQueueConnectionURL ?? databaseURL @@ -52,7 +53,9 @@ export abstract class Queue { expireInHours: 48, }) - registerWorkers() + if (pgQueueEnableWorkers) { + registerWorkers() + } Queue.pgBoss.on('error', (error) => { logSchema.error(logger, '[Queue] error', { diff --git a/src/worker.ts b/src/worker.ts new file mode 100644 index 00000000..d4d4d7bf --- /dev/null +++ b/src/worker.ts @@ -0,0 +1,31 @@ +import { Queue } from './queue' +import { logger } from './monitoring' + +export async function main() { + logger.info('[Queue] Starting Queue Worker') + const queue = await Queue.init() + + process.on('SIGTERM', async () => { + await Queue.stop() + }) + + return new Promise((resolve, reject) => { + queue.on('error', (err) => { + logger.info('[Queue] Error', err) + reject(err) + }) + + queue.on('stopped', () => { + logger.info('[Queue] Stopping') + resolve() + }) + }) +} + +main() + .then(() => { + logger.info('[Queue] Worker Exited Successfully') + }) + .catch(() => { + process.exit(1) + })