Skip to content

Commit

Permalink
fix: more robust shutdown process
Browse files Browse the repository at this point in the history
  • Loading branch information
fenos committed Jun 26, 2024
1 parent 187251c commit 86a3ed9
Show file tree
Hide file tree
Showing 10 changed files with 309 additions and 136 deletions.
45 changes: 45 additions & 0 deletions src/internal/concurrency/async-abort-controller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* This special AbortController is used to wait for all the abort handlers to finish before resolving the promise.
*/
export class AsyncAbortController extends AbortController {
protected promises: Promise<any>[] = []

constructor() {
super()
const originalEventListener = this.signal.addEventListener

// Patch event addEventListener to keep track of listeners and their promises
this.signal.addEventListener = (type: string, listener: any, options: any) => {
if (type !== 'abort') {
return originalEventListener.call(this.signal, type, listener, options)
}

let resolving: undefined | (() => Promise<void>) = undefined
const promise = new Promise<void>(async (resolve, reject) => {
resolving = async (): Promise<void> => {
try {
const result = await listener()
resolve(result)
} catch (e) {
reject(e)
}
}
})
this.promises.push(promise)

if (!resolving) {
throw new Error('resolve is undefined')
}

return originalEventListener.call(this.signal, type, resolving, options)
}
}

async abortAsync() {
this.abort()
while (this.promises.length > 0) {
const promises = this.promises.splice(0, 100)
await Promise.allSettled(promises)
}
}
}
1 change: 1 addition & 0 deletions src/internal/concurrency/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export * from './mutex'
export * from './async-abort-controller'
4 changes: 4 additions & 0 deletions src/internal/database/migrations/migrate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { listTenantsToMigrate } from '../tenant'
import { multitenantKnex } from '../multitenant-db'
import { ProgressiveMigrations } from './progressive'
import { RunMigrationsOnTenants } from '@storage/events'
import { ERRORS } from '@internal/errors'

const {
multitenantDatabaseUrl,
Expand Down Expand Up @@ -49,6 +50,9 @@ export const progressiveMigrations = new ProgressiveMigrations({
* @param signal
*/
export function startAsyncMigrations(signal: AbortSignal) {
if (signal.aborted) {
throw ERRORS.Aborted('Migration aborted')
}
switch (dbMigrationStrategy) {
case MultitenantMigrationStrategy.ON_REQUEST:
return
Expand Down
9 changes: 9 additions & 0 deletions src/internal/errors/codes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export enum ErrorCode {
MissingPart = 'MissingPart',
SlowDown = 'SlowDown',
TusError = 'TusError',
Aborted = 'Aborted',
}

export const ERRORS = {
Expand Down Expand Up @@ -363,6 +364,14 @@ export const ERRORS = {
httpStatusCode: 400,
message: `Part ${partNumber} is missing for upload id ${uploadId}`,
}),

Aborted: (message: string, originalError?: unknown) =>
new StorageBackendError({
code: ErrorCode.Aborted,
httpStatusCode: 500,
message: message,
originalError,
}),
}

export function isStorageError(errorType: ErrorCode, error: any): error is StorageBackendError {
Expand Down
13 changes: 12 additions & 1 deletion src/internal/pubsub/postgres.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import createSubscriber, { Subscriber } from 'pg-listen'
import { PubSubAdapter } from './adapter'
import EventEmitter from 'events'
import { ERRORS } from '@internal/errors'

export class PostgresPubSub extends EventEmitter implements PubSubAdapter {
isConnected = false
Expand All @@ -22,10 +23,20 @@ export class PostgresPubSub extends EventEmitter implements PubSubAdapter {
})
}

async connect(): Promise<void> {
async connect(opts?: { signal?: AbortSignal }): Promise<void> {
if (opts?.signal?.aborted) {
throw ERRORS.Aborted('Postgres pubsub connection aborted')
}

await this.subscriber.connect()
this.isConnected = true

if (opts?.signal) {
opts.signal.addEventListener('abort', async () => {
await this.close()
})
}

await Promise.all(
this.subscriber.notifications.eventNames().map(async (channel) => {
return this.subscriber.listenTo(channel as string)
Expand Down
38 changes: 31 additions & 7 deletions src/internal/queue/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { getConfig } from '../../config'
import { BaseEvent, BasePayload } from '../../storage/events'
import { QueueJobRetryFailed, QueueJobCompleted, QueueJobError } from '../monitoring/metrics'
import { logger, logSchema } from '../monitoring'
import { ERRORS } from '@internal/errors'

//eslint-disable-next-line @typescript-eslint/no-explicit-any
type SubclassOfBaseClass = (new (payload: any) => BaseEvent<any>) & {
Expand All @@ -13,11 +14,15 @@ export abstract class Queue {
protected static events: SubclassOfBaseClass[] = []
private static pgBoss?: PgBoss

static async init() {
static async init(opts: { signal?: AbortSignal; onMessage?: (job: Job) => void }) {
if (Queue.pgBoss) {
return Queue.pgBoss
}

if (opts.signal?.aborted) {
throw ERRORS.Aborted('Cannot start queue with aborted signal')
}

const {
isMultitenant,
databaseURL,
Expand Down Expand Up @@ -59,7 +64,18 @@ export abstract class Queue {
})

await Queue.pgBoss.start()
await Queue.startWorkers()
await Queue.startWorkers(opts.onMessage)

if (opts.signal) {
opts.signal.addEventListener('abort', async () => {
return Queue.stop().catch((e) => {
logSchema.error(logger, '[Queue] Error while stopping queue', {
error: e,
type: 'queue',
})
})
})
}

return Queue.pgBoss
}
Expand All @@ -85,25 +101,29 @@ export abstract class Queue {

await boss.stop({
timeout: 20 * 1000,
graceful: true,
destroy: true,
})

await new Promise((resolve) => {
boss.once('stopped', () => resolve(null))
boss.once('stopped', () => {
resolve(null)
})
})

Queue.pgBoss = undefined
}

protected static startWorkers() {
protected static startWorkers(onMessage?: (job: Job) => void) {
const workers: Promise<string>[] = []

Queue.events.forEach((event) => {
workers.push(Queue.registerTask(event.getQueueName(), event, true))
workers.push(Queue.registerTask(event.getQueueName(), event, true, onMessage))

const slowRetryQueue = event.withSlowRetryQueue()

if (slowRetryQueue) {
workers.push(Queue.registerTask(event.getSlowRetryQueueName(), event, false))
workers.push(Queue.registerTask(event.getSlowRetryQueueName(), event, false, onMessage))
}
})

Expand All @@ -113,14 +133,18 @@ export abstract class Queue {
protected static registerTask(
queueName: string,
event: SubclassOfBaseClass,
slowRetryQueueOnFail?: boolean
slowRetryQueueOnFail?: boolean,
onMessage?: (job: Job) => void
) {
const hasSlowRetryQueue = event.withSlowRetryQueue()
return Queue.getInstance().work(
queueName,
event.getWorkerOptions(),
async (job: Job<BasePayload>) => {
try {
if (onMessage) {
onMessage(job)
}
const res = await event.handle(job)

QueueJobCompleted.inc({
Expand Down
Loading

0 comments on commit 86a3ed9

Please sign in to comment.