Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: connection pool #578

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 0 additions & 69 deletions src/http/plugins/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,40 +52,6 @@ export const db = fastifyPlugin(
operation: request.operation?.type,
})
})

fastify.addHook('onSend', async (request, reply, payload) => {
if (request.db) {
request.db.dispose().catch((e) => {
logSchema.error(request.log, 'Error disposing db connection', {
type: 'db-connection',
error: e,
})
})
}
return payload
})

fastify.addHook('onTimeout', async (request) => {
if (request.db) {
request.db.dispose().catch((e) => {
logSchema.error(request.log, 'Error disposing db connection', {
type: 'db-connection',
error: e,
})
})
}
})

fastify.addHook('onRequestAbort', async (request) => {
if (request.db) {
request.db.dispose().catch((e) => {
logSchema.error(request.log, 'Error disposing db connection', {
type: 'db-connection',
error: e,
})
})
}
})
},
{ name: 'db-init' }
)
Expand Down Expand Up @@ -113,41 +79,6 @@ export const dbSuperUser = fastifyPlugin<DbSuperUserPluginOptions>(
disableHostCheck: opts.disableHostCheck,
})
})

fastify.addHook('onSend', async (request, reply, payload) => {
if (request.db) {
request.db.dispose().catch((e) => {
logSchema.error(request.log, 'Error disposing db connection', {
type: 'db-connection',
error: e,
})
})
}

return payload
})

fastify.addHook('onTimeout', async (request) => {
if (request.db) {
request.db.dispose().catch((e) => {
logSchema.error(request.log, 'Error disposing db connection', {
type: 'db-connection',
error: e,
})
})
}
})

fastify.addHook('onRequestAbort', async (request) => {
if (request.db) {
request.db.dispose().catch((e) => {
logSchema.error(request.log, 'Error disposing db connection', {
type: 'db-connection',
error: e,
})
})
}
})
},
{ name: 'db-superuser-init' }
)
Expand Down
6 changes: 0 additions & 6 deletions src/http/routes/tus/lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,6 @@ export async function onIncomingRequest(
) {
const req = rawReq as MultiPartRequest

res.on('finish', () => {
req.upload.db.dispose().catch((e) => {
req.log.error({ error: e }, 'Error disposing db connection')
})
})

const uploadID = UploadId.fromString(id)

req.upload.resources = [`${uploadID.bucket}/${uploadID.objectName}`]
Expand Down
4 changes: 2 additions & 2 deletions src/internal/database/client.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { getConfig } from '../../config'
import { getTenantConfig } from './tenant'
import { User, TenantConnection } from './connection'
import { User, TenantConnection, ConnectionManager } from './connection'
import { ERRORS } from '@internal/errors'

interface ConnectionOptions {
Expand All @@ -24,7 +24,7 @@ export async function getPostgresConnection(options: ConnectionOptions): Promise
disableHostCheck: options.disableHostCheck,
})

