From 35eb78adc5f6a7d881fd78cabd87849e35d3fdd9 Mon Sep 17 00:00:00 2001 From: fenos Date: Fri, 21 Jun 2024 12:05:04 +0200 Subject: [PATCH] feat: allow to disable queue workers --- src/config.ts | 2 ++ src/queue/queue.ts | 5 ++++- src/worker.ts | 54 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 60 insertions(+), 1 deletion(-) create mode 100644 src/worker.ts diff --git a/src/config.ts b/src/config.ts index 016f4182..3df35300 100644 --- a/src/config.ts +++ b/src/config.ts @@ -59,6 +59,7 @@ type StorageConfigType = { logflareApiKey?: string logflareSourceToken?: string pgQueueEnable: boolean + pgQueueEnableWorkers?: boolean pgQueueConnectionURL?: string pgQueueDeleteAfterDays?: number pgQueueArchiveCompletedAfterSeconds?: number @@ -303,6 +304,7 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType { // Queue pgQueueEnable: getOptionalConfigFromEnv('PG_QUEUE_ENABLE', 'ENABLE_QUEUE_EVENTS') === 'true', + pgQueueEnableWorkers: getOptionalConfigFromEnv('PG_QUEUE_WORKERS_ENABLE') !== 'false', pgQueueConnectionURL: getOptionalConfigFromEnv('PG_QUEUE_CONNECTION_URL'), pgQueueDeleteAfterDays: parseInt( getOptionalConfigFromEnv('PG_QUEUE_DELETE_AFTER_DAYS') || '2', diff --git a/src/queue/queue.ts b/src/queue/queue.ts index c8778869..6710a64b 100644 --- a/src/queue/queue.ts +++ b/src/queue/queue.ts @@ -27,6 +27,7 @@ export abstract class Queue { pgQueueDeleteAfterDays, pgQueueArchiveCompletedAfterSeconds, pgQueueRetentionDays, + pgQueueEnableWorkers, } = getConfig() let url = pgQueueConnectionURL ?? databaseURL @@ -52,7 +53,9 @@ export abstract class Queue { expireInHours: 48, }) - registerWorkers() + if (pgQueueEnableWorkers) { + registerWorkers() + } Queue.pgBoss.on('error', (error) => { logSchema.error(logger, '[Queue] error', { diff --git a/src/worker.ts b/src/worker.ts new file mode 100644 index 00000000..cf233159 --- /dev/null +++ b/src/worker.ts @@ -0,0 +1,54 @@ +import { Queue } from './queue' +import { logger, logSchema } from './monitoring' +import adminApp from './admin-app' +import { getConfig } from './config' + +export async function main() { + const { requestTraceHeader, adminPort, host } = getConfig() + + logger.info('[Queue] Starting Queue Worker') + const queue = await Queue.init() + + const server = adminApp({ + logger, + disableRequestLogging: true, + requestIdHeader: requestTraceHeader, + }) + + process.on('SIGTERM', async () => { + logger.info('[Worker] Stopping') + await server.close() + await Queue.stop() + }) + + await server.listen({ port: adminPort, host }) + + return new Promise((resolve, reject) => { + queue.on('error', (err) => { + logger.info('[Queue] Error', err) + reject(err) + }) + + queue.on('stopped', () => { + logger.info('[Queue] Stopping') + resolve() + }) + }) +} + +process.on('uncaughtException', (e) => { + logSchema.error(logger, 'uncaught exception', { + type: 'uncaughtException', + error: e, + }) + logger.flush() + process.exit(1) +}) + +main() + .then(() => { + logger.info('[Queue] Worker Exited Successfully') + }) + .catch(() => { + process.exit(1) + })