Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use a singleton http agent for queue events + Reduce metrics cardinality #391

Merged
merged 1 commit into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/config.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import dotenv from 'dotenv'

type StorageBackendType = 'file' | 's3'
export type StorageBackendType = 'file' | 's3'

type StorageConfigType = {
version: string
Expand All @@ -12,7 +12,7 @@ type StorageConfigType = {
encryptionKey: string
fileSizeLimit: number
fileStoragePath?: string
globalS3Protocol?: 'http' | 'https' | string
globalS3Protocol: 'http' | 'https'
globalS3MaxSockets?: number
globalS3Bucket: string
globalS3Endpoint?: string
Expand Down Expand Up @@ -111,7 +111,9 @@ export function getConfig(): StorageConfigType {
fileSizeLimit: Number(getConfigFromEnv('FILE_SIZE_LIMIT')),
fileStoragePath: getOptionalConfigFromEnv('FILE_STORAGE_BACKEND_PATH'),
globalS3MaxSockets: parseInt(getOptionalConfigFromEnv('GLOBAL_S3_MAX_SOCKETS') || '200', 10),
globalS3Protocol: getOptionalConfigFromEnv('GLOBAL_S3_PROTOCOL') || 'https',
globalS3Protocol: (getOptionalConfigFromEnv('GLOBAL_S3_PROTOCOL') || 'https') as
| 'http'
| 'https',
globalS3Bucket: getConfigFromEnv('GLOBAL_S3_BUCKET'),
globalS3Endpoint: getOptionalConfigFromEnv('GLOBAL_S3_ENDPOINT'),
globalS3ForcePathStyle: getOptionalConfigFromEnv('GLOBAL_S3_FORCE_PATH_STYLE') === 'true',
Expand Down
6 changes: 2 additions & 4 deletions src/database/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,24 +85,22 @@ export class TenantConnection {
acquireConnectionTimeout: databaseConnectionTimeout,
})

DbActivePool.inc({ tenant_id: options.tenantId, is_external: isExternalPool.toString() })
DbActivePool.inc({ is_external: isExternalPool.toString() })

knexPool.client.pool.on('createSuccess', () => {
DbActiveConnection.inc({
tenant_id: options.tenantId,
is_external: isExternalPool.toString(),
})
})

knexPool.client.pool.on('destroySuccess', () => {
DbActiveConnection.dec({
tenant_id: options.tenantId,
is_external: isExternalPool.toString(),
})
})

knexPool.client.pool.on('poolDestroySuccess', () => {
DbActivePool.dec({ tenant_id: options.tenantId, is_external: isExternalPool.toString() })
DbActivePool.dec({ is_external: isExternalPool.toString() })
})

if (!isExternalPool) {
Expand Down
22 changes: 1 addition & 21 deletions src/http/plugins/metrics.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import fastifyPlugin from 'fastify-plugin'
import { MetricsRegistrar, RequestErrors } from '../../monitoring/metrics'
import { MetricsRegistrar } from '../../monitoring/metrics'
import fastifyMetrics from 'fastify-metrics'
import { getConfig } from '../../config'

Expand Down Expand Up @@ -33,26 +33,6 @@ export const metrics = ({ enabledEndpoint }: MetricsOptions) =>
},
registeredRoutesOnly: true,
groupStatusCodes: true,
customLabels: {
tenant_id: (req) => {
return req.tenantId
},
},
},
})

// Errors
fastify.addHook('onResponse', async (request, reply) => {
const error = (reply.raw as any).executionError || reply.executionError

if (error) {
RequestErrors.inc({
name: error.name || error.constructor.name,
tenant_id: request.tenantId,
path: request.routerPath,
method: request.routerMethod,
status: reply.statusCode,
})
}
})
})
5 changes: 4 additions & 1 deletion src/http/plugins/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import fastifyPlugin from 'fastify-plugin'
import { StorageBackendAdapter, createStorageBackend } from '../../storage/backend'
import { Storage } from '../../storage'
import { StorageKnexDB } from '../../storage/database'
import { getConfig } from '../../config'