return await TenantConnection.create({
return ConnectionManager.acquire({
...dbCredentials,
...options,
})
Expand Down
100 changes: 61 additions & 39 deletions src/internal/database/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,54 +43,66 @@ export interface User {
}

const multiTenantLRUConfig = {
ttl: 1000 * 10,
ttl: 1000 * 30,
updateAgeOnGet: true,
checkAgeOnGet: true,
noDisponseOnSet: true,
}
export const connections = new TTLCache<string, Knex>({
...(isMultitenant ? multiTenantLRUConfig : { max: 1, ttl: Infinity }),
dispose: async (pool) => {
if (!pool) return
try {
await pool.destroy()
} catch (e) {
logSchema.error(logger, 'pool was not able to be destroyed', {
type: 'db',
error: e,
})
}
},
})

export const searchPath = ['storage', 'public', 'extensions', ...dbSearchPath.split(',')].filter(
Boolean
)

export class TenantConnection {
public readonly role: string

constructor(protected readonly pool: Knex, protected readonly options: TenantConnectionOptions) {
this.role = options.user.payload.role || 'anon'
}
/**
* Manages connections to tenant databases
* Connections pools expire after a certain amount of time as well as idle connections
*/
export class ConnectionManager {
/**
* Connections map, the string is the connection string and the value is the knex pool
* @protected
*/
protected static connections = new TTLCache<string, Knex>({
...multiTenantLRUConfig,
dispose: async (pool) => {
if (!pool) return
try {
await pool.destroy()
} catch (e) {
logSchema.error(logger, 'pool was not able to be destroyed', {
type: 'db',
error: e,
})
}
},
})

/**
* Stop the pool manager and destroy all connections
*/
static stop() {
this.connections.cancelTimer()
const promises: Promise<void>[] = []

for (const [connectionString, pool] of connections) {
for (const [connectionString, pool] of this.connections) {
promises.push(pool.destroy())
connections.delete(connectionString)
this.connections.delete(connectionString)
}

return Promise.allSettled(promises)
}

static async create(options: TenantConnectionOptions) {
/**
* Acquire a pool for a tenant database
* @param options
*/
static acquirePool(options: TenantConnectionOptions): Knex {
const connectionString = options.dbUrl

let knexPool = connections.get(connectionString)
let knexPool = this.connections.get(connectionString)

if (knexPool) {
return new this(knexPool, options)
return knexPool
}

const isExternalPool = Boolean(options.isExternalPool)
Expand All @@ -101,12 +113,14 @@ export class TenantConnection {
searchPath: isExternalPool ? undefined : searchPath,
pool: {
min: 0,
max: isExternalPool ? 1 : options.maxConnections || databaseMaxConnections,
max: isExternalPool
? options.maxConnections || databaseMaxConnections
: databaseMaxConnections,
acquireTimeoutMillis: databaseConnectionTimeout,
idleTimeoutMillis: isExternalPool
? options.idleTimeoutMillis || 100
? options.idleTimeoutMillis || 5000
: databaseFreePoolAfterInactivity,
reapIntervalMillis: isExternalPool ? 50 : undefined,
reapIntervalMillis: isExternalPool ? 1000 : undefined,
},
connection: {
connectionString: connectionString,
Expand Down Expand Up @@ -134,11 +148,9 @@ export class TenantConnection {
DbActivePool.dec({ is_external: isExternalPool.toString() })
})

if (!isExternalPool) {
connections.set(connectionString, knexPool)
}
this.connections.set(connectionString, knexPool)

return new this(knexPool, options)
return knexPool
}

protected static sslSettings() {
Expand All @@ -147,19 +159,28 @@ export class TenantConnection {
}
return {}
}
}

/**
* Represent a connection to a tenant database
*/
export class TenantConnection {
public readonly role: string

constructor(protected readonly options: TenantConnectionOptions) {
this.role = options.user.payload.role || 'anon'
}

async dispose() {
if (this.options.isExternalPool) {
await this.pool.destroy()
}
// TODO: remove this method
}

async transaction(instance?: Knex) {
try {
const tnx = await retry(
async (bail) => {
try {
const pool = instance || this.pool
const pool = instance || ConnectionManager.acquirePool(this.options)
return await pool.transaction()
} catch (e) {
if (
Expand Down Expand Up @@ -222,10 +243,11 @@ export class TenantConnection {
}

asSuperUser() {
return new TenantConnection(this.pool, {
const newOptions = {
...this.options,
user: this.options.superUser,
})
}
return new TenantConnection(newOptions)
}

async setScope(tnx: Knex) {
Expand Down
4 changes: 2 additions & 2 deletions src/start/shutdown.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { logger, logSchema } from '@internal/monitoring'
import { AsyncAbortController } from '@internal/concurrency'
import { multitenantKnex, TenantConnection } from '@internal/database'
import { ConnectionManager, multitenantKnex } from '@internal/database'
import http from 'http'

/**
Expand Down Expand Up @@ -61,7 +61,7 @@ export async function shutdown(serverSignal: AsyncAbortController) {
errors.push(e)
})

await TenantConnection.stop().catch((e) => {
await ConnectionManager.stop().catch((e) => {
logSchema.error(logger, 'Failed to close tenant connection', {
type: 'shutdown',
error: e,
Expand Down
2 changes: 0 additions & 2 deletions src/storage/database/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,6 @@ export interface Database {
searchObjects(bucketId: string, prefix: string, options: SearchObjectOption): Promise<Obj[]>
healthcheck(): Promise<void>

destroyConnection(): Promise<void>

createMultipartUpload(
uploadId: string,
bucketId: string,
Expand Down
4 changes: 0 additions & 4 deletions src/storage/database/knex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -696,10 +696,6 @@ export class StorageKnexDB implements Database {
})
}

destroyConnection() {
return this.connection.dispose()
}

/**
* Excludes columns selection if a specific migration wasn't run
* @param columns
Expand Down
15 changes: 0 additions & 15 deletions src/storage/events/object-admin-delete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,21 +74,6 @@ export class ObjectAdminDelete extends BaseEvent<ObjectDeleteEvent> {
`[Admin]: ObjectAdminDelete ${s3Key} - FAILED`
)
throw e
} finally {
if (storage) {
const tenant = storage.db.tenant()
storage.db
.destroyConnection()
.then(() => {
// no-op
})
.catch((e) => {
logger.error(
{ error: e },
`[Admin]: ObjectAdminDelete ${tenant.ref} - FAILED DISPOSING CONNECTION`
)
})
}
}
}
}
Loading