Skip to content

Commit

Permalink
feat: allow to disable queue workers
Browse files Browse the repository at this point in the history
  • Loading branch information
fenos committed Jun 21, 2024
1 parent c715a7b commit 790fd65
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 1 deletion.
2 changes: 2 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type StorageConfigType = {
logflareApiKey?: string
logflareSourceToken?: string
pgQueueEnable: boolean
pgQueueEnableWorkers?: boolean
pgQueueConnectionURL?: string
pgQueueDeleteAfterDays?: number
pgQueueArchiveCompletedAfterSeconds?: number
Expand Down Expand Up @@ -303,6 +304,7 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType {

// Queue
pgQueueEnable: getOptionalConfigFromEnv('PG_QUEUE_ENABLE', 'ENABLE_QUEUE_EVENTS') === 'true',
pgQueueEnableWorkers: getOptionalConfigFromEnv('PG_QUEUE_WORKERS_ENABLE') !== 'false',
pgQueueConnectionURL: getOptionalConfigFromEnv('PG_QUEUE_CONNECTION_URL'),
pgQueueDeleteAfterDays: parseInt(
getOptionalConfigFromEnv('PG_QUEUE_DELETE_AFTER_DAYS') || '2',
Expand Down
5 changes: 4 additions & 1 deletion src/queue/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export abstract class Queue {
pgQueueDeleteAfterDays,
pgQueueArchiveCompletedAfterSeconds,
pgQueueRetentionDays,
pgQueueEnableWorkers,
} = getConfig()

let url = pgQueueConnectionURL ?? databaseURL
Expand All @@ -52,7 +53,9 @@ export abstract class Queue {
expireInHours: 48,
})

registerWorkers()
if (pgQueueEnableWorkers) {
registerWorkers()
}

Queue.pgBoss.on('error', (error) => {
logSchema.error(logger, '[Queue] error', {
Expand Down
44 changes: 44 additions & 0 deletions src/worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { Queue } from './queue'
import { logger } from './monitoring'
import adminApp from './admin-app'
import { getConfig } from './config'

export async function main() {
const { requestTraceHeader, adminPort, host } = getConfig()

logger.info('[Queue] Starting Queue Worker')
const queue = await Queue.init()

const server = adminApp({
logger,
disableRequestLogging: true,
requestIdHeader: requestTraceHeader,
})

process.on('SIGTERM', async () => {
await server.close()
await Queue.stop()
})

await server.listen({ port: adminPort, host })

return new Promise<void>((resolve, reject) => {
queue.on('error', (err) => {
logger.info('[Queue] Error', err)
reject(err)
})

queue.on('stopped', () => {
logger.info('[Queue] Stopping')
resolve()
})
})
}

main()
.then(() => {
logger.info('[Queue] Worker Exited Successfully')
})
.catch(() => {
process.exit(1)
})

0 comments on commit 790fd65

Please sign in to comment.