From 077e2ae56739bccaf4788660136d9224d3a0ff5a Mon Sep 17 00:00:00 2001
From: fenos
Date: Fri, 28 Jun 2024 15:16:02 +0200
Subject: [PATCH] fix: custom webhooks-max-sockets env

---
 Dockerfile                              |  2 +-
 package-lock.json                       | 91 +++++++++++--------
 package.json                            |  6 +-
 src/config.ts                           |  6 +-
 src/http/plugins/storage.ts             |  2 +-
 src/internal/database/multitenant-db.ts | 11 ++-
 src/internal/monitoring/logger.ts       |  2 +
 src/{ => start}/server.ts               | 11 +--
 src/{ => start}/shutdown.ts             |  0
 src/{ => start}/worker.ts               |  4 +-
 src/storage/database/adapter.ts         |  7 +-
 src/storage/database/knex.ts            | 32 ++++++++-
 src/storage/events/base-event.ts        | 64 ++++++++++++-----
 src/storage/events/webhook.ts           |  5 +-
 src/storage/object.ts                   |  4 +-
 src/storage/uploader.ts                 | 49 ++++++++-----
 16 files changed, 188 insertions(+), 108 deletions(-)
 rename src/{ => start}/server.ts (94%)
 rename src/{ => start}/shutdown.ts (100%)
 rename src/{ => start}/worker.ts (96%)

diff --git a/Dockerfile b/Dockerfile
index b30fddde..2ee4c0e0 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -33,4 +33,4 @@ COPY --from=production-deps /app/node_modules node_modules
 COPY --from=build /app/dist dist
 
 EXPOSE 5000
-CMD ["node", "dist/server.js"]
\ No newline at end of file
+CMD ["node", "dist/start/server.js"]
\ No newline at end of file
diff --git a/package-lock.json b/package-lock.json
index f1bcefdf..b8a46bf1 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -53,7 +53,7 @@
         "md5-file": "^5.0.0",
         "multistream": "^4.1.0",
         "object-sizeof": "^2.6.4",
-        "pg": "^8.11.3",
+        "pg": "^8.12.0",
         "pg-boss": "^9.0.3",
         "pg-listen": "^1.7.0",
         "pino": "^8.15.4",
@@ -6203,14 +6203,6 @@
       "integrity": "sha512-E+XQCRwSbaaiChtv6k6Dwgc+bx+Bs6vuKJHHl5kox/BaKbhiXzqQOwK4cO22yElGp2OCmjwVhT3HmxgyPGnJfQ==",
       "dev": true
     },
-    "node_modules/buffer-writer": {
-      "version": "2.0.0",
-      "resolved": "https://registry.npmjs.org/buffer-writer/-/buffer-writer-2.0.0.tgz",
-      "integrity": "sha512-a7ZpuTZU1TRtnwyCNW3I5dc0wWNC3VR9S++Ewyk2HHZdrO3CQJqSpd+95Us590V6AL7JqUAH2IwZ/398PmNFgw==",
-      "engines": {
-        "node": ">=4"
-      }
-    },
     "node_modules/callsites": {
       "version": "3.1.0",
       "resolved": "https://registry.npmjs.org/callsites/-/callsites-3.1.0.tgz",
@@ -9554,11 +9546,6 @@
         "node": ">=6"
       }
     },
-    "node_modules/packet-reader": {
-      "version": "1.0.0",
-      "resolved": "https://registry.npmjs.org/packet-reader/-/packet-reader-1.0.0.tgz",
-      "integrity": "sha512-HAKu/fG3HpHFO0AA8WE8q2g+gBJaZ9MG7fcKk+IJPLTGAD6Psw4443l+9DGRbOIh3/aXr7Phy0TjilYivJo5XQ=="
-    },
     "node_modules/parent-module": {
       "version": "1.0.1",
       "resolved": "https://registry.npmjs.org/parent-module/-/parent-module-1.0.1.tgz",
@@ -9631,15 +9618,13 @@
       }
     },
     "node_modules/pg": {
-      "version": "8.11.3",
-      "resolved": "https://registry.npmjs.org/pg/-/pg-8.11.3.tgz",
-      "integrity": "sha512-+9iuvG8QfaaUrrph+kpF24cXkH1YOOUeArRNYIxq1viYHZagBxrTno7cecY1Fa44tJeZvaoG+Djpkc3JwehN5g==",
-      "dependencies": {
-        "buffer-writer": "2.0.0",
-        "packet-reader": "1.0.0",
-        "pg-connection-string": "^2.6.2",
-        "pg-pool": "^3.6.1",
-        "pg-protocol": "^1.6.0",
+      "version": "8.12.0",
+      "resolved": "https://registry.npmjs.org/pg/-/pg-8.12.0.tgz",
+      "integrity": "sha512-A+LHUSnwnxrnL/tZ+OLfqR1SxLN3c/pgDztZ47Rpbsd4jUytsTtwQo/TLPRzPJMp/1pbhYVhH9cuSZLAajNfjQ==",
+      "dependencies": {
+        "pg-connection-string": "^2.6.4",
+        "pg-pool": "^3.6.2",
+        "pg-protocol": "^1.6.1",
         "pg-types": "^2.1.0",
         "pgpass": "1.x"
       },
@@ -9724,17 +9709,17 @@
       }
     },
     "node_modules/pg-pool": {
-      "version": "3.6.1",
-      "resolved": "https://registry.npmjs.org/pg-pool/-/pg-pool-3.6.1.tgz",
-      "integrity": "sha512-jizsIzhkIitxCGfPRzJn1ZdcosIt3pz9Sh3V01fm1vZnbnCMgmGl5wvGGdNN2EL9Rmb0EcFoCkixH4Pu+sP9Og==",
+      "version": "3.6.2",
+      "resolved": "https://registry.npmjs.org/pg-pool/-/pg-pool-3.6.2.tgz",
+      "integrity": "sha512-Htjbg8BlwXqSBQ9V8Vjtc+vzf/6fVUuak/3/XXKA9oxZprwW3IMDQTGHP+KDmVL7rtd+R1QjbnCFPuTHm3G4hg==",
       "peerDependencies": {
         "pg": ">=8.0"
       }
     },
     "node_modules/pg-protocol": {
-      "version": "1.6.0",
-      "resolved": "https://registry.npmjs.org/pg-protocol/-/pg-protocol-1.6.0.tgz",
-      "integrity": "sha512-M+PDm637OY5WM307051+bsDia5Xej6d9IR4GwJse1qA1DIhiKlksvrneZOYQq42OM+spubpcNYEo2FcKQrDk+Q=="
+      "version": "1.6.1",
+      "resolved": "https://registry.npmjs.org/pg-protocol/-/pg-protocol-1.6.1.tgz",
+      "integrity": "sha512-jPIlvgoD63hrEuihvIg+tJhoGjUsLPn6poJY9N5CnlPd91c2T18T/9zBtLxZSb1EhYxBRoZJtzScCaWlYLtktg=="
     },
     "node_modules/pg-types": {
       "version": "2.2.0",
@@ -9751,6 +9736,11 @@
         "node": ">=4"
       }
     },
+    "node_modules/pg/node_modules/pg-connection-string": {
+      "version": "2.6.4",
+      "resolved": "https://registry.npmjs.org/pg-connection-string/-/pg-connection-string-2.6.4.tgz",
+      "integrity": "sha512-v+Z7W/0EO707aNMaAEfiGnGL9sxxumwLl2fJvCQtMn9Fxsg+lPpPkdcyBSv/KFgpGdYkMfn+EI1Or2EHjpgLCA=="
+    },
     "node_modules/pgpass": {
       "version": "1.0.5",
       "resolved": "https://registry.npmjs.org/pgpass/-/pgpass-1.0.5.tgz",
@@ -16716,11 +16706,6 @@
       "integrity": "sha512-E+XQCRwSbaaiChtv6k6Dwgc+bx+Bs6vuKJHHl5kox/BaKbhiXzqQOwK4cO22yElGp2OCmjwVhT3HmxgyPGnJfQ==",
       "dev": true
     },
-    "buffer-writer": {
-      "version": "2.0.0",
-      "resolved": "https://registry.npmjs.org/buffer-writer/-/buffer-writer-2.0.0.tgz",
-      "integrity": "sha512-a7ZpuTZU1TRtnwyCNW3I5dc0wWNC3VR9S++Ewyk2HHZdrO3CQJqSpd+95Us590V6AL7JqUAH2IwZ/398PmNFgw=="
-    },
     "callsites": {
       "version": "3.1.0",
       "resolved": "https://registry.npmjs.org/callsites/-/callsites-3.1.0.tgz",
@@ -19233,11 +19218,6 @@
       "integrity": "sha512-R4nPAVTAU0B9D35/Gk3uJf/7XYbQcyohSKdvAxIRSNghFl4e71hVoGnBNQz9cWaXxO2I10KTC+3jMdvvoKw6dQ==",
       "dev": true
     },
-    "packet-reader": {
-      "version": "1.0.0",
-      "resolved": "https://registry.npmjs.org/packet-reader/-/packet-reader-1.0.0.tgz",
-      "integrity": "sha512-HAKu/fG3HpHFO0AA8WE8q2g+gBJaZ9MG7fcKk+IJPLTGAD6Psw4443l+9DGRbOIh3/aXr7Phy0TjilYivJo5XQ=="
-    },
     "parent-module": {
       "version": "1.0.1",
       "resolved": "https://registry.npmjs.org/parent-module/-/parent-module-1.0.1.tgz",
@@ -19289,18 +19269,23 @@
       "dev": true
     },
     "pg": {
-      "version": "8.11.3",
-      "resolved": "https://registry.npmjs.org/pg/-/pg-8.11.3.tgz",
-      "integrity": "sha512-+9iuvG8QfaaUrrph+kpF24cXkH1YOOUeArRNYIxq1viYHZagBxrTno7cecY1Fa44tJeZvaoG+Djpkc3JwehN5g==",
+      "version": "8.12.0",
+      "resolved": "https://registry.npmjs.org/pg/-/pg-8.12.0.tgz",
+      "integrity": "sha512-A+LHUSnwnxrnL/tZ+OLfqR1SxLN3c/pgDztZ47Rpbsd4jUytsTtwQo/TLPRzPJMp/1pbhYVhH9cuSZLAajNfjQ==",
       "requires": {
-        "buffer-writer": "2.0.0",
-        "packet-reader": "1.0.0",
         "pg-cloudflare": "^1.1.1",
-        "pg-connection-string": "^2.6.2",
-        "pg-pool": "^3.6.1",
-        "pg-protocol": "^1.6.0",
+        "pg-connection-string": "^2.6.4",
+        "pg-pool": "^3.6.2",
+        "pg-protocol": "^1.6.1",
         "pg-types": "^2.1.0",
         "pgpass": "1.x"
+      },
+      "dependencies": {
+        "pg-connection-string": {
+          "version": "2.6.4",
+          "resolved": "https://registry.npmjs.org/pg-connection-string/-/pg-connection-string-2.6.4.tgz",
+          "integrity": "sha512-v+Z7W/0EO707aNMaAEfiGnGL9sxxumwLl2fJvCQtMn9Fxsg+lPpPkdcyBSv/KFgpGdYkMfn+EI1Or2EHjpgLCA=="
+        }
       }
     },
     "pg-boss": {
@@ -19356,15 +19341,15 @@
       }
     },
     "pg-pool": {
-      "version": "3.6.1",
-      "resolved": "https://registry.npmjs.org/pg-pool/-/pg-pool-3.6.1.tgz",
-      "integrity": "sha512-jizsIzhkIitxCGfPRzJn1ZdcosIt3pz9Sh3V01fm1vZnbnCMgmGl5wvGGdNN2EL9Rmb0EcFoCkixH4Pu+sP9Og==",
"sha512-jizsIzhkIitxCGfPRzJn1ZdcosIt3pz9Sh3V01fm1vZnbnCMgmGl5wvGGdNN2EL9Rmb0EcFoCkixH4Pu+sP9Og==", + "version": "3.6.2", + "resolved": "https://registry.npmjs.org/pg-pool/-/pg-pool-3.6.2.tgz", + "integrity": "sha512-Htjbg8BlwXqSBQ9V8Vjtc+vzf/6fVUuak/3/XXKA9oxZprwW3IMDQTGHP+KDmVL7rtd+R1QjbnCFPuTHm3G4hg==", "requires": {} }, "pg-protocol": { - "version": "1.6.0", - "resolved": "https://registry.npmjs.org/pg-protocol/-/pg-protocol-1.6.0.tgz", - "integrity": "sha512-M+PDm637OY5WM307051+bsDia5Xej6d9IR4GwJse1qA1DIhiKlksvrneZOYQq42OM+spubpcNYEo2FcKQrDk+Q==" + "version": "1.6.1", + "resolved": "https://registry.npmjs.org/pg-protocol/-/pg-protocol-1.6.1.tgz", + "integrity": "sha512-jPIlvgoD63hrEuihvIg+tJhoGjUsLPn6poJY9N5CnlPd91c2T18T/9zBtLxZSb1EhYxBRoZJtzScCaWlYLtktg==" }, "pg-types": { "version": "2.2.0", diff --git a/package.json b/package.json index 56af9e57..2d102fb4 100644 --- a/package.json +++ b/package.json @@ -4,9 +4,9 @@ "description": "Supabase storage middleend", "main": "index.js", "scripts": { - "dev": "tsx watch ./src/server.ts | pino-pretty", + "dev": "tsx watch src/start/server.ts | pino-pretty", "build": "node ./build.js && resolve-tspaths", - "start": "NODE_ENV=production node dist/server.js", + "start": "NODE_ENV=production node dist/start/server.js", "migration:run": "tsx ./src/scripts/migrate-call.ts", "docs:export": "tsx ./src/scripts/export-docs.ts", "test:dummy-data": "tsx -r dotenv/config ./src/test/db/import-dummy-data.ts", @@ -69,7 +69,7 @@ "md5-file": "^5.0.0", "multistream": "^4.1.0", "object-sizeof": "^2.6.4", - "pg": "^8.11.3", + "pg": "^8.12.0", "pg-boss": "^9.0.3", "pg-listen": "^1.7.0", "pino": "^8.15.4", diff --git a/src/config.ts b/src/config.ts index 3df35300..d719e5f7 100644 --- a/src/config.ts +++ b/src/config.ts @@ -19,7 +19,7 @@ type StorageConfigType = { uploadFileSizeLimit: number uploadFileSizeLimitStandard?: number storageFilePath?: string - storageS3MaxSockets?: number + storageS3MaxSockets: number storageS3Bucket: string storageS3Endpoint?: string storageS3ForcePathStyle?: boolean @@ -69,6 +69,7 @@ type StorageConfigType = { webhookQueuePullInterval?: number webhookQueueTeamSize?: number webhookQueueConcurrency?: number + webhookMaxConnections: number adminDeleteQueueTeamSize?: number adminDeleteConcurrency?: number imageTransformationEnabled: boolean @@ -324,6 +325,9 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType { ), webhookQueueTeamSize: parseInt(getOptionalConfigFromEnv('QUEUE_WEBHOOKS_TEAM_SIZE') || '50'), webhookQueueConcurrency: parseInt(getOptionalConfigFromEnv('QUEUE_WEBHOOK_CONCURRENCY') || '5'), + webhookMaxConnections: parseInt( + getOptionalConfigFromEnv('QUEUE_WEBHOOK_MAX_CONNECTIONS') || '500' + ), adminDeleteQueueTeamSize: parseInt( getOptionalConfigFromEnv('QUEUE_ADMIN_DELETE_TEAM_SIZE') || '50' ), diff --git a/src/http/plugins/storage.ts b/src/http/plugins/storage.ts index 5dd95d2d..8c6127c3 100644 --- a/src/http/plugins/storage.ts +++ b/src/http/plugins/storage.ts @@ -15,7 +15,7 @@ const { storageBackendType } = getConfig() const storageBackend = createStorageBackend(storageBackendType) -export const storage = fastifyPlugin(async (fastify) => { +export const storage = fastifyPlugin(async function storagePlugin(fastify) { fastify.decorateRequest('storage', undefined) fastify.addHook('preHandler', async (request) => { const database = new StorageKnexDB(request.db, { diff --git a/src/internal/database/multitenant-db.ts b/src/internal/database/multitenant-db.ts index 51913af2..cd947a42 100644 --- 
a/src/internal/database/multitenant-db.ts +++ b/src/internal/database/multitenant-db.ts @@ -5,9 +5,18 @@ const { multitenantDatabaseUrl } = getConfig() export const multitenantKnex = Knex({ client: 'pg', - connection: multitenantDatabaseUrl, + connection: { + connectionString: multitenantDatabaseUrl, + connectionTimeoutMillis: 5000, + }, + version: '12', pool: { min: 0, max: 10, + createTimeoutMillis: 5000, + acquireTimeoutMillis: 5000, + idleTimeoutMillis: 5000, + reapIntervalMillis: 1000, + createRetryIntervalMillis: 100, }, }) diff --git a/src/internal/monitoring/logger.ts b/src/internal/monitoring/logger.ts index 8f70e672..e6028adc 100644 --- a/src/internal/monitoring/logger.ts +++ b/src/internal/monitoring/logger.ts @@ -77,6 +77,8 @@ interface InfoLog { export const logSchema = { info: (logger: BaseLogger, message: string, log: InfoLog) => logger.info(log, message), + warning: (logger: BaseLogger, message: string, log: InfoLog | ErrorLog) => + logger.warn(log, message), request: (logger: BaseLogger, message: string, log: RequestLog) => { if (!log.res) { logger.warn(log, message) diff --git a/src/server.ts b/src/start/server.ts similarity index 94% rename from src/server.ts rename to src/start/server.ts index e60b3e8d..ea223dd0 100644 --- a/src/server.ts +++ b/src/start/server.ts @@ -1,10 +1,10 @@ -import './internal/monitoring/otel' +import '@internal/monitoring/otel' import { FastifyInstance } from 'fastify' import { IncomingMessage, Server, ServerResponse } from 'http' -import build from './app' -import buildAdmin from './admin-app' -import { getConfig } from './config' +import build from '../app' +import buildAdmin from '../admin-app' +import { getConfig } from '../config' import { runMultitenantMigrations, runMigrationsOnTenant, @@ -16,6 +16,7 @@ import { logger, logSchema } from '@internal/monitoring' import { Queue } from '@internal/queue' import { registerWorkers } from '@storage/events' import { AsyncAbortController } from '@internal/concurrency' + import { bindShutdownSignals, createServerClosedPromise, shutdown } from './shutdown' const shutdownSignal = new AsyncAbortController() @@ -46,7 +47,7 @@ main() * Start Storage API server */ async function main() { - const { databaseURL, isMultitenant, pgQueueEnable, pgQueueEnableWorkers } = getConfig() + const { databaseURL, isMultitenant, pgQueueEnable } = getConfig() // Migrations if (isMultitenant) { diff --git a/src/shutdown.ts b/src/start/shutdown.ts similarity index 100% rename from src/shutdown.ts rename to src/start/shutdown.ts diff --git a/src/worker.ts b/src/start/worker.ts similarity index 96% rename from src/worker.ts rename to src/start/worker.ts index 2eea0c86..b14fc966 100644 --- a/src/worker.ts +++ b/src/start/worker.ts @@ -4,8 +4,8 @@ import { listenForTenantUpdate, PubSub } from '@internal/database' import { AsyncAbortController } from '@internal/concurrency' import { registerWorkers } from '@storage/events' -import { getConfig } from './config' -import adminApp from './admin-app' +import { getConfig } from '../config' +import adminApp from '../admin-app' import { bindShutdownSignals, createServerClosedPromise, shutdown } from './shutdown' const shutdownSignal = new AsyncAbortController() diff --git a/src/storage/database/adapter.ts b/src/storage/database/adapter.ts index 6c97e278..9e53ddbd 100644 --- a/src/storage/database/adapter.ts +++ b/src/storage/database/adapter.ts @@ -101,7 +101,12 @@ export interface Database { listBuckets(columns: string): Promise mustLockObject(bucketId: string, objectName: string, 
-  waitObjectLock(bucketId: string, objectName: string, version?: string): Promise<boolean>
+  waitObjectLock(
+    bucketId: string,
+    objectName: string,
+    version?: string,
+    opts?: { timeout?: number }
+  ): Promise<boolean>
 
   updateBucket(
     bucketId: string,
diff --git a/src/storage/database/knex.ts b/src/storage/database/knex.ts
index 6ab3261c..c771576b 100644
--- a/src/storage/database/knex.ts
+++ b/src/storage/database/knex.ts
@@ -530,10 +530,38 @@ export class StorageKnexDB implements Database {
     })
   }
 
-  async waitObjectLock(bucketId: string, objectName: string, version?: string) {
+  async waitObjectLock(
+    bucketId: string,
+    objectName: string,
+    version?: string,
+    opts?: { timeout?: number }
+  ) {
     return this.runQuery('WaitObjectLock', async (knex) => {
       const hash = hashStringToInt(`${bucketId}/${objectName}${version ? `/${version}` : ''}`)
-      await knex.raw(`SELECT pg_advisory_xact_lock(?)`, [hash])
+      const query = knex.raw(`SELECT pg_advisory_xact_lock(?)`, [hash])
+
+      if (opts?.timeout) {
+        let timeoutInterval: undefined | NodeJS.Timeout
+
+        try {
+          await Promise.race([
+            query,
+            new Promise(
+              (_, reject) =>
+                (timeoutInterval = setTimeout(() => reject(ERRORS.LockTimeout()), opts.timeout))
+            ),
+          ])
+        } catch (e) {
+          throw e
+        } finally {
+          if (timeoutInterval) {
+            clearTimeout(timeoutInterval)
+          }
+        }
+      } else {
+        await query
+      }
+
+      return true
     })
   }
diff --git a/src/storage/events/base-event.ts b/src/storage/events/base-event.ts
index 846dc20d..c05ac931 100644
--- a/src/storage/events/base-event.ts
+++ b/src/storage/events/base-event.ts
@@ -6,7 +6,7 @@ import { StorageKnexDB } from '../database'
 import { createAgent, createStorageBackend } from '../backend'
 import { getConfig } from '../../config'
 import { QueueJobScheduled, QueueJobSchedulingTime } from '@internal/monitoring/metrics'
-import { logger } from '@internal/monitoring'
+import { logger, logSchema } from '@internal/monitoring'
 
 export interface BasePayload {
   $version?: string
@@ -187,7 +187,7 @@ export abstract class BaseEvent<T extends BasePayload> {
 
     if (!pgQueueEnable) {
       return constructor.handle({
-        id: '',
+        id: '__sync',
         name: constructor.getQueueName(),
         data: {
           region,
@@ -200,25 +200,53 @@ export abstract class BaseEvent<T extends BasePayload> {
     const timer = QueueJobSchedulingTime.startTimer()
     const sendOptions = constructor.getQueueOptions(this.payload)
 
-    const res = await Queue.getInstance().send({
-      name: constructor.getQueueName(),
-      data: {
-        region,
-        ...this.payload,
-        $version: constructor.version,
-      },
-      options: sendOptions,
-    })
+    try {
+      const res = await Queue.getInstance().send({
+        name: constructor.getQueueName(),
+        data: {
+          region,
+          ...this.payload,
+          $version: constructor.version,
+        },
+        options: sendOptions,
+      })
 
-    timer({
-      name: constructor.getQueueName(),
-    })
+      timer({
+        name: constructor.getQueueName(),
+      })
 
-    QueueJobScheduled.inc({
-      name: constructor.getQueueName(),
-    })
+      QueueJobScheduled.inc({
+        name: constructor.getQueueName(),
+      })
 
-    return res
+      return res
+    } catch (e) {
+      if (e instanceof Error && e.message.includes('ECONNREFUSED')) {
+        // in case we get an ECONNREFUSED (database disconnected)
+        // we will try to send the event synchronously
+        // this allows the logic to still work, even if the queue is down
+        // it will add a bit more latency though.
+        logSchema.warning(
+          logger,
+          `[Queue Sender] Error while sending job to queue, sending synchronously`,
+          {
+            type: 'queue',
+            error: e,
+            metadata: JSON.stringify(this.payload),
+          }
+        )
+        return constructor.handle({
+          id: '__sync',
+          name: constructor.getQueueName(),
+          data: {
+            region,
+            ...this.payload,
+            $version: constructor.version,
+          },
+        })
+      }
+      throw e
+    }
   }
 
   async sendSlowRetryQueue() {
diff --git a/src/storage/events/webhook.ts b/src/storage/events/webhook.ts
index 5e20b776..31842812 100644
--- a/src/storage/events/webhook.ts
+++ b/src/storage/events/webhook.ts
@@ -12,6 +12,7 @@ const {
   webhookQueuePullInterval,
   webhookQueueTeamSize,
   webhookQueueConcurrency,
+  webhookMaxConnections,
 } = getConfig()
 
 interface WebhookEvent {
@@ -31,12 +32,12 @@ interface WebhookEvent {
 const httpAgent = webhookURL?.startsWith('https://')
   ? {
       httpsAgent: new HttpsAgent({
-        maxSockets: 100,
+        maxSockets: webhookMaxConnections,
       }),
     }
   : {
       httpAgent: new HttpAgent({
-        maxSockets: 100,
+        maxSockets: webhookMaxConnections,
      }),
    }
diff --git a/src/storage/object.ts b/src/storage/object.ts
index 6ccd0aa8..0642ea1e 100644
--- a/src/storage/object.ts
+++ b/src/storage/object.ts
@@ -395,7 +395,9 @@ export class ObjectStorage {
     const metadata = await this.backend.headObject(storageS3Bucket, s3DestinationKey, newVersion)
 
     return this.db.asSuperUser().withTransaction(async (db) => {
-      await db.waitObjectLock(this.bucketId, destinationObjectName)
+      await db.waitObjectLock(this.bucketId, destinationObjectName, undefined, {
+        timeout: 5000,
+      })
 
       const sourceObject = await db.findObject(this.bucketId, sourceObjectName, 'id', {
         forUpdate: true,
diff --git a/src/storage/uploader.ts b/src/storage/uploader.ts
index 663272c2..210689da 100644
--- a/src/storage/uploader.ts
+++ b/src/storage/uploader.ts
@@ -9,6 +9,7 @@ import { getFileSizeLimit, isEmptyFolder } from './limits'
 import { Database } from './database'
 import { ObjectAdminDelete, ObjectCreatedPostEvent, ObjectCreatedPutEvent } from './events'
 import { getConfig } from '../config'
+import { logger, logSchema } from '@internal/monitoring'
 
 interface UploaderOptions extends UploadObjectOptions {
   fileSizeLimit?: number | null
@@ -141,20 +142,20 @@ export class Uploader {
     uploadType?: 'standard' | 's3' | 'resumable'
   }) {
     try {
-      return await this.db.withTransaction(async (db) => {
-        await db.waitObjectLock(bucketId, objectName)
+      return await this.db.asSuperUser().withTransaction(async (db) => {
+        await db.waitObjectLock(bucketId, objectName, undefined, {
+          timeout: 5000,
+        })
 
-        const currentObj = await db
-          .asSuperUser()
-          .findObject(bucketId, objectName, 'id, version, metadata', {
-            forUpdate: true,
-            dontErrorOnEmpty: true,
-          })
+        const currentObj = await db.findObject(bucketId, objectName, 'id, version, metadata', {
+          forUpdate: true,
+          dontErrorOnEmpty: true,
+        })
 
         const isNew = !Boolean(currentObj)
 
         // update object
-        const newObject = await db.asSuperUser().upsertObject({
+        const newObject = await db.upsertObject({
           bucket_id: bucketId,
           name: objectName,
           metadata: objectMetadata,
@@ -180,14 +181,28 @@
       const event = isUpsert && !isNew ? ObjectCreatedPutEvent : ObjectCreatedPostEvent
 
       events.push(
-        event.sendWebhook({
-          tenant: this.db.tenant(),
-          name: objectName,
-          bucketId: bucketId,
-          metadata: objectMetadata,
-          reqId: this.db.reqId,
-          uploadType,
-        })
+        event
+          .sendWebhook({
+            tenant: this.db.tenant(),
+            name: objectName,
+            bucketId: bucketId,
+            metadata: objectMetadata,
+            reqId: this.db.reqId,
+            uploadType,
+          })
+          .catch(() => {
+            logSchema.error(logger, 'Failed to send webhook', {
+              type: 'event',
+              project: this.db.tenantId,
+              metadata: JSON.stringify({
+                name: objectName,
+                bucketId: bucketId,
+                metadata: objectMetadata,
+                reqId: this.db.reqId,
+                uploadType,
+              }),
+            })
+          })
       )
 
       await Promise.all(events)
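
---

Notes on the changes above.

The headline change: the webhook HTTP agent's socket cap was hard-coded to 100 and now comes from the new QUEUE_WEBHOOK_MAX_CONNECTIONS env var (default 500), plumbed through getConfig() as webhookMaxConnections. A minimal sketch of the resulting behaviour, using only Node's standard agents; the inline env parsing stands in for src/config.ts's getOptionalConfigFromEnv, and the WEBHOOK_URL name is assumed for the sketch:

```ts
import { Agent as HttpAgent } from 'http'
import { Agent as HttpsAgent } from 'https'

// Stand-in for getOptionalConfigFromEnv('QUEUE_WEBHOOK_MAX_CONNECTIONS'):
// same 500 default as the patch's src/config.ts.
const webhookMaxConnections = parseInt(
  process.env.QUEUE_WEBHOOK_MAX_CONNECTIONS || '500',
  10
)

// Same shape as src/storage/events/webhook.ts: pick the agent by URL
// scheme and cap its concurrent sockets with the configured value.
const webhookURL = process.env.WEBHOOK_URL // env name assumed for this sketch
export const httpAgent = webhookURL?.startsWith('https://')
  ? { httpsAgent: new HttpsAgent({ maxSockets: webhookMaxConnections }) }
  : { httpAgent: new HttpAgent({ maxSockets: webhookMaxConnections }) }
```

Since Node agents enforce maxSockets per origin, this caps concurrent webhook deliveries to the configured endpoint; deployments that fan out many events can now raise the ceiling without patching code.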
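The multitenant-db.ts change is independent of webhooks: it pins the node-postgres connection timeout and the tarn pool timeouts so a stalled database fails fast instead of hanging on knex's much larger defaults. An equivalent standalone configuration; the values are copied from the diff, while the env var name is assumed (the patch reads multitenantDatabaseUrl from getConfig()):

```ts
import Knex from 'knex'

export const multitenantKnex = Knex({
  client: 'pg',
  connection: {
    connectionString: process.env.MULTITENANT_DATABASE_URL, // name assumed
    connectionTimeoutMillis: 5000, // node-postgres: cap TCP/startup time
  },
  version: '12', // tell knex the server version up front
  pool: {
    min: 0,
    max: 10,
    createTimeoutMillis: 5000, // tarn: creating a fresh connection
    acquireTimeoutMillis: 5000, // tarn: waiting for a free connection
    idleTimeoutMillis: 5000, // tarn: reap connections idle this long
    reapIntervalMillis: 1000,
    createRetryIntervalMillis: 100,
  },
})
```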
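The waitObjectLock rework is what makes the 5-second timeouts in object.ts and uploader.ts possible: the advisory-lock query is raced against a timer. The pattern in isolation; the transaction handle and error are stand-ins, as the patch itself throws ERRORS.LockTimeout() from inside runQuery's transaction:

```ts
import { Knex } from 'knex'

// Race pg_advisory_xact_lock against a timer. Caveat inherited from
// Promise.race: a lost race rejects the caller, but the lock query keeps
// waiting on the connection until the surrounding transaction ends.
export async function waitObjectLockWithTimeout(
  trx: Knex.Transaction,
  hash: number,
  timeoutMs: number
): Promise<boolean> {
  let timer: NodeJS.Timeout | undefined
  const lock = trx.raw('SELECT pg_advisory_xact_lock(?)', [hash])
  try {
    await Promise.race([
      lock,
      new Promise((_, reject) => {
        timer = setTimeout(() => reject(new Error('object lock timeout')), timeoutMs)
      }),
    ])
    return true
  } finally {
    if (timer) clearTimeout(timer) // always disarm, success or failure
  }
}
```

A server-side alternative with a similar effect would be `SET LOCAL lock_timeout = '5s'` before taking the lock, which makes Postgres itself abandon the wait rather than leaving the query running client-side.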
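Finally, the base-event.ts fallback: when pg-boss cannot reach its database, the event is executed inline rather than lost. Reduced to its skeleton, with the two callbacks standing in for Queue.getInstance().send and constructor.handle:

```ts
export async function sendOrRunInline<T>(
  enqueue: () => Promise<T>,
  runInline: () => Promise<T>
): Promise<T> {
  try {
    return await enqueue()
  } catch (e) {
    // Only the "queue database unreachable" case falls back; any other
    // failure still surfaces to the caller, exactly as in the patch.
    if (e instanceof Error && e.message.includes('ECONNREFUSED')) {
      // Trade latency for delivery: run the handler on the request path.
      // The patch tags such jobs with id '__sync' and logs a warning.
      return runInline()
    }
    throw e
  }
}
```

Worth keeping in mind: the inline path runs with at-most-once semantics on the request's latency budget, so pg-boss's retry and backoff guarantees do not apply while the queue is down.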