Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More robust shutdown process #511

Merged
merged 1 commit into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions src/internal/concurrency/async-abort-controller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/**
 * An AbortController whose `abortAsync()` waits for every registered abort
 * listener (sync or async) to settle before resolving, then aborts any child
 * controllers created via `nextGroup`, one priority group at a time.
 */
export class AsyncAbortController extends AbortController {
  // One pending promise per registered 'abort' listener; settles when the listener runs.
  protected promises: Promise<any>[] = []
  // Depth of this controller in the group chain; children get priority + 1.
  protected priority = 0
  // Child controllers to abort after this controller's own listeners settle, keyed by priority.
  protected groups = new Map<number, AsyncAbortController[]>()

  constructor() {
    super()

    const originalAddEventListener = this.signal.addEventListener

    // Patch addEventListener so every 'abort' listener gets a companion
    // promise that settles with the listener's result (or error) once it runs.
    this.signal.addEventListener = (type: string, listener: any, options: any) => {
      if (type !== 'abort') {
        return originalAddEventListener.call(this.signal, type, listener, options)
      }

      // Capture the resolvers synchronously instead of using an async promise
      // executor (an anti-pattern that can silently swallow errors) plus a
      // dead "resolving is undefined" guard, as the original code did.
      let settle!: { resolve: (value?: any) => void; reject: (err: unknown) => void }
      const tracked = new Promise<void>((resolve, reject) => {
        settle = { resolve, reject }
      })
      this.promises.push(tracked)

      const wrapped = async (): Promise<void> => {
        try {
          settle.resolve(await listener())
        } catch (e) {
          settle.reject(e)
        }
      }

      return originalAddEventListener.call(this.signal, type, wrapped, options)
    }
  }

  protected _nextGroup?: AsyncAbortController

  /**
   * Lazily creates (and caches) a child controller that is aborted only after
   * this controller's own abort listeners have settled.
   */
  get nextGroup() {
    if (!this._nextGroup) {
      this._nextGroup = new AsyncAbortController()
      this._nextGroup.priority = this.priority + 1

      // Register the child exactly once, at creation time. The previous code
      // pushed it into `groups` on EVERY getter access, so repeated reads of
      // `nextGroup` queued duplicate abortAsync() calls for the same child.
      const siblings = this.groups.get(this._nextGroup.priority) ?? []
      siblings.push(this._nextGroup)
      this.groups.set(this._nextGroup.priority, siblings)
    }

    return this._nextGroup
  }

  /**
   * Aborts the signal, waits for all registered abort listeners to settle
   * (including ones added while draining), then aborts the child groups.
   */
  async abortAsync() {
    this.abort()
    // Drain in batches: listeners may register further listeners while we await.
    while (this.promises.length > 0) {
      const batch = this.promises.splice(0, 100)
      await Promise.allSettled(batch)
    }
    await this.abortGroups()
  }

  // Aborts each priority group sequentially; controllers inside a group run in parallel.
  protected async abortGroups() {
    for (const [, group] of this.groups) {
      await Promise.allSettled(group.map((g) => g.abortAsync()))
    }
  }
}
1 change: 1 addition & 0 deletions src/internal/concurrency/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export * from './mutex'
export * from './async-abort-controller'
12 changes: 10 additions & 2 deletions src/internal/database/migrations/migrate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { listTenantsToMigrate } from '../tenant'
import { multitenantKnex } from '../multitenant-db'
import { ProgressiveMigrations } from './progressive'
import { RunMigrationsOnTenants } from '@storage/events'
import { ERRORS } from '@internal/errors'

