Skip to content

Commit

Permalink
fix: custom webhooks-max-sockets env
Browse files Browse the repository at this point in the history
  • Loading branch information
fenos committed Jun 30, 2024
1 parent 74492da commit 077e2ae
Show file tree
Hide file tree
Showing 16 changed files with 188 additions and 108 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,4 @@ COPY --from=production-deps /app/node_modules node_modules
COPY --from=build /app/dist dist

EXPOSE 5000
CMD ["node", "dist/server.js"]
CMD ["node", "dist/start/server.js"]
91 changes: 38 additions & 53 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
"description": "Supabase storage middleend",
"main": "index.js",
"scripts": {
"dev": "tsx watch ./src/server.ts | pino-pretty",
"dev": "tsx watch src/start/server.ts | pino-pretty",
"build": "node ./build.js && resolve-tspaths",
"start": "NODE_ENV=production node dist/server.js",
"start": "NODE_ENV=production node dist/start/server.js",
"migration:run": "tsx ./src/scripts/migrate-call.ts",
"docs:export": "tsx ./src/scripts/export-docs.ts",
"test:dummy-data": "tsx -r dotenv/config ./src/test/db/import-dummy-data.ts",
Expand Down Expand Up @@ -69,7 +69,7 @@
"md5-file": "^5.0.0",
"multistream": "^4.1.0",
"object-sizeof": "^2.6.4",
"pg": "^8.11.3",
"pg": "^8.12.0",
"pg-boss": "^9.0.3",
"pg-listen": "^1.7.0",
"pino": "^8.15.4",
Expand Down
6 changes: 5 additions & 1 deletion src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type StorageConfigType = {
uploadFileSizeLimit: number
uploadFileSizeLimitStandard?: number
storageFilePath?: string
storageS3MaxSockets?: number
storageS3MaxSockets: number
storageS3Bucket: string
storageS3Endpoint?: string
storageS3ForcePathStyle?: boolean
Expand Down Expand Up @@ -69,6 +69,7 @@ type StorageConfigType = {
webhookQueuePullInterval?: number
webhookQueueTeamSize?: number
webhookQueueConcurrency?: number
webhookMaxConnections: number
adminDeleteQueueTeamSize?: number
adminDeleteConcurrency?: number
imageTransformationEnabled: boolean
Expand Down Expand Up @@ -324,6 +325,9 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType {
),
webhookQueueTeamSize: parseInt(getOptionalConfigFromEnv('QUEUE_WEBHOOKS_TEAM_SIZE') || '50'),
webhookQueueConcurrency: parseInt(getOptionalConfigFromEnv('QUEUE_WEBHOOK_CONCURRENCY') || '5'),
webhookMaxConnections: parseInt(
getOptionalConfigFromEnv('QUEUE_WEBHOOK_MAX_CONNECTIONS') || '500'
),
adminDeleteQueueTeamSize: parseInt(
getOptionalConfigFromEnv('QUEUE_ADMIN_DELETE_TEAM_SIZE') || '50'
),
Expand Down
2 changes: 1 addition & 1 deletion src/http/plugins/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const { storageBackendType } = getConfig()

const storageBackend = createStorageBackend(storageBackendType)

export const storage = fastifyPlugin(async (fastify) => {
export const storage = fastifyPlugin(async function storagePlugin(fastify) {
fastify.decorateRequest('storage', undefined)
fastify.addHook('preHandler', async (request) => {
const database = new StorageKnexDB(request.db, {
Expand Down
11 changes: 10 additions & 1 deletion src/internal/database/multitenant-db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,18 @@ const { multitenantDatabaseUrl } = getConfig()

export const multitenantKnex = Knex({
client: 'pg',
connection: multitenantDatabaseUrl,
connection: {
connectionString: multitenantDatabaseUrl,
connectionTimeoutMillis: 5000,
},
version: '12',
pool: {
min: 0,
max: 10,
createTimeoutMillis: 5000,
acquireTimeoutMillis: 5000,
idleTimeoutMillis: 5000,
reapIntervalMillis: 1000,
createRetryIntervalMillis: 100,
},
})
2 changes: 2 additions & 0 deletions src/internal/monitoring/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ interface InfoLog {

export const logSchema = {
info: (logger: BaseLogger, message: string, log: InfoLog) => logger.info(log, message),
warning: (logger: BaseLogger, message: string, log: InfoLog | ErrorLog) =>
logger.warn(log, message),
request: (logger: BaseLogger, message: string, log: RequestLog) => {
if (!log.res) {
logger.warn(log, message)
Expand Down
11 changes: 6 additions & 5 deletions src/server.ts → src/start/server.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import './internal/monitoring/otel'
import '@internal/monitoring/otel'
import { FastifyInstance } from 'fastify'
import { IncomingMessage, Server, ServerResponse } from 'http'

import build from './app'
import buildAdmin from './admin-app'
import { getConfig } from './config'
import build from '../app'
import buildAdmin from '../admin-app'
import { getConfig } from '../config'
import {
runMultitenantMigrations,
runMigrationsOnTenant,
Expand All @@ -16,6 +16,7 @@ import { logger, logSchema } from '@internal/monitoring'
import { Queue } from '@internal/queue'
import { registerWorkers } from '@storage/events'
import { AsyncAbortController } from '@internal/concurrency'

import { bindShutdownSignals, createServerClosedPromise, shutdown } from './shutdown'

const shutdownSignal = new AsyncAbortController()
Expand Down Expand Up @@ -46,7 +47,7 @@ main()
* Start Storage API server
*/
async function main() {
const { databaseURL, isMultitenant, pgQueueEnable, pgQueueEnableWorkers } = getConfig()
const { databaseURL, isMultitenant, pgQueueEnable } = getConfig()

// Migrations
if (isMultitenant) {
Expand Down
File renamed without changes.
4 changes: 2 additions & 2 deletions src/worker.ts → src/start/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import { listenForTenantUpdate, PubSub } from '@internal/database'
import { AsyncAbortController } from '@internal/concurrency'
import { registerWorkers } from '@storage/events'

import { getConfig } from './config'
import adminApp from './admin-app'
import { getConfig } from '../config'
import adminApp from '../admin-app'
import { bindShutdownSignals, createServerClosedPromise, shutdown } from './shutdown'

const shutdownSignal = new AsyncAbortController()
Expand Down
7 changes: 6 additions & 1 deletion src/storage/database/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,12 @@ export interface Database {

listBuckets(columns: string): Promise<Bucket[]>
mustLockObject(bucketId: string, objectName: string, version?: string): Promise<boolean>
waitObjectLock(bucketId: string, objectName: string, version?: string): Promise<boolean>
waitObjectLock(
bucketId: string,
objectName: string,
version?: string,
opts?: { timeout?: number }
): Promise<boolean>

updateBucket(
bucketId: string,
Expand Down
32 changes: 30 additions & 2 deletions src/storage/database/knex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -530,10 +530,38 @@ export class StorageKnexDB implements Database {
})
}

async waitObjectLock(bucketId: string, objectName: string, version?: string) {
async waitObjectLock(
bucketId: string,
objectName: string,
version?: string,
opts?: { timeout: number }
) {
return this.runQuery('WaitObjectLock', async (knex) => {
const hash = hashStringToInt(`${bucketId}/${objectName}${version ? `/${version}` : ''}`)
await knex.raw<any>(`SELECT pg_advisory_xact_lock(?)`, [hash])
const query = knex.raw<any>(`SELECT pg_advisory_xact_lock(?)`, [hash])

if (opts?.timeout) {
let timeoutInterval: undefined | NodeJS.Timeout

try {
await Promise.race([
query,
new Promise(
(_, reject) =>
(timeoutInterval = setTimeout(() => reject(ERRORS.LockTimeout()), opts.timeout))
),
])
} catch (e) {
throw e
} finally {
if (timeoutInterval) {
clearTimeout(timeoutInterval)
}
}
} else {
await query
}

return true
})
}
Expand Down
Loading

0 comments on commit 077e2ae

Please sign in to comment.