declare module 'fastify' {
interface FastifyRequest {
Expand All @@ -10,8 +11,10 @@ declare module 'fastify' {
}
}

const { storageBackendType } = getConfig()

export const storage = fastifyPlugin(async (fastify) => {
const storageBackend = createStorageBackend()
const storageBackend = createStorageBackend(storageBackendType)

fastify.decorateRequest('storage', undefined)
fastify.addHook('preHandler', async (request) => {
Expand Down
5 changes: 1 addition & 4 deletions src/http/routes/tus/s3-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,8 @@ export class S3Store extends BaseS3Store {
const timer = S3UploadPart.startTimer()

const result = await super.uploadPart(metadata, readStream, partNumber)
const resource = UploadId.fromString(metadata.file.id)

timer({
tenant_id: resource.tenant,
})
timer()

return result
}
Expand Down
28 changes: 11 additions & 17 deletions src/monitoring/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,71 +6,65 @@ export const MetricsRegistrar = new Registry()
export const FileUploadStarted = new client.Gauge({
name: 'storage_api_upload_started',
help: 'Upload started',
labelNames: ['tenant_id', 'region', 'is_multipart'],
labelNames: ['region', 'is_multipart'],
})

export const FileUploadedSuccess = new client.Gauge({
name: 'storage_api_upload_success',
help: 'Successful uploads',
labelNames: ['tenant_id', 'region', 'is_multipart'],
labelNames: ['region', 'is_multipart'],
})

export const DbQueryPerformance = new client.Histogram({
name: 'storage_api_database_query_performance',
help: 'Database query performance',
labelNames: ['tenant_id', 'region', 'name'],
})

export const RequestErrors = new client.Gauge({
name: 'storage_api_request_errors',
labelNames: ['tenant_id', 'region', 'method', 'path', 'status', 'name'],
help: 'Response Errors',
labelNames: ['region', 'name'],
})

export const QueueJobSchedulingTime = new client.Histogram({
name: 'storage_api_queue_job_scheduled_time',
help: 'Time taken to schedule a job in the queue',
labelNames: ['region', 'name', 'tenant_id'],
labelNames: ['region', 'name'],
})

export const QueueJobScheduled = new client.Gauge({
name: 'storage_api_queue_job_scheduled',
help: 'Current number of pending messages in the queue',
labelNames: ['region', 'name', 'tenant_id'],
labelNames: ['region', 'name'],
})

export const QueueJobCompleted = new client.Gauge({
name: 'storage_api_queue_job_completed',
help: 'Current number of processed messages in the queue',
labelNames: ['tenant_id', 'region', 'name'],
labelNames: ['region', 'name'],
})

export const QueueJobRetryFailed = new client.Gauge({
name: 'storage_api_queue_job_retry_failed',
help: 'Current number of failed attempts messages in the queue',
labelNames: ['tenant_id', 'region', 'name'],
labelNames: ['region', 'name'],
})

export const QueueJobError = new client.Gauge({
name: 'storage_api_queue_job_error',
help: 'Current number of errored messages in the queue',
labelNames: ['tenant_id', 'region', 'name'],
labelNames: ['region', 'name'],
})

export const S3UploadPart = new client.Histogram({
name: 'storage_api_s3_upload_part',
help: 'S3 upload part performance',
labelNames: ['tenant_id', 'region'],
labelNames: ['region'],
})

export const DbActivePool = new client.Gauge({
name: 'storage_api_db_pool',
help: 'Number of database pools created',
labelNames: ['tenant_id', 'region', 'is_external'],
labelNames: ['region', 'is_external'],
})

export const DbActiveConnection = new client.Gauge({
name: 'storage_api_db_connections',
help: 'Number of database connections',
labelNames: ['tenant_id', 'region', 'is_external'],
labelNames: ['region', 'is_external'],
})
11 changes: 6 additions & 5 deletions src/queue/events/base-event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { getServiceKeyUser } from '../../database/tenant'
import { getPostgresConnection } from '../../database'
import { Storage } from '../../storage'
import { StorageKnexDB } from '../../storage/database'
import { createStorageBackend } from '../../storage/backend'
import { createAgent, createStorageBackend } from '../../storage/backend'
import { getConfig } from '../../config'
import { QueueJobScheduled, QueueJobSchedulingTime } from '../../monitoring/metrics'
import { logger } from '../../monitoring'
Expand All @@ -20,7 +20,8 @@ export interface BasePayload {

export type StaticThis<T> = { new (...args: any): T }

const { enableQueueEvents } = getConfig()
const { enableQueueEvents, storageBackendType, globalS3Protocol } = getConfig()
const httpAgent = createAgent(globalS3Protocol)

export abstract class BaseEvent<T extends Omit<BasePayload, '$version'>> {
public static readonly version: string = 'v1'
Expand Down Expand Up @@ -113,7 +114,9 @@ export abstract class BaseEvent<T extends Omit<BasePayload, '$version'>> {
host: payload.tenant.host,
})

const storageBackend = createStorageBackend()
const storageBackend = createStorageBackend(storageBackendType, {
httpAgent,
})

return new Storage(storageBackend, db)
}
Expand Down Expand Up @@ -145,12 +148,10 @@ export abstract class BaseEvent<T extends Omit<BasePayload, '$version'>> {

timer({
name: constructor.getQueueName(),
tenant_id: this.payload.tenant.ref,
})

QueueJobScheduled.inc({
name: constructor.getQueueName(),
tenant_id: this.payload.tenant.ref,
})

return res
Expand Down
3 changes: 0 additions & 3 deletions src/queue/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,12 @@ export abstract class Queue {
const res = await event.handle(job)

QueueJobCompleted.inc({
tenant_id: job.data.tenant.ref,
name: event.getQueueName(),
})

return res
} catch (e) {
QueueJobRetryFailed.inc({
tenant_id: job.data.tenant.ref,
name: event.getQueueName(),
})

Expand All @@ -106,7 +104,6 @@ export abstract class Queue {
}
if (dbJob.retrycount === dbJob.retrylimit) {
QueueJobError.inc({
tenant_id: job.data.tenant.ref,
name: event.getQueueName(),
})
}
Expand Down
25 changes: 19 additions & 6 deletions src/storage/backend/index.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,34 @@
import { StorageBackendAdapter } from './generic'
import { FileBackend } from './file'
import { S3Backend } from './s3'
import { getConfig } from '../../config'
import { S3Backend, S3ClientOptions } from './s3'
import { getConfig, StorageBackendType } from '../../config'

export * from './s3'
export * from './file'
export * from './generic'

const { region, globalS3Endpoint, globalS3ForcePathStyle, storageBackendType } = getConfig()
const { region, globalS3Endpoint, globalS3ForcePathStyle } = getConfig()

export function createStorageBackend() {
type ConfigForStorage<Type extends StorageBackendType> = Type extends 's3'
? S3ClientOptions
: undefined

export function createStorageBackend<Type extends StorageBackendType>(
type: Type,
config?: ConfigForStorage<Type>
) {
let storageBackend: StorageBackendAdapter

if (storageBackendType === 'file') {
if (type === 'file') {
storageBackend = new FileBackend()
} else {
storageBackend = new S3Backend(region, globalS3Endpoint, globalS3ForcePathStyle)
const defaultOptions: S3ClientOptions = {
region: region,
endpoint: globalS3Endpoint,
forcePathStyle: globalS3ForcePathStyle,
...(config ? config : {}),
}
storageBackend = new S3Backend(defaultOptions)
}

return storageBackend
Expand Down
45 changes: 31 additions & 14 deletions src/storage/backend/s3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,35 +25,52 @@ import Agent, { HttpsAgent } from 'agentkeepalive'

const { globalS3Protocol, globalS3MaxSockets } = getConfig()

/**
* Creates an agent for the given protocol
* @param protocol
*/
export function createAgent(protocol: 'http' | 'https') {
const agentOptions = {
maxSockets: globalS3MaxSockets,
keepAlive: true,
}

return protocol === 'http'
? { httpAgent: new Agent(agentOptions) }
: { httpsAgent: new HttpsAgent(agentOptions) }
}

export interface S3ClientOptions {
endpoint?: string
region?: string
forcePathStyle?: boolean
accessKey?: string
secretKey?: string
role?: string
httpAgent?: { httpAgent: Agent } | { httpsAgent: HttpsAgent }
}

/**
* S3Backend
* Interacts with an s3-compatible file system with this S3Adapter
*/
export class S3Backend implements StorageBackendAdapter {
client: S3Client

constructor(region: string, endpoint?: string | undefined, globalS3ForcePathStyle?: boolean) {
const agentOptions = {
maxSockets: globalS3MaxSockets,
keepAlive: true,
}

const agent =
globalS3Protocol === 'http'
? { httpAgent: new Agent(agentOptions) }
: { httpsAgent: new HttpsAgent(agentOptions) }
constructor(options: S3ClientOptions) {
const agent = options.httpAgent ? options.httpAgent : createAgent(globalS3Protocol)

const params: S3ClientConfig = {
region,
region: options.region,
runtime: 'node',
requestHandler: new NodeHttpHandler({
...agent,
}),
}
if (endpoint) {
params.endpoint = endpoint
if (options.endpoint) {
params.endpoint = options.endpoint
}
if (globalS3ForcePathStyle) {
if (options.forcePathStyle) {
params.forcePathStyle = true
}
this.client = new S3Client(params)
Expand Down
1 change: 0 additions & 1 deletion src/storage/database/knex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,6 @@ export class StorageKnexDB implements Database {
): Promise<Awaited<ReturnType<T>>> {
const timer = DbQueryPerformance.startTimer({
name: queryName,
tenant_id: this.options.tenantId,
})

let tnx = this.options.tnx
Expand Down
Loading
Loading