From 38cb4e8fe89dfbfee6d345df993554a874567d6b Mon Sep 17 00:00:00 2001 From: fenos Date: Fri, 28 Jun 2024 15:16:02 +0200 Subject: [PATCH] fix: custom webhooks-max-sockets env --- Dockerfile | 2 +- package-lock.json | 105 +++++------ package.json | 8 +- src/config.ts | 8 +- src/http/plugins/log-request.ts | 7 +- src/http/plugins/storage.ts | 2 +- src/internal/database/multitenant-db.ts | 11 +- src/internal/monitoring/logger.ts | 3 + src/internal/queue/database.ts | 0 src/internal/queue/event.ts | 216 ++++++++++++++++++++++ src/internal/queue/index.ts | 1 + src/internal/queue/queue.ts | 15 +- src/{ => start}/server.ts | 16 +- src/{ => start}/shutdown.ts | 0 src/{ => start}/worker.ts | 4 +- src/storage/database/adapter.ts | 7 +- src/storage/database/knex.ts | 32 +++- src/storage/events/base-event.ts | 202 +------------------- src/storage/events/object-admin-delete.ts | 3 +- src/storage/events/object-created.ts | 3 +- src/storage/events/object-removed.ts | 3 +- src/storage/events/object-updated.ts | 3 +- src/storage/events/run-migrations.ts | 3 +- src/storage/events/webhook.ts | 5 +- src/storage/object.ts | 4 +- src/storage/protocols/s3/signature-v4.ts | 15 +- src/storage/uploader.ts | 50 +++-- 27 files changed, 417 insertions(+), 311 deletions(-) create mode 100644 src/internal/queue/database.ts create mode 100644 src/internal/queue/event.ts rename src/{ => start}/server.ts (94%) rename src/{ => start}/shutdown.ts (100%) rename src/{ => start}/worker.ts (96%) diff --git a/Dockerfile b/Dockerfile index b30fddde..2ee4c0e0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -33,4 +33,4 @@ COPY --from=production-deps /app/node_modules node_modules COPY --from=build /app/dist dist EXPOSE 5000 -CMD ["node", "dist/server.js"] \ No newline at end of file +CMD ["node", "dist/start/server.js"] \ No newline at end of file diff --git a/package-lock.json b/package-lock.json index f1bcefdf..e65dbf82 100644 --- a/package-lock.json +++ b/package-lock.json @@ -53,7 +53,7 @@ "md5-file": "^5.0.0", "multistream": "^4.1.0", "object-sizeof": "^2.6.4", - "pg": "^8.11.3", + "pg": "^8.12.0", "pg-boss": "^9.0.3", "pg-listen": "^1.7.0", "pino": "^8.15.4", @@ -97,7 +97,7 @@ "stream-buffers": "^3.0.2", "ts-jest": "^29.0.3", "ts-node-dev": "^1.1.8", - "tsx": "^3.13.0", + "tsx": "^3.14.0", "tus-js-client": "^3.1.0", "typescript": "^4.5.5" }, @@ -6203,14 +6203,6 @@ "integrity": "sha512-E+XQCRwSbaaiChtv6k6Dwgc+bx+Bs6vuKJHHl5kox/BaKbhiXzqQOwK4cO22yElGp2OCmjwVhT3HmxgyPGnJfQ==", "dev": true }, - "node_modules/buffer-writer": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/buffer-writer/-/buffer-writer-2.0.0.tgz", - "integrity": "sha512-a7ZpuTZU1TRtnwyCNW3I5dc0wWNC3VR9S++Ewyk2HHZdrO3CQJqSpd+95Us590V6AL7JqUAH2IwZ/398PmNFgw==", - "engines": { - "node": ">=4" - } - }, "node_modules/callsites": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/callsites/-/callsites-3.1.0.tgz", @@ -9554,11 +9546,6 @@ "node": ">=6" } }, - "node_modules/packet-reader": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/packet-reader/-/packet-reader-1.0.0.tgz", - "integrity": "sha512-HAKu/fG3HpHFO0AA8WE8q2g+gBJaZ9MG7fcKk+IJPLTGAD6Psw4443l+9DGRbOIh3/aXr7Phy0TjilYivJo5XQ==" - }, "node_modules/parent-module": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/parent-module/-/parent-module-1.0.1.tgz", @@ -9631,15 +9618,13 @@ } }, "node_modules/pg": { - "version": "8.11.3", - "resolved": "https://registry.npmjs.org/pg/-/pg-8.11.3.tgz", - "integrity": "sha512-+9iuvG8QfaaUrrph+kpF24cXkH1YOOUeArRNYIxq1viYHZagBxrTno7cecY1Fa44tJeZvaoG+Djpkc3JwehN5g==", - "dependencies": { - "buffer-writer": "2.0.0", - "packet-reader": "1.0.0", - "pg-connection-string": "^2.6.2", - "pg-pool": "^3.6.1", - "pg-protocol": "^1.6.0", + "version": "8.12.0", + "resolved": "https://registry.npmjs.org/pg/-/pg-8.12.0.tgz", + "integrity": "sha512-A+LHUSnwnxrnL/tZ+OLfqR1SxLN3c/pgDztZ47Rpbsd4jUytsTtwQo/TLPRzPJMp/1pbhYVhH9cuSZLAajNfjQ==", + "dependencies": { + "pg-connection-string": "^2.6.4", + "pg-pool": "^3.6.2", + "pg-protocol": "^1.6.1", "pg-types": "^2.1.0", "pgpass": "1.x" }, @@ -9724,17 +9709,17 @@ } }, "node_modules/pg-pool": { - "version": "3.6.1", - "resolved": "https://registry.npmjs.org/pg-pool/-/pg-pool-3.6.1.tgz", - "integrity": "sha512-jizsIzhkIitxCGfPRzJn1ZdcosIt3pz9Sh3V01fm1vZnbnCMgmGl5wvGGdNN2EL9Rmb0EcFoCkixH4Pu+sP9Og==", + "version": "3.6.2", + "resolved": "https://registry.npmjs.org/pg-pool/-/pg-pool-3.6.2.tgz", + "integrity": "sha512-Htjbg8BlwXqSBQ9V8Vjtc+vzf/6fVUuak/3/XXKA9oxZprwW3IMDQTGHP+KDmVL7rtd+R1QjbnCFPuTHm3G4hg==", "peerDependencies": { "pg": ">=8.0" } }, "node_modules/pg-protocol": { - "version": "1.6.0", - "resolved": "https://registry.npmjs.org/pg-protocol/-/pg-protocol-1.6.0.tgz", - "integrity": "sha512-M+PDm637OY5WM307051+bsDia5Xej6d9IR4GwJse1qA1DIhiKlksvrneZOYQq42OM+spubpcNYEo2FcKQrDk+Q==" + "version": "1.6.1", + "resolved": "https://registry.npmjs.org/pg-protocol/-/pg-protocol-1.6.1.tgz", + "integrity": "sha512-jPIlvgoD63hrEuihvIg+tJhoGjUsLPn6poJY9N5CnlPd91c2T18T/9zBtLxZSb1EhYxBRoZJtzScCaWlYLtktg==" }, "node_modules/pg-types": { "version": "2.2.0", @@ -9751,6 +9736,11 @@ "node": ">=4" } }, + "node_modules/pg/node_modules/pg-connection-string": { + "version": "2.6.4", + "resolved": "https://registry.npmjs.org/pg-connection-string/-/pg-connection-string-2.6.4.tgz", + "integrity": "sha512-v+Z7W/0EO707aNMaAEfiGnGL9sxxumwLl2fJvCQtMn9Fxsg+lPpPkdcyBSv/KFgpGdYkMfn+EI1Or2EHjpgLCA==" + }, "node_modules/pgpass": { "version": "1.0.5", "resolved": "https://registry.npmjs.org/pgpass/-/pgpass-1.0.5.tgz", @@ -11270,9 +11260,9 @@ "dev": true }, "node_modules/tsx": { - "version": "3.13.0", - "resolved": "https://registry.npmjs.org/tsx/-/tsx-3.13.0.tgz", - "integrity": "sha512-rjmRpTu3as/5fjNq/kOkOtihgLxuIz6pbKdj9xwP4J5jOLkBxw/rjN5ANw+KyrrOXV5uB7HC8+SrrSJxT65y+A==", + "version": "3.14.0", + "resolved": "https://registry.npmjs.org/tsx/-/tsx-3.14.0.tgz", + "integrity": "sha512-xHtFaKtHxM9LOklMmJdI3BEnQq/D5F73Of2E1GDrITi9sgoVkvIsrQUTY1G8FlmGtA+awCI4EBlTRRYxkL2sRg==", "dev": true, "dependencies": { "esbuild": "~0.18.20", @@ -16716,11 +16706,6 @@ "integrity": "sha512-E+XQCRwSbaaiChtv6k6Dwgc+bx+Bs6vuKJHHl5kox/BaKbhiXzqQOwK4cO22yElGp2OCmjwVhT3HmxgyPGnJfQ==", "dev": true }, - "buffer-writer": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/buffer-writer/-/buffer-writer-2.0.0.tgz", - "integrity": "sha512-a7ZpuTZU1TRtnwyCNW3I5dc0wWNC3VR9S++Ewyk2HHZdrO3CQJqSpd+95Us590V6AL7JqUAH2IwZ/398PmNFgw==" - }, "callsites": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/callsites/-/callsites-3.1.0.tgz", @@ -19233,11 +19218,6 @@ "integrity": "sha512-R4nPAVTAU0B9D35/Gk3uJf/7XYbQcyohSKdvAxIRSNghFl4e71hVoGnBNQz9cWaXxO2I10KTC+3jMdvvoKw6dQ==", "dev": true }, - "packet-reader": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/packet-reader/-/packet-reader-1.0.0.tgz", - "integrity": "sha512-HAKu/fG3HpHFO0AA8WE8q2g+gBJaZ9MG7fcKk+IJPLTGAD6Psw4443l+9DGRbOIh3/aXr7Phy0TjilYivJo5XQ==" - }, "parent-module": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/parent-module/-/parent-module-1.0.1.tgz", @@ -19289,18 +19269,23 @@ "dev": true }, "pg": { - "version": "8.11.3", - "resolved": "https://registry.npmjs.org/pg/-/pg-8.11.3.tgz", - "integrity": "sha512-+9iuvG8QfaaUrrph+kpF24cXkH1YOOUeArRNYIxq1viYHZagBxrTno7cecY1Fa44tJeZvaoG+Djpkc3JwehN5g==", + "version": "8.12.0", + "resolved": "https://registry.npmjs.org/pg/-/pg-8.12.0.tgz", + "integrity": "sha512-A+LHUSnwnxrnL/tZ+OLfqR1SxLN3c/pgDztZ47Rpbsd4jUytsTtwQo/TLPRzPJMp/1pbhYVhH9cuSZLAajNfjQ==", "requires": { - "buffer-writer": "2.0.0", - "packet-reader": "1.0.0", "pg-cloudflare": "^1.1.1", - "pg-connection-string": "^2.6.2", - "pg-pool": "^3.6.1", - "pg-protocol": "^1.6.0", + "pg-connection-string": "^2.6.4", + "pg-pool": "^3.6.2", + "pg-protocol": "^1.6.1", "pg-types": "^2.1.0", "pgpass": "1.x" + }, + "dependencies": { + "pg-connection-string": { + "version": "2.6.4", + "resolved": "https://registry.npmjs.org/pg-connection-string/-/pg-connection-string-2.6.4.tgz", + "integrity": "sha512-v+Z7W/0EO707aNMaAEfiGnGL9sxxumwLl2fJvCQtMn9Fxsg+lPpPkdcyBSv/KFgpGdYkMfn+EI1Or2EHjpgLCA==" + } } }, "pg-boss": { @@ -19356,15 +19341,15 @@ } }, "pg-pool": { - "version": "3.6.1", - "resolved": "https://registry.npmjs.org/pg-pool/-/pg-pool-3.6.1.tgz", - "integrity": "sha512-jizsIzhkIitxCGfPRzJn1ZdcosIt3pz9Sh3V01fm1vZnbnCMgmGl5wvGGdNN2EL9Rmb0EcFoCkixH4Pu+sP9Og==", + "version": "3.6.2", + "resolved": "https://registry.npmjs.org/pg-pool/-/pg-pool-3.6.2.tgz", + "integrity": "sha512-Htjbg8BlwXqSBQ9V8Vjtc+vzf/6fVUuak/3/XXKA9oxZprwW3IMDQTGHP+KDmVL7rtd+R1QjbnCFPuTHm3G4hg==", "requires": {} }, "pg-protocol": { - "version": "1.6.0", - "resolved": "https://registry.npmjs.org/pg-protocol/-/pg-protocol-1.6.0.tgz", - "integrity": "sha512-M+PDm637OY5WM307051+bsDia5Xej6d9IR4GwJse1qA1DIhiKlksvrneZOYQq42OM+spubpcNYEo2FcKQrDk+Q==" + "version": "1.6.1", + "resolved": "https://registry.npmjs.org/pg-protocol/-/pg-protocol-1.6.1.tgz", + "integrity": "sha512-jPIlvgoD63hrEuihvIg+tJhoGjUsLPn6poJY9N5CnlPd91c2T18T/9zBtLxZSb1EhYxBRoZJtzScCaWlYLtktg==" }, "pg-types": { "version": "2.2.0", @@ -20493,9 +20478,9 @@ } }, "tsx": { - "version": "3.13.0", - "resolved": "https://registry.npmjs.org/tsx/-/tsx-3.13.0.tgz", - "integrity": "sha512-rjmRpTu3as/5fjNq/kOkOtihgLxuIz6pbKdj9xwP4J5jOLkBxw/rjN5ANw+KyrrOXV5uB7HC8+SrrSJxT65y+A==", + "version": "3.14.0", + "resolved": "https://registry.npmjs.org/tsx/-/tsx-3.14.0.tgz", + "integrity": "sha512-xHtFaKtHxM9LOklMmJdI3BEnQq/D5F73Of2E1GDrITi9sgoVkvIsrQUTY1G8FlmGtA+awCI4EBlTRRYxkL2sRg==", "dev": true, "requires": { "esbuild": "~0.18.20", diff --git a/package.json b/package.json index 56af9e57..6abab878 100644 --- a/package.json +++ b/package.json @@ -4,9 +4,9 @@ "description": "Supabase storage middleend", "main": "index.js", "scripts": { - "dev": "tsx watch ./src/server.ts | pino-pretty", + "dev": "tsx watch src/start/server.ts | pino-pretty", "build": "node ./build.js && resolve-tspaths", - "start": "NODE_ENV=production node dist/server.js", + "start": "NODE_ENV=production node dist/start/server.js", "migration:run": "tsx ./src/scripts/migrate-call.ts", "docs:export": "tsx ./src/scripts/export-docs.ts", "test:dummy-data": "tsx -r dotenv/config ./src/test/db/import-dummy-data.ts", @@ -69,7 +69,7 @@ "md5-file": "^5.0.0", "multistream": "^4.1.0", "object-sizeof": "^2.6.4", - "pg": "^8.11.3", + "pg": "^8.12.0", "pg-boss": "^9.0.3", "pg-listen": "^1.7.0", "pino": "^8.15.4", @@ -110,7 +110,7 @@ "stream-buffers": "^3.0.2", "ts-jest": "^29.0.3", "ts-node-dev": "^1.1.8", - "tsx": "^3.13.0", + "tsx": "^3.14.0", "tus-js-client": "^3.1.0", "typescript": "^4.5.5" }, diff --git a/src/config.ts b/src/config.ts index 3df35300..bfeeae38 100644 --- a/src/config.ts +++ b/src/config.ts @@ -9,6 +9,7 @@ export enum MultitenantMigrationStrategy { } type StorageConfigType = { + isProduction: boolean version: string exposeDocs: boolean keepAliveTimeout: number @@ -19,7 +20,7 @@ type StorageConfigType = { uploadFileSizeLimit: number uploadFileSizeLimitStandard?: number storageFilePath?: string - storageS3MaxSockets?: number + storageS3MaxSockets: number storageS3Bucket: string storageS3Endpoint?: string storageS3ForcePathStyle?: boolean @@ -69,6 +70,7 @@ type StorageConfigType = { webhookQueuePullInterval?: number webhookQueueTeamSize?: number webhookQueueConcurrency?: number + webhookMaxConnections: number adminDeleteQueueTeamSize?: number adminDeleteConcurrency?: number imageTransformationEnabled: boolean @@ -155,6 +157,7 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType { envPaths.map((envPath) => dotenv.config({ path: envPath, override: false })) config = { + isProduction: process.env.NODE_ENV === 'production', exposeDocs: getOptionalConfigFromEnv('EXPOSE_DOCS') !== 'false', // Tenant tenantId: @@ -324,6 +327,9 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType { ), webhookQueueTeamSize: parseInt(getOptionalConfigFromEnv('QUEUE_WEBHOOKS_TEAM_SIZE') || '50'), webhookQueueConcurrency: parseInt(getOptionalConfigFromEnv('QUEUE_WEBHOOK_CONCURRENCY') || '5'), + webhookMaxConnections: parseInt( + getOptionalConfigFromEnv('QUEUE_WEBHOOK_MAX_CONNECTIONS') || '500' + ), adminDeleteQueueTeamSize: parseInt( getOptionalConfigFromEnv('QUEUE_ADMIN_DELETE_TEAM_SIZE') || '50' ), diff --git a/src/http/plugins/log-request.ts b/src/http/plugins/log-request.ts index 0de1866b..26bd191e 100644 --- a/src/http/plugins/log-request.ts +++ b/src/http/plugins/log-request.ts @@ -11,6 +11,7 @@ declare module 'fastify' { executionError?: Error operation?: { type: string } resources?: string[] + startTime: number } interface FastifyContextConfig { @@ -21,6 +22,10 @@ declare module 'fastify' { export const logRequest = (options: RequestLoggerOptions) => fastifyPlugin(async (fastify) => { + fastify.addHook('onRequest', async (req) => { + req.startTime = Date.now() + }) + fastify.addHook('preHandler', async (req) => { const resourceFromParams = Object.values(req.params || {}).join('/') const resources = getFirstDefined( @@ -69,7 +74,7 @@ export const logRequest = (options: RequestLoggerOptions) => logSchema.request(req.log, buildLogMessage, { type: 'request', req, - responseTime: 0, + responseTime: (Date.now() - req.startTime) / 1000, error: error, owner: req.owner, operation: req.operation?.type ?? req.routeConfig.operation?.type, diff --git a/src/http/plugins/storage.ts b/src/http/plugins/storage.ts index 5dd95d2d..8c6127c3 100644 --- a/src/http/plugins/storage.ts +++ b/src/http/plugins/storage.ts @@ -15,7 +15,7 @@ const { storageBackendType } = getConfig() const storageBackend = createStorageBackend(storageBackendType) -export const storage = fastifyPlugin(async (fastify) => { +export const storage = fastifyPlugin(async function storagePlugin(fastify) { fastify.decorateRequest('storage', undefined) fastify.addHook('preHandler', async (request) => { const database = new StorageKnexDB(request.db, { diff --git a/src/internal/database/multitenant-db.ts b/src/internal/database/multitenant-db.ts index 51913af2..cd947a42 100644 --- a/src/internal/database/multitenant-db.ts +++ b/src/internal/database/multitenant-db.ts @@ -5,9 +5,18 @@ const { multitenantDatabaseUrl } = getConfig() export const multitenantKnex = Knex({ client: 'pg', - connection: multitenantDatabaseUrl, + connection: { + connectionString: multitenantDatabaseUrl, + connectionTimeoutMillis: 5000, + }, + version: '12', pool: { min: 0, max: 10, + createTimeoutMillis: 5000, + acquireTimeoutMillis: 5000, + idleTimeoutMillis: 5000, + reapIntervalMillis: 1000, + createRetryIntervalMillis: 100, }, }) diff --git a/src/internal/monitoring/logger.ts b/src/internal/monitoring/logger.ts index 8f70e672..cbaf4697 100644 --- a/src/internal/monitoring/logger.ts +++ b/src/internal/monitoring/logger.ts @@ -55,6 +55,7 @@ export interface EventLog { jodId: string type: 'event' event: string + region: string payload: string objectPath: string tenantId: string @@ -77,6 +78,8 @@ interface InfoLog { export const logSchema = { info: (logger: BaseLogger, message: string, log: InfoLog) => logger.info(log, message), + warning: (logger: BaseLogger, message: string, log: InfoLog | ErrorLog) => + logger.warn(log, message), request: (logger: BaseLogger, message: string, log: RequestLog) => { if (!log.res) { logger.warn(log, message) diff --git a/src/internal/queue/database.ts b/src/internal/queue/database.ts new file mode 100644 index 00000000..e69de29b diff --git a/src/internal/queue/event.ts b/src/internal/queue/event.ts new file mode 100644 index 00000000..4d6d3128 --- /dev/null +++ b/src/internal/queue/event.ts @@ -0,0 +1,216 @@ +import { Queue } from './queue' +import PgBoss, { BatchWorkOptions, Job, SendOptions, WorkOptions } from 'pg-boss' +import { getConfig } from '../../config' +import { QueueJobScheduled, QueueJobSchedulingTime } from '@internal/monitoring/metrics' +import { logger, logSchema } from '@internal/monitoring' + +export interface BasePayload { + $version?: string + singletonKey?: string + reqId?: string + tenant: { + ref: string + host: string + } +} + +export interface SlowRetryQueueOptions { + retryLimit: number + retryDelay: number +} + +const { pgQueueEnable, region } = getConfig() + +export type StaticThis> = BaseEventConstructor + +interface BaseEventConstructor> { + version: string + + new (...args: any): Base + + send( + this: StaticThis, + payload: Omit + ): Promise + + eventName(): string + getWorkerOptions(): WorkOptions | BatchWorkOptions +} + +export abstract class Event> { + public static readonly version: string = 'v1' + protected static queueName = '' + + constructor(public readonly payload: T & BasePayload) {} + + static eventName() { + return this.name + } + + static getQueueName() { + if (!this.queueName) { + throw new Error(`Queue name not set on ${this.constructor.name}`) + } + + return this.queueName + } + + static getQueueOptions>(payload: T['payload']): SendOptions | undefined { + return undefined + } + + static getWorkerOptions(): WorkOptions | BatchWorkOptions { + return {} + } + + static withSlowRetryQueue(): undefined | SlowRetryQueueOptions { + return undefined + } + + static getSlowRetryQueueName() { + if (!this.queueName) { + throw new Error(`Queue name not set on ${this.constructor.name}`) + } + + return this.queueName + '-slow' + } + + static batchSend[]>(messages: T) { + return Queue.getInstance().insert( + messages.map((message) => { + const sendOptions = (this.getQueueOptions(message.payload) as PgBoss.JobInsert) || {} + if (!message.payload.$version) { + ;(message.payload as (typeof message)['payload']).$version = this.version + } + return { + ...sendOptions, + name: this.getQueueName(), + data: message.payload, + } + }) + ) + } + + static send>(this: StaticThis, payload: Omit) { + if (!payload.$version) { + ;(payload as T['payload']).$version = this.version + } + const that = new this(payload) + return that.send() + } + + static sendSlowRetryQueue>( + this: StaticThis, + payload: Omit + ) { + if (!payload.$version) { + ;(payload as T['payload']).$version = this.version + } + const that = new this(payload) + return that.sendSlowRetryQueue() + } + + static handle(job: Job['payload']> | Job['payload']>[]) { + throw new Error('not implemented') + } + + async send(): Promise { + const constructor = this.constructor as typeof Event + + if (!pgQueueEnable) { + return constructor.handle({ + id: '__sync', + name: constructor.getQueueName(), + data: { + region, + ...this.payload, + $version: constructor.version, + }, + }) + } + + const timer = QueueJobSchedulingTime.startTimer() + const sendOptions = constructor.getQueueOptions(this.payload) + + try { + const res = await Queue.getInstance().send({ + name: constructor.getQueueName(), + data: { + region, + ...this.payload, + $version: constructor.version, + }, + options: sendOptions, + }) + + QueueJobScheduled.inc({ + name: constructor.getQueueName(), + }) + + return res + } catch (e) { + // If we can't queue the message for some reason, + // we run its handler right away. + // This might create some latency with the benefit of being more fault-tolerant + logSchema.warning( + logger, + `[Queue Sender] Error while sending job to queue, sending synchronously`, + { + type: 'queue', + error: e, + metadata: JSON.stringify(this.payload), + } + ) + return constructor.handle({ + id: '__sync', + name: constructor.getQueueName(), + data: { + region, + ...this.payload, + $version: constructor.version, + }, + }) + } finally { + timer({ + name: constructor.getQueueName(), + }) + } + } + + async sendSlowRetryQueue() { + const constructor = this.constructor as typeof Event + const slowRetryQueue = constructor.withSlowRetryQueue() + + if (!pgQueueEnable || !slowRetryQueue) { + return + } + + const timer = QueueJobSchedulingTime.startTimer() + const sendOptions = constructor.getQueueOptions(this.payload) || {} + + const res = await Queue.getInstance().send({ + name: constructor.getSlowRetryQueueName(), + data: { + region, + ...this.payload, + $version: constructor.version, + }, + options: { + retryBackoff: true, + startAfter: 60 * 60 * 30, // 30 mins + ...sendOptions, + ...slowRetryQueue, + }, + }) + + timer({ + name: constructor.getSlowRetryQueueName(), + }) + + QueueJobScheduled.inc({ + name: constructor.getSlowRetryQueueName(), + }) + + return res + } +} diff --git a/src/internal/queue/index.ts b/src/internal/queue/index.ts index 2de5032b..0210f46b 100644 --- a/src/internal/queue/index.ts +++ b/src/internal/queue/index.ts @@ -1 +1,2 @@ export * from './queue' +export * from './event' diff --git a/src/internal/queue/queue.ts b/src/internal/queue/queue.ts index 81b451d8..081db82a 100644 --- a/src/internal/queue/queue.ts +++ b/src/internal/queue/queue.ts @@ -1,13 +1,13 @@ import PgBoss, { Job, JobWithMetadata } from 'pg-boss' +import { ERRORS } from '@internal/errors' import { getConfig } from '../../config' -import { BaseEvent, BasePayload } from '../../storage/events' -import { QueueJobRetryFailed, QueueJobCompleted, QueueJobError } from '../monitoring/metrics' import { logger, logSchema } from '../monitoring' -import { ERRORS } from '@internal/errors' +import { QueueJobRetryFailed, QueueJobCompleted, QueueJobError } from '../monitoring/metrics' +import { BasePayload, Event } from './event' //eslint-disable-next-line @typescript-eslint/no-explicit-any -type SubclassOfBaseClass = (new (payload: any) => BaseEvent) & { - [K in keyof typeof BaseEvent]: (typeof BaseEvent)[K] +type SubclassOfBaseClass = (new (payload: any) => Event) & { + [K in keyof typeof Event]: (typeof Event)[K] } export abstract class Queue { @@ -59,6 +59,8 @@ export abstract class Queue { retryBackoff: true, retryLimit: 20, expireInHours: 48, + noSupervisor: pgQueueEnableWorkers === false, + noScheduling: pgQueueEnableWorkers === false, }) Queue.pgBoss.on('error', (error) => { @@ -121,10 +123,11 @@ export abstract class Queue { } const boss = this.pgBoss + const { isProduction } = getConfig() await boss.stop({ timeout: 20 * 1000, - graceful: true, + graceful: isProduction, destroy: true, }) diff --git a/src/server.ts b/src/start/server.ts similarity index 94% rename from src/server.ts rename to src/start/server.ts index e60b3e8d..df2fcf12 100644 --- a/src/server.ts +++ b/src/start/server.ts @@ -1,10 +1,10 @@ -import './internal/monitoring/otel' +import '@internal/monitoring/otel' import { FastifyInstance } from 'fastify' import { IncomingMessage, Server, ServerResponse } from 'http' -import build from './app' -import buildAdmin from './admin-app' -import { getConfig } from './config' +import build from '../app' +import buildAdmin from '../admin-app' +import { getConfig } from '../config' import { runMultitenantMigrations, runMigrationsOnTenant, @@ -16,6 +16,7 @@ import { logger, logSchema } from '@internal/monitoring' import { Queue } from '@internal/queue' import { registerWorkers } from '@storage/events' import { AsyncAbortController } from '@internal/concurrency' + import { bindShutdownSignals, createServerClosedPromise, shutdown } from './shutdown' const shutdownSignal = new AsyncAbortController() @@ -46,13 +47,12 @@ main() * Start Storage API server */ async function main() { - const { databaseURL, isMultitenant, pgQueueEnable, pgQueueEnableWorkers } = getConfig() + const { databaseURL, isMultitenant, pgQueueEnable } = getConfig() // Migrations if (isMultitenant) { await runMultitenantMigrations() await listenForTenantUpdate(PubSub) - startAsyncMigrations(shutdownSignal.nextGroup.signal) } else { await runMigrationsOnTenant(databaseURL) } @@ -70,6 +70,10 @@ async function main() { signal: shutdownSignal.nextGroup.signal, }) + if (isMultitenant) { + startAsyncMigrations(shutdownSignal.nextGroup.signal) + } + // HTTP Server const app = await httpServer(shutdownSignal.signal) diff --git a/src/shutdown.ts b/src/start/shutdown.ts similarity index 100% rename from src/shutdown.ts rename to src/start/shutdown.ts diff --git a/src/worker.ts b/src/start/worker.ts similarity index 96% rename from src/worker.ts rename to src/start/worker.ts index 2eea0c86..b14fc966 100644 --- a/src/worker.ts +++ b/src/start/worker.ts @@ -4,8 +4,8 @@ import { listenForTenantUpdate, PubSub } from '@internal/database' import { AsyncAbortController } from '@internal/concurrency' import { registerWorkers } from '@storage/events' -import { getConfig } from './config' -import adminApp from './admin-app' +import { getConfig } from '../config' +import adminApp from '../admin-app' import { bindShutdownSignals, createServerClosedPromise, shutdown } from './shutdown' const shutdownSignal = new AsyncAbortController() diff --git a/src/storage/database/adapter.ts b/src/storage/database/adapter.ts index 6c97e278..9e53ddbd 100644 --- a/src/storage/database/adapter.ts +++ b/src/storage/database/adapter.ts @@ -101,7 +101,12 @@ export interface Database { listBuckets(columns: string): Promise mustLockObject(bucketId: string, objectName: string, version?: string): Promise - waitObjectLock(bucketId: string, objectName: string, version?: string): Promise + waitObjectLock( + bucketId: string, + objectName: string, + version?: string, + opts?: { timeout?: number } + ): Promise updateBucket( bucketId: string, diff --git a/src/storage/database/knex.ts b/src/storage/database/knex.ts index 6ab3261c..c771576b 100644 --- a/src/storage/database/knex.ts +++ b/src/storage/database/knex.ts @@ -530,10 +530,38 @@ export class StorageKnexDB implements Database { }) } - async waitObjectLock(bucketId: string, objectName: string, version?: string) { + async waitObjectLock( + bucketId: string, + objectName: string, + version?: string, + opts?: { timeout: number } + ) { return this.runQuery('WaitObjectLock', async (knex) => { const hash = hashStringToInt(`${bucketId}/${objectName}${version ? `/${version}` : ''}`) - await knex.raw(`SELECT pg_advisory_xact_lock(?)`, [hash]) + const query = knex.raw(`SELECT pg_advisory_xact_lock(?)`, [hash]) + + if (opts?.timeout) { + let timeoutInterval: undefined | NodeJS.Timeout + + try { + await Promise.race([ + query, + new Promise( + (_, reject) => + (timeoutInterval = setTimeout(() => reject(ERRORS.LockTimeout()), opts.timeout)) + ), + ]) + } catch (e) { + throw e + } finally { + if (timeoutInterval) { + clearTimeout(timeoutInterval) + } + } + } else { + await query + } + return true }) } diff --git a/src/storage/events/base-event.ts b/src/storage/events/base-event.ts index 846dc20d..c8624779 100644 --- a/src/storage/events/base-event.ts +++ b/src/storage/events/base-event.ts @@ -1,125 +1,21 @@ -import { Queue } from '@internal/queue' -import PgBoss, { BatchWorkOptions, Job, SendOptions, WorkOptions } from 'pg-boss' +import { Event as QueueBaseEvent, BasePayload, StaticThis, Event } from '@internal/queue' import { getPostgresConnection, getServiceKeyUser } from '@internal/database' -import { Storage } from '../index' import { StorageKnexDB } from '../database' import { createAgent, createStorageBackend } from '../backend' +import { Storage } from '../index' import { getConfig } from '../../config' -import { QueueJobScheduled, QueueJobSchedulingTime } from '@internal/monitoring/metrics' import { logger } from '@internal/monitoring' -export interface BasePayload { - $version?: string - singletonKey?: string - reqId?: string - tenant: { - ref: string - host: string - } -} - -export interface SlowRetryQueueOptions { - retryLimit: number - retryDelay: number -} - -const { pgQueueEnable, storageBackendType, storageS3Endpoint, region } = getConfig() +const { storageBackendType, storageS3Endpoint, region } = getConfig() const storageS3Protocol = storageS3Endpoint?.includes('http://') ? 'http' : 'https' const httpAgent = createAgent(storageS3Protocol) -type StaticThis> = BaseEventConstructor - -interface BaseEventConstructor> { - version: string - - new (...args: any): Base - - send( - this: StaticThis, - payload: Omit - ): Promise - - eventName(): string - getWorkerOptions(): WorkOptions | BatchWorkOptions -} - -export abstract class BaseEvent> { - public static readonly version: string = 'v1' - protected static queueName = '' - - constructor(public readonly payload: T & BasePayload) {} - - static eventName() { - return this.name - } - - static getQueueName() { - if (!this.queueName) { - throw new Error(`Queue name not set on ${this.constructor.name}`) - } - - return this.queueName - } - - static getQueueOptions>(payload: T['payload']): SendOptions | undefined { - return undefined - } - - static getWorkerOptions(): WorkOptions | BatchWorkOptions { - return {} - } - - static withSlowRetryQueue(): undefined | SlowRetryQueueOptions { - return undefined - } - - static getSlowRetryQueueName() { - if (!this.queueName) { - throw new Error(`Queue name not set on ${this.constructor.name}`) - } - - return this.queueName + '-slow' - } - - static batchSend[]>(messages: T) { - return Queue.getInstance().insert( - messages.map((message) => { - const sendOptions = (this.getQueueOptions(message.payload) as PgBoss.JobInsert) || {} - if (!message.payload.$version) { - ;(message.payload as (typeof message)['payload']).$version = this.version - } - return { - ...sendOptions, - name: this.getQueueName(), - data: message.payload, - } - }) - ) - } - - static send>( - this: StaticThis, - payload: Omit - ) { - if (!payload.$version) { - ;(payload as T['payload']).$version = this.version - } - const that = new this(payload) - return that.send() - } - - static sendSlowRetryQueue>( - this: StaticThis, - payload: Omit - ) { - if (!payload.$version) { - ;(payload as T['payload']).$version = this.version - } - const that = new this(payload) - return that.sendSlowRetryQueue() - } - - static async sendWebhook>( +export abstract class BaseEvent> extends QueueBaseEvent { + /** + * Sends a message as a webhook + * @param payload + */ + static async sendWebhook>( this: StaticThis, payload: Omit ) { @@ -155,10 +51,6 @@ export abstract class BaseEvent> { } } - static handle(job: Job['payload']> | Job['payload']>[]) { - throw new Error('not implemented') - } - protected static async createStorage(payload: BasePayload) { const adminUser = await getServiceKeyUser(payload.tenant.ref) @@ -181,80 +73,4 @@ export abstract class BaseEvent> { return new Storage(storageBackend, db) } - - async send(): Promise { - const constructor = this.constructor as typeof BaseEvent - - if (!pgQueueEnable) { - return constructor.handle({ - id: '', - name: constructor.getQueueName(), - data: { - region, - ...this.payload, - $version: constructor.version, - }, - }) - } - - const timer = QueueJobSchedulingTime.startTimer() - const sendOptions = constructor.getQueueOptions(this.payload) - - const res = await Queue.getInstance().send({ - name: constructor.getQueueName(), - data: { - region, - ...this.payload, - $version: constructor.version, - }, - options: sendOptions, - }) - - timer({ - name: constructor.getQueueName(), - }) - - QueueJobScheduled.inc({ - name: constructor.getQueueName(), - }) - - return res - } - - async sendSlowRetryQueue() { - const constructor = this.constructor as typeof BaseEvent - const slowRetryQueue = constructor.withSlowRetryQueue() - - if (!pgQueueEnable || !slowRetryQueue) { - return - } - - const timer = QueueJobSchedulingTime.startTimer() - const sendOptions = constructor.getQueueOptions(this.payload) || {} - - const res = await Queue.getInstance().send({ - name: constructor.getSlowRetryQueueName(), - data: { - region, - ...this.payload, - $version: constructor.version, - }, - options: { - retryBackoff: true, - startAfter: 60 * 60 * 30, // 30 mins - ...sendOptions, - ...slowRetryQueue, - }, - }) - - timer({ - name: constructor.getSlowRetryQueueName(), - }) - - QueueJobScheduled.inc({ - name: constructor.getSlowRetryQueueName(), - }) - - return res - } } diff --git a/src/storage/events/object-admin-delete.ts b/src/storage/events/object-admin-delete.ts index d22f89c4..47381be9 100644 --- a/src/storage/events/object-admin-delete.ts +++ b/src/storage/events/object-admin-delete.ts @@ -1,9 +1,10 @@ -import { BaseEvent, BasePayload } from './base-event' +import { BaseEvent } from './base-event' import { getConfig } from '../../config' import { Job, SendOptions, WorkOptions } from 'pg-boss' import { withOptionalVersion } from '../backend' import { logger, logSchema } from '@internal/monitoring' import { Storage } from '../index' +import { BasePayload } from '@internal/queue' export interface ObjectDeleteEvent extends BasePayload { name: string diff --git a/src/storage/events/object-created.ts b/src/storage/events/object-created.ts index ede20611..cf6e0fbb 100644 --- a/src/storage/events/object-created.ts +++ b/src/storage/events/object-created.ts @@ -1,4 +1,5 @@ -import { BaseEvent, BasePayload } from './base-event' +import { BasePayload } from '@internal/queue' +import { BaseEvent } from './base-event' import { ObjectMetadata } from '../backend' import { ObjectRemovedEvent } from './object-removed' diff --git a/src/storage/events/object-removed.ts b/src/storage/events/object-removed.ts index b333615d..df0355d2 100644 --- a/src/storage/events/object-removed.ts +++ b/src/storage/events/object-removed.ts @@ -1,4 +1,5 @@ -import { BaseEvent, BasePayload } from './base-event' +import { BasePayload } from '@internal/queue' +import { BaseEvent } from './base-event' export interface ObjectRemovedEvent extends BasePayload { name: string diff --git a/src/storage/events/object-updated.ts b/src/storage/events/object-updated.ts index 98b550d9..bc721e58 100644 --- a/src/storage/events/object-updated.ts +++ b/src/storage/events/object-updated.ts @@ -1,4 +1,5 @@ -import { BaseEvent, BasePayload } from './base-event' +import { BasePayload } from '@internal/queue' +import { BaseEvent } from './base-event' import { ObjectMetadata } from '../backend' interface ObjectUpdatedMetadataEvent extends BasePayload { diff --git a/src/storage/events/run-migrations.ts b/src/storage/events/run-migrations.ts index 4cb2e70f..e2cba669 100644 --- a/src/storage/events/run-migrations.ts +++ b/src/storage/events/run-migrations.ts @@ -1,4 +1,4 @@ -import { BaseEvent, BasePayload } from './base-event' +import { BaseEvent } from './base-event' import { areMigrationsUpToDate, getTenantConfig, @@ -8,6 +8,7 @@ import { } from '@internal/database' import { JobWithMetadata, SendOptions, WorkOptions } from 'pg-boss' import { logger, logSchema } from '@internal/monitoring' +import { BasePayload } from '@internal/queue' interface RunMigrationsPayload extends BasePayload { tenantId: string diff --git a/src/storage/events/webhook.ts b/src/storage/events/webhook.ts index 5e20b776..31842812 100644 --- a/src/storage/events/webhook.ts +++ b/src/storage/events/webhook.ts @@ -12,6 +12,7 @@ const { webhookQueuePullInterval, webhookQueueTeamSize, webhookQueueConcurrency, + webhookMaxConnections, } = getConfig() interface WebhookEvent { @@ -31,12 +32,12 @@ interface WebhookEvent { const httpAgent = webhookURL?.startsWith('https://') ? { httpsAgent: new HttpsAgent({ - maxSockets: 100, + maxSockets: webhookMaxConnections, }), } : { httpAgent: new HttpAgent({ - maxSockets: 100, + maxSockets: webhookMaxConnections, }), } diff --git a/src/storage/object.ts b/src/storage/object.ts index 6ccd0aa8..0642ea1e 100644 --- a/src/storage/object.ts +++ b/src/storage/object.ts @@ -395,7 +395,9 @@ export class ObjectStorage { const metadata = await this.backend.headObject(storageS3Bucket, s3DestinationKey, newVersion) return this.db.asSuperUser().withTransaction(async (db) => { - await db.waitObjectLock(this.bucketId, destinationObjectName) + await db.waitObjectLock(this.bucketId, destinationObjectName, undefined, { + timeout: 5000, + }) const sourceObject = await db.findObject(this.bucketId, sourceObjectName, 'id', { forUpdate: true, diff --git a/src/storage/protocols/s3/signature-v4.ts b/src/storage/protocols/s3/signature-v4.ts index 30502bc5..86bd1fb5 100644 --- a/src/storage/protocols/s3/signature-v4.ts +++ b/src/storage/protocols/s3/signature-v4.ts @@ -1,5 +1,6 @@ import crypto from 'crypto' import { ERRORS } from '@internal/errors' +import { signatureV4 } from '../../../http/plugins' interface SignatureV4Options { enforceRegion: boolean @@ -105,7 +106,7 @@ export class SignatureV4 { return { credentials: { accessKey, shortDate, region, service }, signedHeaders, - signature, + signature: signature as string, longDate, contentSha, sessionToken, @@ -114,11 +115,11 @@ export class SignatureV4 { static parseQuerySignature(query: Record) { const credentialPart = query['X-Amz-Credential'] - const signedHeaders = query['X-Amz-SignedHeaders'] - const signature = query['X-Amz-Signature'] - const longDate = query['X-Amz-Date'] - const contentSha = query['X-Amz-Content-Sha256'] - const sessionToken = query['X-Amz-Security-Token'] + const signedHeaders: string = query['X-Amz-SignedHeaders'] + const signature: string = query['X-Amz-Signature'] + const longDate: string = query['X-Amz-Date'] + const contentSha: string = query['X-Amz-Content-Sha256'] + const sessionToken: string | undefined = query['X-Amz-Security-Token'] const expires = query['X-Amz-Expires'] if (!validateTypeOfStrings(credentialPart, signedHeaders, signature, longDate)) { @@ -129,7 +130,7 @@ export class SignatureV4 { this.checkExpiration(longDate, expires) } - const credentialsPart = credentialPart.split('/') + const credentialsPart = credentialPart.split('/') as string[] if (credentialsPart.length !== 5) { throw ERRORS.InvalidSignature('Invalid credentials') } diff --git a/src/storage/uploader.ts b/src/storage/uploader.ts index 663272c2..e365f30a 100644 --- a/src/storage/uploader.ts +++ b/src/storage/uploader.ts @@ -9,6 +9,7 @@ import { getFileSizeLimit, isEmptyFolder } from './limits' import { Database } from './database' import { ObjectAdminDelete, ObjectCreatedPostEvent, ObjectCreatedPutEvent } from './events' import { getConfig } from '../config' +import { logger, logSchema } from '@internal/monitoring' interface UploaderOptions extends UploadObjectOptions { fileSizeLimit?: number | null @@ -141,20 +142,20 @@ export class Uploader { uploadType?: 'standard' | 's3' | 'resumable' }) { try { - return await this.db.withTransaction(async (db) => { - await db.waitObjectLock(bucketId, objectName) + return await this.db.asSuperUser().withTransaction(async (db) => { + await db.waitObjectLock(bucketId, objectName, undefined, { + timeout: 5000, + }) - const currentObj = await db - .asSuperUser() - .findObject(bucketId, objectName, 'id, version, metadata', { - forUpdate: true, - dontErrorOnEmpty: true, - }) + const currentObj = await db.findObject(bucketId, objectName, 'id, version, metadata', { + forUpdate: true, + dontErrorOnEmpty: true, + }) const isNew = !Boolean(currentObj) // update object - const newObject = await db.asSuperUser().upsertObject({ + const newObject = await db.upsertObject({ bucket_id: bucketId, name: objectName, metadata: objectMetadata, @@ -180,14 +181,29 @@ export class Uploader { const event = isUpsert && !isNew ? ObjectCreatedPutEvent : ObjectCreatedPostEvent events.push( - event.sendWebhook({ - tenant: this.db.tenant(), - name: objectName, - bucketId: bucketId, - metadata: objectMetadata, - reqId: this.db.reqId, - uploadType, - }) + event + .sendWebhook({ + tenant: this.db.tenant(), + name: objectName, + bucketId: bucketId, + metadata: objectMetadata, + reqId: this.db.reqId, + uploadType, + }) + .catch((e) => { + logSchema.error(logger, 'Failed to send webhook', { + type: 'event', + error: e, + project: this.db.tenantId, + metadata: JSON.stringify({ + name: objectName, + bucketId: bucketId, + metadata: objectMetadata, + reqId: this.db.reqId, + uploadType, + }), + }) + }) ) await Promise.all(events)