const {
multitenantDatabaseUrl,
Expand Down Expand Up @@ -49,6 +50,9 @@ export const progressiveMigrations = new ProgressiveMigrations({
* @param signal
*/
export function startAsyncMigrations(signal: AbortSignal) {
if (signal.aborted) {
throw ERRORS.Aborted('Migration aborted')
}
switch (dbMigrationStrategy) {
case MultitenantMigrationStrategy.ON_REQUEST:
return
Expand Down Expand Up @@ -127,14 +131,18 @@ export async function runMigrationsOnAllTenants(signal: AbortSignal) {
* Runs multi-tenant migrations
*/
/**
 * Applies the multitenant migration set against the multitenant database,
 * logging start and completion under the 'migrations' log type.
 */
export async function runMultitenantMigrations(): Promise<void> {
  logSchema.info(logger, '[Migrations] Running multitenant migrations', { type: 'migrations' })

  const migrationOptions = {
    databaseUrl: multitenantDatabaseUrl,
    migrationsDirectory: './migrations/multitenant',
    shouldCreateStorageSchema: false,
    waitForLock: true,
  }
  await connectAndMigrate(migrationOptions)

  logSchema.info(logger, '[Migrations] Completed', { type: 'migrations' })
}

/**
Expand Down
3 changes: 3 additions & 0 deletions src/internal/database/migrations/progressive.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ export class ProgressiveMigrations {
signal.addEventListener('abort', () => {
if (this.watchInterval) {
clearInterval(this.watchInterval)
logSchema.info(logger, '[Migrations] Stopping', {
type: 'migrations',
})
this.drain().catch((e) => {
logSchema.error(logger, '[Migrations] Error creating migration jobs', {
type: 'migrations',
Expand Down
9 changes: 9 additions & 0 deletions src/internal/errors/codes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export enum ErrorCode {
MissingPart = 'MissingPart',
SlowDown = 'SlowDown',
TusError = 'TusError',
Aborted = 'Aborted',
}

export const ERRORS = {
Expand Down Expand Up @@ -363,6 +364,14 @@ export const ERRORS = {
httpStatusCode: 400,
message: `Part ${partNumber} is missing for upload id ${uploadId}`,
}),

// Generic "operation was aborted" error (e.g. a shutdown signal fired mid-work).
// Maps to HTTP 500; `originalError` preserves the underlying cause when known.
Aborted: (message: string, originalError?: unknown) =>
  new StorageBackendError({
    code: ErrorCode.Aborted,
    httpStatusCode: 500,
    message: message,
    originalError,
  }),
}

export function isStorageError(errorType: ErrorCode, error: any): error is StorageBackendError {
Expand Down
17 changes: 15 additions & 2 deletions src/internal/monitoring/otel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { CompressionAlgorithm } from '@opentelemetry/otlp-exporter-base'
import * as grpc from '@grpc/grpc-js'
import { HttpInstrumentation } from '@opentelemetry/instrumentation-http'
import { IncomingMessage } from 'http'
import { logger, logSchema } from '@internal/monitoring/logger'

const headersEnv = process.env.OTEL_EXPORTER_OTLP_TRACES_HEADERS || ''

Expand Down Expand Up @@ -125,9 +126,21 @@ if (process.env.OTEL_EXPORTER_OTLP_TRACES_ENDPOINT) {

// Gracefully shut the OpenTelemetry SDK down on process exit, logging the
// start, success, or failure of the shutdown under the 'otel' log type.
process.on('SIGTERM', async () => {
  logSchema.info(logger, '[Otel] Stopping', {
    type: 'otel',
  })
  try {
    await sdk.shutdown()
    logSchema.info(logger, '[Otel] Exited', {
      type: 'otel',
    })
  } catch (error) {
    logSchema.error(logger, '[Otel] Shutdown error', {
      type: 'otel',
      error: error,
    })
  }
})
}
2 changes: 1 addition & 1 deletion src/internal/pubsub/adapter.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
export interface PubSubAdapter {
connect(): Promise<void>
start(): Promise<void>
publish(channel: string, message: any): Promise<void>
subscribe(channel: string, cb: (message: any) => void): Promise<void>
unsubscribe(channel: string, cb: (message: any) => void): Promise<void>
Expand Down
30 changes: 26 additions & 4 deletions src/internal/pubsub/postgres.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import EventEmitter from 'events'
import createSubscriber, { Subscriber } from 'pg-listen'
import { ERRORS } from '@internal/errors'
import { logger, logSchema } from '@internal/monitoring'
import { PubSubAdapter } from './adapter'
import EventEmitter from 'events'

export class PostgresPubSub extends EventEmitter implements PubSubAdapter {
isConnected = false
Expand All @@ -22,22 +24,42 @@ export class PostgresPubSub extends EventEmitter implements PubSubAdapter {
})
}

async connect(): Promise<void> {
/**
 * Connects the underlying subscriber and begins LISTENing on every channel
 * that already has a notification handler registered.
 *
 * When `opts.signal` is provided, aborting it closes the connection.
 *
 * @throws Aborted error if the signal is already aborted before connecting.
 */
async start(opts?: { signal?: AbortSignal }): Promise<void> {
  if (opts?.signal?.aborted) {
    throw ERRORS.Aborted('Postgres pubsub connection aborted')
  }

  await this.subscriber.connect()
  this.isConnected = true

  if (opts?.signal) {
    opts.signal.addEventListener(
      'abort',
      async () => {
        logSchema.info(logger, '[PubSub] Stopping', {
          type: 'pubsub',
        })
        try {
          await this.close()
        } catch (e) {
          // An async event listener's rejection is never awaited by a plain
          // AbortSignal, so a failing close() would otherwise surface as a
          // process-level unhandledRejection. Log it instead.
          logSchema.error(logger, '[PubSub] Error while closing', {
            type: 'pubsub',
            error: e,
          })
        }
      },
      { once: true }
    )
  }

  // Re-establish LISTEN for every channel a handler was registered on.
  await Promise.all(
    this.subscriber.notifications.eventNames().map(async (channel) => {
      return this.subscriber.listenTo(channel as string)
    })
  )
}

close(): Promise<void> {
async close(): Promise<void> {
this.subscriber.notifications.eventNames().forEach((event) => {
this.subscriber.notifications.removeAllListeners(event)
})
return this.subscriber.close()
await this.subscriber.close()
logSchema.info(logger, '[PubSub] Exited', {
type: 'pubsub',
})
}

async publish(channel: string, payload: unknown): Promise<void> {
Expand Down
61 changes: 54 additions & 7 deletions src/internal/queue/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { getConfig } from '../../config'
import { BaseEvent, BasePayload } from '../../storage/events'
import { QueueJobRetryFailed, QueueJobCompleted, QueueJobError } from '../monitoring/metrics'
import { logger, logSchema } from '../monitoring'
import { ERRORS } from '@internal/errors'

//eslint-disable-next-line @typescript-eslint/no-explicit-any
type SubclassOfBaseClass = (new (payload: any) => BaseEvent<any>) & {
Expand All @@ -13,11 +14,19 @@ export abstract class Queue {
protected static events: SubclassOfBaseClass[] = []
private static pgBoss?: PgBoss

static async init() {
static async start(opts: {
signal?: AbortSignal
onMessage?: (job: Job) => void
registerWorkers?: () => void
}) {
if (Queue.pgBoss) {
return Queue.pgBoss
}

if (opts.signal?.aborted) {
throw ERRORS.Aborted('Cannot start queue with aborted signal')
}

const {
isMultitenant,
databaseURL,
Expand All @@ -26,6 +35,7 @@ export abstract class Queue {
pgQueueDeleteAfterDays,
pgQueueArchiveCompletedAfterSeconds,
pgQueueRetentionDays,
pgQueueEnableWorkers,
} = getConfig()

let url = pgQueueConnectionURL ?? databaseURL
Expand Down Expand Up @@ -59,7 +69,36 @@ export abstract class Queue {
})

await Queue.pgBoss.start()
await Queue.startWorkers()

if (opts.registerWorkers && pgQueueEnableWorkers) {
opts.registerWorkers()
}

await Queue.startWorkers(opts.onMessage)

if (opts.signal) {
opts.signal.addEventListener(
'abort',
async () => {
logSchema.info(logger, '[Queue] Stopping', {
type: 'queue',
})
return Queue.stop()
.then(() => {
logSchema.info(logger, '[Queue] Exited', {
type: 'queue',
})
})
.catch((e) => {
logSchema.error(logger, '[Queue] Error while stopping queue', {
error: e,
type: 'queue',
})
})
},
{ once: true }
)
}

return Queue.pgBoss
}
Expand All @@ -85,25 +124,29 @@ export abstract class Queue {

await boss.stop({
timeout: 20 * 1000,
graceful: true,
destroy: true,
})

await new Promise((resolve) => {
boss.once('stopped', () => resolve(null))
boss.once('stopped', () => {
resolve(null)
})
})

Queue.pgBoss = undefined
}

protected static startWorkers() {
protected static startWorkers(onMessage?: (job: Job) => void) {
const workers: Promise<string>[] = []

Queue.events.forEach((event) => {
workers.push(Queue.registerTask(event.getQueueName(), event, true))
workers.push(Queue.registerTask(event.getQueueName(), event, true, onMessage))

const slowRetryQueue = event.withSlowRetryQueue()

if (slowRetryQueue) {
workers.push(Queue.registerTask(event.getSlowRetryQueueName(), event, false))
workers.push(Queue.registerTask(event.getSlowRetryQueueName(), event, false, onMessage))
}
})

Expand All @@ -113,14 +156,18 @@ export abstract class Queue {
protected static registerTask(
queueName: string,
event: SubclassOfBaseClass,
slowRetryQueueOnFail?: boolean
slowRetryQueueOnFail?: boolean,
onMessage?: (job: Job) => void
) {
const hasSlowRetryQueue = event.withSlowRetryQueue()
return Queue.getInstance().work(
queueName,
event.getWorkerOptions(),
async (job: Job<BasePayload>) => {
try {
if (onMessage) {
onMessage(job)
}
const res = await event.handle(job)

QueueJobCompleted.inc({
Expand Down
Loading
Loading