fix: more robust shutdown process
fenos committed Jun 27, 2024
1 parent 187251c commit 797f903
Showing 13 changed files with 453 additions and 139 deletions.
73 changes: 73 additions & 0 deletions src/internal/concurrency/async-abort-controller.ts
@@ -0,0 +1,73 @@
/**
 * A specialized AbortController whose abortAsync() waits for all registered abort listeners to finish before resolving.
 */
export class AsyncAbortController extends AbortController {
protected promises: Promise<any>[] = []
protected priority = 0
protected groups = new Map<number, AsyncAbortController[]>()

constructor() {
super()

const originalEventListener = this.signal.addEventListener

// Patch the signal's addEventListener to track abort listeners and wrap them in promises we can await later
this.signal.addEventListener = (type: string, listener: any, options: any) => {
if (type !== 'abort') {
return originalEventListener.call(this.signal, type, listener, options)
}

let resolving: undefined | (() => Promise<void>) = undefined
const promise = new Promise<void>(async (resolve, reject) => {
resolving = async (): Promise<void> => {
try {
const result = await listener()
resolve(result)
} catch (e) {
reject(e)
}
}
})
this.promises.push(promise)

if (!resolving) {
throw new Error('resolve is undefined')
}

return originalEventListener.call(this.signal, type, resolving, options)
}
}

protected _nextGroup?: AsyncAbortController

get nextGroup() {
if (!this._nextGroup) {
this._nextGroup = new AsyncAbortController()
this._nextGroup.priority = this.priority + 1
}

let existingGroups = this.groups.get(this._nextGroup.priority)
if (!existingGroups) {
existingGroups = []
}

existingGroups.push(this._nextGroup)
this.groups.set(this._nextGroup.priority, existingGroups)
return this._nextGroup
}

async abortAsync() {
this.abort()
while (this.promises.length > 0) {
const promises = this.promises.splice(0, 100)
await Promise.allSettled(promises)
}
await this.abortGroups()
}

protected async abortGroups() {
for (const [, group] of this.groups) {
await Promise.allSettled(group.map((g) => g.abortAsync()))
}
}
}
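
A minimal usage sketch (not part of this commit) of how AsyncAbortController can coordinate an ordered shutdown. The '@internal/concurrency' import path mirrors the index.ts export below; stopHttpServer and drainJobQueue are assumed placeholders for real teardown work.

import { AsyncAbortController } from '@internal/concurrency'

// Assumed placeholders standing in for real teardown steps.
const stopHttpServer = async () => {
  /* close the HTTP listener */
}
const drainJobQueue = async () => {
  /* finish in-flight jobs */
}

const shutdown = new AsyncAbortController()

// First group: stop accepting new traffic.
shutdown.signal.addEventListener('abort', async () => {
  await stopHttpServer()
})

// nextGroup is only aborted after every listener on the parent has settled.
shutdown.nextGroup.signal.addEventListener('abort', async () => {
  await drainJobQueue()
})

process.on('SIGTERM', async () => {
  // abort(), await all registered listeners, then abort child groups in priority order.
  await shutdown.abortAsync()
})
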
1 change: 1 addition & 0 deletions src/internal/concurrency/index.ts
@@ -1 +1,2 @@
export * from './mutex'
export * from './async-abort-controller'
12 changes: 10 additions & 2 deletions src/internal/database/migrations/migrate.ts
@@ -11,6 +11,7 @@ import { listTenantsToMigrate } from '../tenant'
import { multitenantKnex } from '../multitenant-db'
import { ProgressiveMigrations } from './progressive'
import { RunMigrationsOnTenants } from '@storage/events'
import { ERRORS } from '@internal/errors'

const {
multitenantDatabaseUrl,
@@ -49,6 +50,9 @@ export const progressiveMigrations = new ProgressiveMigrations({
* @param signal
*/
export function startAsyncMigrations(signal: AbortSignal) {
if (signal.aborted) {
throw ERRORS.Aborted('Migration aborted')
}
switch (dbMigrationStrategy) {
case MultitenantMigrationStrategy.ON_REQUEST:
return
@@ -127,14 +131,18 @@ export async function runMigrationsOnAllTenants(signal: AbortSignal) {
* Runs multi-tenant migrations
*/
export async function runMultitenantMigrations(): Promise<void> {
logger.info('Running multitenant migrations')
logSchema.info(logger, '[Migrations] Running multitenant migrations', {
type: 'migrations',
})
await connectAndMigrate({
databaseUrl: multitenantDatabaseUrl,
migrationsDirectory: './migrations/multitenant',
shouldCreateStorageSchema: false,
waitForLock: true,
})
logger.info('Multitenant migrations completed')
logSchema.info(logger, '[Migrations] Completed', {
type: 'migrations',
})
}

/**
3 changes: 3 additions & 0 deletions src/internal/database/migrations/progressive.ts
@@ -19,6 +19,9 @@ export class ProgressiveMigrations {
signal.addEventListener('abort', () => {
if (this.watchInterval) {
clearInterval(this.watchInterval)
logSchema.info(logger, '[Migrations] Stopping', {
type: 'migrations',
})
this.drain().catch((e) => {
logSchema.error(logger, '[Migrations] Error creating migration jobs', {
type: 'migrations',
9 changes: 9 additions & 0 deletions src/internal/errors/codes.ts
@@ -36,6 +36,7 @@ export enum ErrorCode {
MissingPart = 'MissingPart',
SlowDown = 'SlowDown',
TusError = 'TusError',
Aborted = 'Aborted',
}

export const ERRORS = {
@@ -363,6 +364,14 @@ export const ERRORS = {
httpStatusCode: 400,
message: `Part ${partNumber} is missing for upload id ${uploadId}`,
}),

Aborted: (message: string, originalError?: unknown) =>
new StorageBackendError({
code: ErrorCode.Aborted,
httpStatusCode: 500,
message: message,
originalError,
}),
}

export function isStorageError(errorType: ErrorCode, error: any): error is StorageBackendError {
17 changes: 15 additions & 2 deletions src/internal/monitoring/otel.ts
@@ -21,6 +21,7 @@ import { CompressionAlgorithm } from '@opentelemetry/otlp-exporter-base'
import * as grpc from '@grpc/grpc-js'
import { HttpInstrumentation } from '@opentelemetry/instrumentation-http'
import { IncomingMessage } from 'http'
import { logger, logSchema } from '@internal/monitoring/logger'

const headersEnv = process.env.OTEL_EXPORTER_OTLP_TRACES_HEADERS || ''

@@ -125,9 +126,21 @@ if (process.env.OTEL_EXPORTER_OTLP_TRACES_ENDPOINT) {

// Gracefully shutdown the SDK on process exit
process.on('SIGTERM', () => {
logSchema.info(logger, '[Otel] Stopping', {
type: 'otel',
})
sdk
.shutdown()
.then(() => console.log('Tracing terminated'))
.catch((error) => console.error('Error terminating tracing', error))
.then(() => {
logSchema.info(logger, '[Otel] Exited', {
type: 'otel',
})
})
.catch((error) =>
logSchema.error(logger, '[Otel] Shutdown error', {
type: 'otel',
error: error,
})
)
})
}
2 changes: 1 addition & 1 deletion src/internal/pubsub/adapter.ts
@@ -1,5 +1,5 @@
export interface PubSubAdapter {
connect(): Promise<void>
start(): Promise<void>
publish(channel: string, message: any): Promise<void>
subscribe(channel: string, cb: (message: any) => void): Promise<void>
unsubscribe(channel: string, cb: (message: any) => void): Promise<void>
30 changes: 26 additions & 4 deletions src/internal/pubsub/postgres.ts
@@ -1,6 +1,8 @@
import EventEmitter from 'events'
import createSubscriber, { Subscriber } from 'pg-listen'
import { ERRORS } from '@internal/errors'
import { logger, logSchema } from '@internal/monitoring'
import { PubSubAdapter } from './adapter'
import EventEmitter from 'events'

export class PostgresPubSub extends EventEmitter implements PubSubAdapter {
isConnected = false
@@ -22,22 +24,42 @@ export class PostgresPubSub extends EventEmitter implements PubSubAdapter {
})
}

async connect(): Promise<void> {
async start(opts?: { signal?: AbortSignal }): Promise<void> {
if (opts?.signal?.aborted) {
throw ERRORS.Aborted('Postgres pubsub connection aborted')
}

await this.subscriber.connect()
this.isConnected = true

if (opts?.signal) {
opts.signal.addEventListener(
'abort',
async () => {
logSchema.info(logger, '[PubSub] Stopping', {
type: 'pubsub',
})
await this.close()
},
{ once: true }
)
}

await Promise.all(
this.subscriber.notifications.eventNames().map(async (channel) => {
return this.subscriber.listenTo(channel as string)
})
)
}

close(): Promise<void> {
async close(): Promise<void> {
this.subscriber.notifications.eventNames().forEach((event) => {
this.subscriber.notifications.removeAllListeners(event)
})
return this.subscriber.close()
await this.subscriber.close()
logSchema.info(logger, '[PubSub] Exited', {
type: 'pubsub',
})
}

async publish(channel: string, payload: unknown): Promise<void> {
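
A hedged sketch of how the renamed start() method might be driven by a shutdown signal (import paths and the constructor argument are assumptions, not shown in this diff):

import { AsyncAbortController } from '@internal/concurrency' // path assumed
import { PostgresPubSub } from '@internal/pubsub/postgres' // path assumed

const shutdown = new AsyncAbortController()
const pubSub = new PostgresPubSub('postgresql://localhost:5432/storage') // connection string assumed

// start() rejects an already-aborted signal and registers a once-only abort
// listener that calls close(), so the pg-listen subscriber is torn down as
// part of the shutdown sequence.
await pubSub.start({ signal: shutdown.signal })

// Later, e.g. on SIGTERM: abortAsync() waits for close() to finish.
await shutdown.abortAsync()
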
61 changes: 54 additions & 7 deletions src/internal/queue/queue.ts
@@ -3,6 +3,7 @@ import { getConfig } from '../../config'
import { BaseEvent, BasePayload } from '../../storage/events'
import { QueueJobRetryFailed, QueueJobCompleted, QueueJobError } from '../monitoring/metrics'
import { logger, logSchema } from '../monitoring'
import { ERRORS } from '@internal/errors'

//eslint-disable-next-line @typescript-eslint/no-explicit-any
type SubclassOfBaseClass = (new (payload: any) => BaseEvent<any>) & {
@@ -13,11 +14,19 @@ export abstract class Queue {
protected static events: SubclassOfBaseClass[] = []
private static pgBoss?: PgBoss

static async init() {
static async start(opts: {
signal?: AbortSignal
onMessage?: (job: Job) => void
registerWorkers?: () => void
}) {
if (Queue.pgBoss) {
return Queue.pgBoss
}

if (opts.signal?.aborted) {
throw ERRORS.Aborted('Cannot start queue with aborted signal')
}

const {
isMultitenant,
databaseURL,
@@ -26,6 +35,7 @@
pgQueueDeleteAfterDays,
pgQueueArchiveCompletedAfterSeconds,
pgQueueRetentionDays,
pgQueueEnableWorkers,
} = getConfig()

let url = pgQueueConnectionURL ?? databaseURL
@@ -59,7 +69,36 @@
})

await Queue.pgBoss.start()
await Queue.startWorkers()

if (opts.registerWorkers && pgQueueEnableWorkers) {
opts.registerWorkers()
}

await Queue.startWorkers(opts.onMessage)

if (opts.signal) {
opts.signal.addEventListener(
'abort',
async () => {
logSchema.info(logger, '[Queue] Stopping', {
type: 'queue',
})
return Queue.stop()
.then(() => {
logSchema.info(logger, '[Queue] Exited', {
type: 'queue',
})
})
.catch((e) => {
logSchema.error(logger, '[Queue] Error while stopping queue', {
error: e,
type: 'queue',
})
})
},
{ once: true }
)
}

return Queue.pgBoss
}
Expand All @@ -85,25 +124,29 @@ export abstract class Queue {

await boss.stop({
timeout: 20 * 1000,
graceful: true,
destroy: true,
})

await new Promise((resolve) => {
boss.once('stopped', () => resolve(null))
boss.once('stopped', () => {
resolve(null)
})
})

Queue.pgBoss = undefined
}

protected static startWorkers() {
protected static startWorkers(onMessage?: (job: Job) => void) {
const workers: Promise<string>[] = []

Queue.events.forEach((event) => {
workers.push(Queue.registerTask(event.getQueueName(), event, true))
workers.push(Queue.registerTask(event.getQueueName(), event, true, onMessage))

const slowRetryQueue = event.withSlowRetryQueue()

if (slowRetryQueue) {
workers.push(Queue.registerTask(event.getSlowRetryQueueName(), event, false))
workers.push(Queue.registerTask(event.getSlowRetryQueueName(), event, false, onMessage))
}
})

@@ -113,14 +156,18 @@
protected static registerTask(
queueName: string,
event: SubclassOfBaseClass,
slowRetryQueueOnFail?: boolean
slowRetryQueueOnFail?: boolean,
onMessage?: (job: Job) => void
) {
const hasSlowRetryQueue = event.withSlowRetryQueue()
return Queue.getInstance().work(
queueName,
event.getWorkerOptions(),
async (job: Job<BasePayload>) => {
try {
if (onMessage) {
onMessage(job)
}
const res = await event.handle(job)

QueueJobCompleted.inc({
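
To show how the new options fit together at boot, a hypothetical wiring of Queue.start() (import paths and the surrounding server code are assumptions):

import { Queue } from '@internal/queue/queue' // path assumed
import { AsyncAbortController } from '@internal/concurrency' // path assumed

const shutdown = new AsyncAbortController()

await Queue.start({
  signal: shutdown.signal,
  registerWorkers: () => {
    // register event workers here; only invoked when pgQueueEnableWorkers is set
  },
  onMessage: (job) => {
    // optional hook run before each job is handled, e.g. for per-job logging
  },
})

// On shutdown the abort listener calls Queue.stop(), which now stops pg-boss
// gracefully and waits for its 'stopped' event before resolving.
await shutdown.abortAsync()
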