From 797f9033796979a02d35585ab6889c23c0232b1c Mon Sep 17 00:00:00 2001 From: fenos Date: Wed, 26 Jun 2024 15:43:41 +0200 Subject: [PATCH] fix: more robust shutdown process --- .../concurrency/async-abort-controller.ts | 73 +++++++ src/internal/concurrency/index.ts | 1 + src/internal/database/migrations/migrate.ts | 12 +- .../database/migrations/progressive.ts | 3 + src/internal/errors/codes.ts | 9 + src/internal/monitoring/otel.ts | 17 +- src/internal/pubsub/adapter.ts | 2 +- src/internal/pubsub/postgres.ts | 30 ++- src/internal/queue/queue.ts | 61 +++++- src/server.ts | 187 ++++++++++-------- src/shutdown.ts | 92 +++++++++ src/test/rls.test.ts | 22 ++- src/worker.ts | 83 ++++---- 13 files changed, 453 insertions(+), 139 deletions(-) create mode 100644 src/internal/concurrency/async-abort-controller.ts create mode 100644 src/shutdown.ts diff --git a/src/internal/concurrency/async-abort-controller.ts b/src/internal/concurrency/async-abort-controller.ts new file mode 100644 index 00000000..65cb5238 --- /dev/null +++ b/src/internal/concurrency/async-abort-controller.ts @@ -0,0 +1,73 @@ +/** + * This special AbortController is used to wait for all the abort handlers to finish before resolving the promise. + */ +export class AsyncAbortController extends AbortController { + protected promises: Promise[] = [] + protected priority = 0 + protected groups = new Map() + + constructor() { + super() + + const originalEventListener = this.signal.addEventListener + + // Patch event addEventListener to keep track of listeners and their promises + this.signal.addEventListener = (type: string, listener: any, options: any) => { + if (type !== 'abort') { + return originalEventListener.call(this.signal, type, listener, options) + } + + let resolving: undefined | (() => Promise) = undefined + const promise = new Promise(async (resolve, reject) => { + resolving = async (): Promise => { + try { + const result = await listener() + resolve(result) + } catch (e) { + reject(e) + } + } + }) + this.promises.push(promise) + + if (!resolving) { + throw new Error('resolve is undefined') + } + + return originalEventListener.call(this.signal, type, resolving, options) + } + } + + protected _nextGroup?: AsyncAbortController + + get nextGroup() { + if (!this._nextGroup) { + this._nextGroup = new AsyncAbortController() + this._nextGroup.priority = this.priority + 1 + } + + let existingGroups = this.groups.get(this._nextGroup.priority) + if (!existingGroups) { + existingGroups = [] + } + + existingGroups.push(this._nextGroup) + this.groups.set(this._nextGroup.priority, existingGroups) + return this._nextGroup + } + + async abortAsync() { + this.abort() + while (this.promises.length > 0) { + const promises = this.promises.splice(0, 100) + await Promise.allSettled(promises) + } + await this.abortGroups() + } + + protected async abortGroups() { + for (const [, group] of this.groups) { + await Promise.allSettled(group.map((g) => g.abortAsync())) + } + } +} diff --git a/src/internal/concurrency/index.ts b/src/internal/concurrency/index.ts index 3e525a78..86792b45 100644 --- a/src/internal/concurrency/index.ts +++ b/src/internal/concurrency/index.ts @@ -1 +1,2 @@ export * from './mutex' +export * from './async-abort-controller' diff --git a/src/internal/database/migrations/migrate.ts b/src/internal/database/migrations/migrate.ts index ed101ec0..b78746a7 100644 --- a/src/internal/database/migrations/migrate.ts +++ b/src/internal/database/migrations/migrate.ts @@ -11,6 +11,7 @@ import { listTenantsToMigrate } from '../tenant' import { multitenantKnex } from '../multitenant-db' import { ProgressiveMigrations } from './progressive' import { RunMigrationsOnTenants } from '@storage/events' +import { ERRORS } from '@internal/errors' const { multitenantDatabaseUrl, @@ -49,6 +50,9 @@ export const progressiveMigrations = new ProgressiveMigrations({ * @param signal */ export function startAsyncMigrations(signal: AbortSignal) { + if (signal.aborted) { + throw ERRORS.Aborted('Migration aborted') + } switch (dbMigrationStrategy) { case MultitenantMigrationStrategy.ON_REQUEST: return @@ -127,14 +131,18 @@ export async function runMigrationsOnAllTenants(signal: AbortSignal) { * Runs multi-tenant migrations */ export async function runMultitenantMigrations(): Promise { - logger.info('Running multitenant migrations') + logSchema.info(logger, '[Migrations] Running multitenant migrations', { + type: 'migrations', + }) await connectAndMigrate({ databaseUrl: multitenantDatabaseUrl, migrationsDirectory: './migrations/multitenant', shouldCreateStorageSchema: false, waitForLock: true, }) - logger.info('Multitenant migrations completed') + logSchema.info(logger, '[Migrations] Completed', { + type: 'migrations', + }) } /** diff --git a/src/internal/database/migrations/progressive.ts b/src/internal/database/migrations/progressive.ts index 4b08299e..9ae647e8 100644 --- a/src/internal/database/migrations/progressive.ts +++ b/src/internal/database/migrations/progressive.ts @@ -19,6 +19,9 @@ export class ProgressiveMigrations { signal.addEventListener('abort', () => { if (this.watchInterval) { clearInterval(this.watchInterval) + logSchema.info(logger, '[Migrations] Stopping', { + type: 'migrations', + }) this.drain().catch((e) => { logSchema.error(logger, '[Migrations] Error creating migration jobs', { type: 'migrations', diff --git a/src/internal/errors/codes.ts b/src/internal/errors/codes.ts index 2216430b..ac6ca84c 100644 --- a/src/internal/errors/codes.ts +++ b/src/internal/errors/codes.ts @@ -36,6 +36,7 @@ export enum ErrorCode { MissingPart = 'MissingPart', SlowDown = 'SlowDown', TusError = 'TusError', + Aborted = 'Aborted', } export const ERRORS = { @@ -363,6 +364,14 @@ export const ERRORS = { httpStatusCode: 400, message: `Part ${partNumber} is missing for upload id ${uploadId}`, }), + + Aborted: (message: string, originalError?: unknown) => + new StorageBackendError({ + code: ErrorCode.Aborted, + httpStatusCode: 500, + message: message, + originalError, + }), } export function isStorageError(errorType: ErrorCode, error: any): error is StorageBackendError { diff --git a/src/internal/monitoring/otel.ts b/src/internal/monitoring/otel.ts index b966a6e2..1c53ae5f 100644 --- a/src/internal/monitoring/otel.ts +++ b/src/internal/monitoring/otel.ts @@ -21,6 +21,7 @@ import { CompressionAlgorithm } from '@opentelemetry/otlp-exporter-base' import * as grpc from '@grpc/grpc-js' import { HttpInstrumentation } from '@opentelemetry/instrumentation-http' import { IncomingMessage } from 'http' +import { logger, logSchema } from '@internal/monitoring/logger' const headersEnv = process.env.OTEL_EXPORTER_OTLP_TRACES_HEADERS || '' @@ -125,9 +126,21 @@ if (process.env.OTEL_EXPORTER_OTLP_TRACES_ENDPOINT) { // Gracefully shutdown the SDK on process exit process.on('SIGTERM', () => { + logSchema.info(logger, '[Otel] Stopping', { + type: 'otel', + }) sdk .shutdown() - .then(() => console.log('Tracing terminated')) - .catch((error) => console.error('Error terminating tracing', error)) + .then(() => { + logSchema.info(logger, '[Otel] Exited', { + type: 'otel', + }) + }) + .catch((error) => + logSchema.error(logger, '[Otel] Shutdown error', { + type: 'otel', + error: error, + }) + ) }) } diff --git a/src/internal/pubsub/adapter.ts b/src/internal/pubsub/adapter.ts index ec8af47d..62330cdb 100644 --- a/src/internal/pubsub/adapter.ts +++ b/src/internal/pubsub/adapter.ts @@ -1,5 +1,5 @@ export interface PubSubAdapter { - connect(): Promise + start(): Promise publish(channel: string, message: any): Promise subscribe(channel: string, cb: (message: any) => void): Promise unsubscribe(channel: string, cb: (message: any) => void): Promise diff --git a/src/internal/pubsub/postgres.ts b/src/internal/pubsub/postgres.ts index bdec108b..b0b6020a 100644 --- a/src/internal/pubsub/postgres.ts +++ b/src/internal/pubsub/postgres.ts @@ -1,6 +1,8 @@ +import EventEmitter from 'events' import createSubscriber, { Subscriber } from 'pg-listen' +import { ERRORS } from '@internal/errors' +import { logger, logSchema } from '@internal/monitoring' import { PubSubAdapter } from './adapter' -import EventEmitter from 'events' export class PostgresPubSub extends EventEmitter implements PubSubAdapter { isConnected = false @@ -22,10 +24,27 @@ export class PostgresPubSub extends EventEmitter implements PubSubAdapter { }) } - async connect(): Promise { + async start(opts?: { signal?: AbortSignal }): Promise { + if (opts?.signal?.aborted) { + throw ERRORS.Aborted('Postgres pubsub connection aborted') + } + await this.subscriber.connect() this.isConnected = true + if (opts?.signal) { + opts.signal.addEventListener( + 'abort', + async () => { + logSchema.info(logger, '[PubSub] Stopping', { + type: 'pubsub', + }) + await this.close() + }, + { once: true } + ) + } + await Promise.all( this.subscriber.notifications.eventNames().map(async (channel) => { return this.subscriber.listenTo(channel as string) @@ -33,11 +52,14 @@ export class PostgresPubSub extends EventEmitter implements PubSubAdapter { ) } - close(): Promise { + async close(): Promise { this.subscriber.notifications.eventNames().forEach((event) => { this.subscriber.notifications.removeAllListeners(event) }) - return this.subscriber.close() + await this.subscriber.close() + logSchema.info(logger, '[PubSub] Exited', { + type: 'pubsub', + }) } async publish(channel: string, payload: unknown): Promise { diff --git a/src/internal/queue/queue.ts b/src/internal/queue/queue.ts index 8af874b3..81b451d8 100644 --- a/src/internal/queue/queue.ts +++ b/src/internal/queue/queue.ts @@ -3,6 +3,7 @@ import { getConfig } from '../../config' import { BaseEvent, BasePayload } from '../../storage/events' import { QueueJobRetryFailed, QueueJobCompleted, QueueJobError } from '../monitoring/metrics' import { logger, logSchema } from '../monitoring' +import { ERRORS } from '@internal/errors' //eslint-disable-next-line @typescript-eslint/no-explicit-any type SubclassOfBaseClass = (new (payload: any) => BaseEvent) & { @@ -13,11 +14,19 @@ export abstract class Queue { protected static events: SubclassOfBaseClass[] = [] private static pgBoss?: PgBoss - static async init() { + static async start(opts: { + signal?: AbortSignal + onMessage?: (job: Job) => void + registerWorkers?: () => void + }) { if (Queue.pgBoss) { return Queue.pgBoss } + if (opts.signal?.aborted) { + throw ERRORS.Aborted('Cannot start queue with aborted signal') + } + const { isMultitenant, databaseURL, @@ -26,6 +35,7 @@ export abstract class Queue { pgQueueDeleteAfterDays, pgQueueArchiveCompletedAfterSeconds, pgQueueRetentionDays, + pgQueueEnableWorkers, } = getConfig() let url = pgQueueConnectionURL ?? databaseURL @@ -59,7 +69,36 @@ export abstract class Queue { }) await Queue.pgBoss.start() - await Queue.startWorkers() + + if (opts.registerWorkers && pgQueueEnableWorkers) { + opts.registerWorkers() + } + + await Queue.startWorkers(opts.onMessage) + + if (opts.signal) { + opts.signal.addEventListener( + 'abort', + async () => { + logSchema.info(logger, '[Queue] Stopping', { + type: 'queue', + }) + return Queue.stop() + .then(() => { + logSchema.info(logger, '[Queue] Exited', { + type: 'queue', + }) + }) + .catch((e) => { + logSchema.error(logger, '[Queue] Error while stopping queue', { + error: e, + type: 'queue', + }) + }) + }, + { once: true } + ) + } return Queue.pgBoss } @@ -85,25 +124,29 @@ export abstract class Queue { await boss.stop({ timeout: 20 * 1000, + graceful: true, + destroy: true, }) await new Promise((resolve) => { - boss.once('stopped', () => resolve(null)) + boss.once('stopped', () => { + resolve(null) + }) }) Queue.pgBoss = undefined } - protected static startWorkers() { + protected static startWorkers(onMessage?: (job: Job) => void) { const workers: Promise[] = [] Queue.events.forEach((event) => { - workers.push(Queue.registerTask(event.getQueueName(), event, true)) + workers.push(Queue.registerTask(event.getQueueName(), event, true, onMessage)) const slowRetryQueue = event.withSlowRetryQueue() if (slowRetryQueue) { - workers.push(Queue.registerTask(event.getSlowRetryQueueName(), event, false)) + workers.push(Queue.registerTask(event.getSlowRetryQueueName(), event, false, onMessage)) } }) @@ -113,7 +156,8 @@ export abstract class Queue { protected static registerTask( queueName: string, event: SubclassOfBaseClass, - slowRetryQueueOnFail?: boolean + slowRetryQueueOnFail?: boolean, + onMessage?: (job: Job) => void ) { const hasSlowRetryQueue = event.withSlowRetryQueue() return Queue.getInstance().work( @@ -121,6 +165,9 @@ export abstract class Queue { event.getWorkerOptions(), async (job: Job) => { try { + if (onMessage) { + onMessage(job) + } const res = await event.handle(job) QueueJobCompleted.inc({ diff --git a/src/server.ts b/src/server.ts index 0dd610e1..e60b3e8d 100644 --- a/src/server.ts +++ b/src/server.ts @@ -9,75 +9,83 @@ import { runMultitenantMigrations, runMigrationsOnTenant, startAsyncMigrations, - TenantConnection, listenForTenantUpdate, PubSub, - multitenantKnex, } from '@internal/database' import { logger, logSchema } from '@internal/monitoring' import { Queue } from '@internal/queue' import { registerWorkers } from '@storage/events' +import { AsyncAbortController } from '@internal/concurrency' +import { bindShutdownSignals, createServerClosedPromise, shutdown } from './shutdown' -const serverSignal = new AbortController() +const shutdownSignal = new AsyncAbortController() -process.on('uncaughtException', (e) => { - logSchema.error(logger, 'uncaught exception', { - type: 'uncaughtException', - error: e, - }) - process.exit(1) -}) +bindShutdownSignals(shutdownSignal) // Start API server main() .then(() => { - logger.info('[Server] Started Successfully') + logSchema.info(logger, '[Server] Started Successfully', { + type: 'server', + }) }) - .catch((e) => { - logSchema.error(logger, 'Server shutdown with error', { + .catch(async (e) => { + logSchema.error(logger, 'Server not started with error', { type: 'startupError', error: e, }) + + await shutdown(shutdownSignal) + process.exit(1) + }) + .catch(() => { + process.exit(1) }) /** * Start Storage API server */ async function main() { - const { - databaseURL, - isMultitenant, - requestTraceHeader, - adminRequestIdHeader, - adminPort, - port, - host, - pgQueueEnable, - pgQueueEnableWorkers, - exposeDocs, - } = getConfig() + const { databaseURL, isMultitenant, pgQueueEnable, pgQueueEnableWorkers } = getConfig() // Migrations if (isMultitenant) { await runMultitenantMigrations() await listenForTenantUpdate(PubSub) - startAsyncMigrations(serverSignal.signal) + startAsyncMigrations(shutdownSignal.nextGroup.signal) } else { await runMigrationsOnTenant(databaseURL) } // Queue if (pgQueueEnable) { - if (pgQueueEnableWorkers) { - registerWorkers() - } - await Queue.init() + await Queue.start({ + signal: shutdownSignal.nextGroup.signal, + registerWorkers: registerWorkers, + }) } // Pubsub - await PubSub.connect() + await PubSub.start({ + signal: shutdownSignal.nextGroup.signal, + }) // HTTP Server + const app = await httpServer(shutdownSignal.signal) + + // HTTP Server Admin + if (isMultitenant) { + await httpAdminApp(app, shutdownSignal.signal) + } +} + +/** + * Starts HTTP API Server + * @param signal + */ +async function httpServer(signal: AbortSignal) { + const { exposeDocs, requestTraceHeader, port, host } = getConfig() + const app: FastifyInstance = build({ logger, disableRequestLogging: true, @@ -85,61 +93,82 @@ async function main() { requestIdHeader: requestTraceHeader, }) - app.listen({ port, host }, (err) => { - if (err) { - logSchema.error(logger, `Server failed to start`, { - type: 'serverStartError', - error: err, - }) - process.exit(1) - } + const closePromise = createServerClosedPromise(app.server, () => { + logSchema.info(logger, '[Server] Exited', { + type: 'server', + }) }) - // HTTP Server Admin - let adminApp: FastifyInstance | undefined = undefined + try { + signal.addEventListener( + 'abort', + async () => { + logSchema.info(logger, '[Server] Stopping', { + type: 'server', + }) - if (isMultitenant) { - adminApp = buildAdmin( - { - logger, - disableRequestLogging: true, - requestIdHeader: adminRequestIdHeader, + await closePromise }, - app + { once: true } ) + await app.listen({ port, host, signal }) - try { - await adminApp.listen({ port: adminPort, host }) - } catch (err) { - logSchema.error(adminApp.log, 'Failed to start admin app', { - type: 'adminAppStartError', - error: err, - }) - process.exit(1) - } + return app + } catch (err) { + logSchema.error(logger, `Server failed to start`, { + type: 'serverStartError', + error: err, + }) + throw err } +} - process.on('SIGTERM', async () => { - try { - logger.info('Received SIGTERM, shutting down') - await Promise.allSettled([app.close(), adminApp?.close()]) - await Promise.allSettled([ - serverSignal.abort(), - Queue.stop(), - TenantConnection.stop(), - PubSub.close(), - multitenantKnex.destroy(), - ]) - - if (process.env.NODE_ENV !== 'production') { - process.exit(0) - } - } catch (e) { - logSchema.error(logger, 'shutdown error', { - type: 'SIGTERM', - error: e, - }) - process.exit(1) - } +/** + * Starts HTTP Admin endpoints + * @param app + * @param signal + */ +async function httpAdminApp( + app: FastifyInstance, + signal: AbortSignal +) { + const { adminRequestIdHeader, adminPort, host } = getConfig() + + const adminApp = buildAdmin( + { + logger, + disableRequestLogging: true, + requestIdHeader: adminRequestIdHeader, + }, + app + ) + + const closePromise = createServerClosedPromise(adminApp.server, () => { + logSchema.info(logger, '[Admin Server] Exited', { + type: 'server', + }) }) + + signal.addEventListener( + 'abort', + async () => { + logSchema.info(logger, '[Admin Server] Stopping', { + type: 'server', + }) + + await closePromise + }, + { once: true } + ) + + try { + await adminApp.listen({ port: adminPort, host, signal }) + } catch (err) { + logSchema.error(adminApp.log, 'Failed to start admin app', { + type: 'adminAppStartError', + error: err, + }) + throw err + } + return adminApp } diff --git a/src/shutdown.ts b/src/shutdown.ts new file mode 100644 index 00000000..cd571dfa --- /dev/null +++ b/src/shutdown.ts @@ -0,0 +1,92 @@ +import { logger, logSchema } from '@internal/monitoring' +import { AsyncAbortController } from '@internal/concurrency' +import { multitenantKnex, TenantConnection } from '@internal/database' +import http from 'http' + +/** + * Binds shutdown handlers to the process + * @param serverSignal + */ +export function bindShutdownSignals(serverSignal: AsyncAbortController) { + // Register handlers + process.on('uncaughtException', (e) => { + logSchema.error(logger, 'uncaught exception', { + type: 'uncaughtException', + error: e, + }) + process.exit(1) + }) + + // Shutdown handler + process.on('SIGTERM', async () => { + logSchema.info(logger, '[Server] Received SIGTERM, shutting down', { + type: 'shutdown', + }) + try { + await shutdown(serverSignal) + logSchema.info(logger, '[Server] SIGTERM Shutdown successfully', { + type: 'shutdown', + }) + } catch (e) { + logSchema.error(logger, '[Server] SIGTERM Shutdown with error', { + type: 'shutdown', + error: e, + }) + process.exit(1) + } + }) +} + +/** + * Gracefully shuts down the server + * @param serverSignal + */ +export async function shutdown(serverSignal: AsyncAbortController) { + try { + const errors: unknown[] = [] + + await serverSignal.abortAsync().catch((e) => { + logSchema.error(logger, 'Failed to abort server signal', { + type: 'shutdown', + error: e, + }) + errors.push(e) + }) + + await multitenantKnex.destroy().catch((e) => { + logSchema.error(logger, 'Failed to close database connection', { + type: 'shutdown', + error: e, + }) + errors.push(e) + }) + + await TenantConnection.stop().catch((e) => { + logSchema.error(logger, 'Failed to close tenant connection', { + type: 'shutdown', + error: e, + }) + }) + + if (errors.length > 0) { + throw errors[errors.length - 1] + } + } catch (e) { + logSchema.error(logger, 'shutdown error', { + type: 'shutdown', + error: e, + }) + throw e + } finally { + logger.flush() + } +} + +export function createServerClosedPromise(server: http.Server, cb: () => Promise | void) { + return new Promise((res) => { + server.once('close', async () => { + await cb() + res() + }) + }) +} diff --git a/src/test/rls.test.ts b/src/test/rls.test.ts index 6702404d..53cb57b2 100644 --- a/src/test/rls.test.ts +++ b/src/test/rls.test.ts @@ -1,20 +1,22 @@ import { randomUUID } from 'crypto' -import { CreateBucketCommand, S3Client } from '@aws-sdk/client-s3' -import { StorageKnexDB } from '../storage/database' -import app from '../app' -import { getConfig } from '../config' -import { checkBucketExists } from './common' -import { createStorageBackend } from '../storage/backend' import { Knex, knex } from 'knex' -import { signJWT } from '../internal/auth' import fs from 'fs' import path from 'path' -import { Storage } from '../storage' -import { getPostgresConnection } from '../internal/database' import FormData from 'form-data' import yaml from 'js-yaml' import Mustache from 'mustache' -import { getServiceKeyUser } from '../internal/database/tenant' +import { CreateBucketCommand, S3Client } from '@aws-sdk/client-s3' + +import { StorageKnexDB } from '@storage/database' +import { createStorageBackend } from '@storage/backend' +import { getPostgresConnection } from '@internal/database' +import { getServiceKeyUser } from '@internal/database' +import { signJWT } from '@internal/auth' + +import app from '../app' +import { getConfig } from '../config' +import { checkBucketExists } from './common' +import { Storage } from '../storage' interface Policy { name: string diff --git a/src/worker.ts b/src/worker.ts index 9ae0a342..2eea0c86 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -1,8 +1,31 @@ import { Queue } from '@internal/queue' import { logger, logSchema } from '@internal/monitoring' +import { listenForTenantUpdate, PubSub } from '@internal/database' +import { AsyncAbortController } from '@internal/concurrency' import { registerWorkers } from '@storage/events' + import { getConfig } from './config' import adminApp from './admin-app' +import { bindShutdownSignals, createServerClosedPromise, shutdown } from './shutdown' + +const shutdownSignal = new AsyncAbortController() + +bindShutdownSignals(shutdownSignal) + +// Start the Worker +main() + .then(async () => { + logSchema.info(logger, '[Server] Started successfully', { + type: 'server', + }) + }) + .catch(async () => { + await shutdown(shutdownSignal) + process.exit(1) + }) + .catch(() => { + process.exit(1) + }) /** * Starts Storage Worker @@ -11,9 +34,23 @@ export async function main() { const { requestTraceHeader, adminPort, host } = getConfig() logger.info('[Queue] Starting Queue Worker') - registerWorkers() - const queue = await Queue.init() + await listenForTenantUpdate(PubSub) + + await Promise.all([ + Queue.start({ + signal: shutdownSignal.signal, + registerWorkers, + onMessage: (job) => + logger.info(`[Worker] Job Received ${job.name} ${job.id}`, { + type: 'worker', + job: JSON.stringify(job), + }), + }), + PubSub.start({ + signal: shutdownSignal.nextGroup.nextGroup.signal, + }), + ]) const server = adminApp({ logger, @@ -21,40 +58,18 @@ export async function main() { requestIdHeader: requestTraceHeader, }) - process.on('SIGTERM', async () => { - logger.info('[Worker] Stopping') - await server.close() - await Queue.stop() - }) - - await server.listen({ port: adminPort, host }) - - return new Promise((resolve, reject) => { - queue.on('error', (err) => { - logger.info('[Queue] Error', err) - reject(err) - }) - - queue.on('stopped', () => { - logger.info('[Queue] Stopping') - resolve() + const shutdownPromise = createServerClosedPromise(server.server, () => { + logSchema.info(logger, '[Admin Server] Exited', { + type: 'server', }) }) -} -process.on('uncaughtException', (e) => { - logSchema.error(logger, 'uncaught exception', { - type: 'uncaughtException', - error: e, + shutdownSignal.nextGroup.signal.addEventListener('abort', async () => { + logSchema.info(logger, '[Admin Server] Stopping', { + type: 'server', + }) + await shutdownPromise }) - logger.flush() - process.exit(1) -}) -main() - .then(() => { - logger.info('[Queue] Worker Exited Successfully') - }) - .catch(() => { - process.exit(1) - }) + await server.listen({ port: adminPort, host, signal: shutdownSignal.nextGroup.signal }) +}