diff --git a/src/http/plugins/db.ts b/src/http/plugins/db.ts index 369ee12f..e59df855 100644 --- a/src/http/plugins/db.ts +++ b/src/http/plugins/db.ts @@ -52,40 +52,6 @@ export const db = fastifyPlugin( operation: request.operation?.type, }) }) - - fastify.addHook('onSend', async (request, reply, payload) => { - if (request.db) { - request.db.dispose().catch((e) => { - logSchema.error(request.log, 'Error disposing db connection', { - type: 'db-connection', - error: e, - }) - }) - } - return payload - }) - - fastify.addHook('onTimeout', async (request) => { - if (request.db) { - request.db.dispose().catch((e) => { - logSchema.error(request.log, 'Error disposing db connection', { - type: 'db-connection', - error: e, - }) - }) - } - }) - - fastify.addHook('onRequestAbort', async (request) => { - if (request.db) { - request.db.dispose().catch((e) => { - logSchema.error(request.log, 'Error disposing db connection', { - type: 'db-connection', - error: e, - }) - }) - } - }) }, { name: 'db-init' } ) @@ -113,41 +79,6 @@ export const dbSuperUser = fastifyPlugin( disableHostCheck: opts.disableHostCheck, }) }) - - fastify.addHook('onSend', async (request, reply, payload) => { - if (request.db) { - request.db.dispose().catch((e) => { - logSchema.error(request.log, 'Error disposing db connection', { - type: 'db-connection', - error: e, - }) - }) - } - - return payload - }) - - fastify.addHook('onTimeout', async (request) => { - if (request.db) { - request.db.dispose().catch((e) => { - logSchema.error(request.log, 'Error disposing db connection', { - type: 'db-connection', - error: e, - }) - }) - } - }) - - fastify.addHook('onRequestAbort', async (request) => { - if (request.db) { - request.db.dispose().catch((e) => { - logSchema.error(request.log, 'Error disposing db connection', { - type: 'db-connection', - error: e, - }) - }) - } - }) }, { name: 'db-superuser-init' } ) diff --git a/src/http/routes/tus/lifecycle.ts b/src/http/routes/tus/lifecycle.ts index 5769d995..1d8eeeea 100644 --- a/src/http/routes/tus/lifecycle.ts +++ b/src/http/routes/tus/lifecycle.ts @@ -37,12 +37,6 @@ export async function onIncomingRequest( ) { const req = rawReq as MultiPartRequest - res.on('finish', () => { - req.upload.db.dispose().catch((e) => { - req.log.error({ error: e }, 'Error disposing db connection') - }) - }) - const uploadID = UploadId.fromString(id) req.upload.resources = [`${uploadID.bucket}/${uploadID.objectName}`] diff --git a/src/internal/database/client.ts b/src/internal/database/client.ts index 30c3c173..4f296017 100644 --- a/src/internal/database/client.ts +++ b/src/internal/database/client.ts @@ -1,6 +1,6 @@ import { getConfig } from '../../config' import { getTenantConfig } from './tenant' -import { User, TenantConnection } from './connection' +import { User, TenantConnection, ConnectionManager } from './connection' import { ERRORS } from '@internal/errors' interface ConnectionOptions { @@ -24,7 +24,7 @@ export async function getPostgresConnection(options: ConnectionOptions): Promise disableHostCheck: options.disableHostCheck, }) - return await TenantConnection.create({ + return ConnectionManager.acquire({ ...dbCredentials, ...options, }) diff --git a/src/internal/database/connection.ts b/src/internal/database/connection.ts index ceb2d20e..74ce3cd2 100644 --- a/src/internal/database/connection.ts +++ b/src/internal/database/connection.ts @@ -43,54 +43,66 @@ export interface User { } const multiTenantLRUConfig = { - ttl: 1000 * 10, + ttl: 1000 * 30, updateAgeOnGet: true, checkAgeOnGet: true, + noDisponseOnSet: true, } -export const connections = new TTLCache({ - ...(isMultitenant ? multiTenantLRUConfig : { max: 1, ttl: Infinity }), - dispose: async (pool) => { - if (!pool) return - try { - await pool.destroy() - } catch (e) { - logSchema.error(logger, 'pool was not able to be destroyed', { - type: 'db', - error: e, - }) - } - }, -}) export const searchPath = ['storage', 'public', 'extensions', ...dbSearchPath.split(',')].filter( Boolean ) -export class TenantConnection { - public readonly role: string - - constructor(protected readonly pool: Knex, protected readonly options: TenantConnectionOptions) { - this.role = options.user.payload.role || 'anon' - } +/** + * Manages connections to tenant databases + * Connections pools expire after a certain amount of time as well as idle connections + */ +export class ConnectionManager { + /** + * Connections map, the string is the connection string and the value is the knex pool + * @protected + */ + protected static connections = new TTLCache({ + ...multiTenantLRUConfig, + dispose: async (pool) => { + if (!pool) return + try { + await pool.destroy() + } catch (e) { + logSchema.error(logger, 'pool was not able to be destroyed', { + type: 'db', + error: e, + }) + } + }, + }) + /** + * Stop the pool manager and destroy all connections + */ static stop() { + this.connections.cancelTimer() const promises: Promise[] = [] - for (const [connectionString, pool] of connections) { + for (const [connectionString, pool] of this.connections) { promises.push(pool.destroy()) - connections.delete(connectionString) + this.connections.delete(connectionString) } return Promise.allSettled(promises) } - static async create(options: TenantConnectionOptions) { + /** + * Acquire a pool for a tenant database + * @param options + */ + static acquirePool(options: TenantConnectionOptions): Knex { const connectionString = options.dbUrl - let knexPool = connections.get(connectionString) + let knexPool = this.connections.get(connectionString) if (knexPool) { - return new this(knexPool, options) + return knexPool } const isExternalPool = Boolean(options.isExternalPool) @@ -101,12 +113,14 @@ export class TenantConnection { searchPath: isExternalPool ? undefined : searchPath, pool: { min: 0, - max: isExternalPool ? 1 : options.maxConnections || databaseMaxConnections, + max: isExternalPool + ? options.maxConnections || databaseMaxConnections + : databaseMaxConnections, acquireTimeoutMillis: databaseConnectionTimeout, idleTimeoutMillis: isExternalPool - ? options.idleTimeoutMillis || 100 + ? options.idleTimeoutMillis || 5000 : databaseFreePoolAfterInactivity, - reapIntervalMillis: isExternalPool ? 50 : undefined, + reapIntervalMillis: isExternalPool ? 1000 : undefined, }, connection: { connectionString: connectionString, @@ -134,11 +148,9 @@ export class TenantConnection { DbActivePool.dec({ is_external: isExternalPool.toString() }) }) - if (!isExternalPool) { - connections.set(connectionString, knexPool) - } + this.connections.set(connectionString, knexPool) - return new this(knexPool, options) + return knexPool } protected static sslSettings() { @@ -147,11 +159,20 @@ export class TenantConnection { } return {} } +} + +/** + * Represent a connection to a tenant database + */ +export class TenantConnection { + public readonly role: string + + constructor(protected readonly options: TenantConnectionOptions) { + this.role = options.user.payload.role || 'anon' + } async dispose() { - if (this.options.isExternalPool) { - await this.pool.destroy() - } + // TODO: remove this method } async transaction(instance?: Knex) { @@ -159,7 +180,7 @@ export class TenantConnection { const tnx = await retry( async (bail) => { try { - const pool = instance || this.pool + const pool = instance || ConnectionManager.acquirePool(this.options) return await pool.transaction() } catch (e) { if ( @@ -222,10 +243,11 @@ export class TenantConnection { } asSuperUser() { - return new TenantConnection(this.pool, { + const newOptions = { ...this.options, user: this.options.superUser, - }) + } + return new TenantConnection(newOptions) } async setScope(tnx: Knex) { diff --git a/src/start/shutdown.ts b/src/start/shutdown.ts index cd571dfa..e2bf03ea 100644 --- a/src/start/shutdown.ts +++ b/src/start/shutdown.ts @@ -1,6 +1,6 @@ import { logger, logSchema } from '@internal/monitoring' import { AsyncAbortController } from '@internal/concurrency' -import { multitenantKnex, TenantConnection } from '@internal/database' +import { ConnectionManager, multitenantKnex } from '@internal/database' import http from 'http' /** @@ -61,7 +61,7 @@ export async function shutdown(serverSignal: AsyncAbortController) { errors.push(e) }) - await TenantConnection.stop().catch((e) => { + await ConnectionManager.stop().catch((e) => { logSchema.error(logger, 'Failed to close tenant connection', { type: 'shutdown', error: e, diff --git a/src/storage/database/adapter.ts b/src/storage/database/adapter.ts index 9e95f1b9..182976b3 100644 --- a/src/storage/database/adapter.ts +++ b/src/storage/database/adapter.ts @@ -146,8 +146,6 @@ export interface Database { searchObjects(bucketId: string, prefix: string, options: SearchObjectOption): Promise healthcheck(): Promise - destroyConnection(): Promise - createMultipartUpload( uploadId: string, bucketId: string, diff --git a/src/storage/database/knex.ts b/src/storage/database/knex.ts index 0d6cabac..f9f4f340 100644 --- a/src/storage/database/knex.ts +++ b/src/storage/database/knex.ts @@ -696,10 +696,6 @@ export class StorageKnexDB implements Database { }) } - destroyConnection() { - return this.connection.dispose() - } - /** * Excludes columns selection if a specific migration wasn't run * @param columns diff --git a/src/storage/events/object-admin-delete.ts b/src/storage/events/object-admin-delete.ts index 5f7152b2..86f9e1b4 100644 --- a/src/storage/events/object-admin-delete.ts +++ b/src/storage/events/object-admin-delete.ts @@ -74,21 +74,6 @@ export class ObjectAdminDelete extends BaseEvent { `[Admin]: ObjectAdminDelete ${s3Key} - FAILED` ) throw e - } finally { - if (storage) { - const tenant = storage.db.tenant() - storage.db - .destroyConnection() - .then(() => { - // no-op - }) - .catch((e) => { - logger.error( - { error: e }, - `[Admin]: ObjectAdminDelete ${tenant.ref} - FAILED DISPOSING CONNECTION` - ) - }) - } } } }