Skip to content

Commit

Permalink
fix: add metrics for agentkeepalive
Browse files Browse the repository at this point in the history
  • Loading branch information
fenos committed Sep 19, 2024
1 parent ddd1558 commit a0fe880
Show file tree
Hide file tree
Showing 17 changed files with 605 additions and 195 deletions.
575 changes: 422 additions & 153 deletions package-lock.json

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@
"conventional-changelog-conventionalcommits": "^5.0.0",
"crypto-js": "^4.2.0",
"dotenv": "^16.0.0",
"fastify": "^4.8.1",
"fastify": "^4.28.1",
"fastify-metrics": "^10.2.0",
"fastify-plugin": "^4.0.0",
"fastify-plugin": "^4.5.1",
"fastify-xml-body-parser": "^2.2.0",
"fs-extra": "^10.0.1",
"fs-xattr": "0.3.1",
Expand Down Expand Up @@ -113,7 +113,7 @@
"ts-node-dev": "^1.1.8",
"tsx": "^4.16.0",
"tus-js-client": "^3.1.0",
"typescript": "^4.5.5"
"typescript": "^5.6.2"
},
"bin": "./dist/server.js"
}
2 changes: 1 addition & 1 deletion src/http/plugins/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const storageBackend = createStorageBackend(storageBackendType)

export const storage = fastifyPlugin(
async function storagePlugin(fastify) {
fastify.decorateRequest('storage', undefined)
fastify.decorateRequest('storage', null)
fastify.addHook('preHandler', async (request) => {
const database = new StorageKnexDB(request.db, {
tenantId: request.tenantId,
Expand Down
2 changes: 1 addition & 1 deletion src/http/routes/object/deleteObject.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { FastifyInstance } from 'fastify'
import { FromSchema, JSONSchema } from 'json-schema-to-ts'
import { FromSchema } from 'json-schema-to-ts'
import { createDefaultSchema, createResponse } from '../../routes-helper'
import { AuthenticatedRequest } from '../../types'
import { ROUTE_OPERATIONS } from '../operations'
Expand Down
1 change: 1 addition & 0 deletions src/http/routes/object/updateObject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ export default async function routes(fastify: FastifyInstance) {
.uploadOverridingObject(request, {
owner,
objectName: objectName,
signal: request.signals.body.signal,
})

return response.status(objectMetadata?.httpStatusCode ?? 200).send({
Expand Down
1 change: 1 addition & 0 deletions src/http/routes/object/uploadSignedObject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ export default async function routes(fastify: FastifyInstance) {
owner,
objectName,
isUpsert: upsert,
signal: request.signals.body.signal,
})

return response.status(objectMetadata?.httpStatusCode ?? 200).send({
Expand Down
23 changes: 13 additions & 10 deletions src/http/routes/s3/commands/upload-part.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,19 @@ export default function UploadPart(s3Router: S3Router) {

const metadata = s3Protocol.parseMetadataHeaders(req.Headers)

return s3Protocol.putObject({
Body: ctx.req as any,
Bucket: req.Params.Bucket,
Key: req.Params['*'],
CacheControl: req.Headers?.['cache-control'],
ContentType: req.Headers?.['content-type'],
Expires: req.Headers?.['expires'] ? new Date(req.Headers?.['expires']) : undefined,
ContentEncoding: req.Headers?.['content-encoding'],
Metadata: metadata,
})
return s3Protocol.putObject(
{
Body: ctx.req as any,
Bucket: req.Params.Bucket,
Key: req.Params['*'],
CacheControl: req.Headers?.['cache-control'],
ContentType: req.Headers?.['content-type'],
Expires: req.Headers?.['expires'] ? new Date(req.Headers?.['expires']) : undefined,
ContentEncoding: req.Headers?.['content-encoding'],
Metadata: metadata,
},
ctx.signals.body
)
}
)
}
2 changes: 1 addition & 1 deletion src/http/routes/tus/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type MultiPartRequest = http.IncomingMessage & {

function createTusStore() {
if (storageBackendType === 's3') {
const agent = createAgent(storageS3Endpoint?.includes('http://') ? 'http' : 'https')
const agent = createAgent('s3_tus')
return new S3Store({
partSize: tusPartSize * 1024 * 1024, // Each uploaded part will have ${tusPartSize}MB,
expirationPeriodInMilliseconds: tusUrlExpiryMs,
Expand Down
25 changes: 25 additions & 0 deletions src/internal/monitoring/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,28 @@ export const DbActiveConnection = new client.Gauge({
help: 'Number of database connections',
labelNames: ['region', 'is_external'],
})

// Prometheus gauges for agentkeepalive HTTP(S) connection-pool health.
// Labels: `name` identifies the pool (e.g. 's3_default'), `region` the deploy
// region, `protocol` is 'http' or 'https'. Updated periodically by the
// pool watcher in the S3 backend.
export const HttpPoolSocketsGauge = new client.Gauge({
  name: 'storage_api_http_pool_busy_sockets',
  help: 'Number of busy sockets currently in use',
  labelNames: ['name', 'region', 'protocol'],
})

// Sockets currently idle in the keep-alive pool, available for reuse.
export const HttpPoolFreeSocketsGauge = new client.Gauge({
  name: 'storage_api_http_pool_free_sockets',
  help: 'Number of free sockets available for reuse',
  labelNames: ['name', 'region', 'protocol'],
})

// Requests queued waiting for a socket (pool exhausted => this grows).
export const HttpPoolPendingRequestsGauge = new client.Gauge({
  name: 'storage_api_http_pool_requests',
  help: 'Number of pending requests waiting for a socket',
  labelNames: ['name', 'region', 'protocol'],
})

// Socket-level error counters, split by the `type` label:
// 'socket_error' | 'timeout_socket_error' | 'create_socket_error'
// (values come from agentkeepalive's getCurrentStatus() counters).
export const HttpPoolErrorGauge = new client.Gauge({
  name: 'storage_api_http_pool_errors',
  // Fixed copy-pasted help text: this gauge tracks pool socket errors,
  // not pending requests.
  help: 'Number of socket errors in the http pool',
  labelNames: ['name', 'region', 'type', 'protocol'],
})
6 changes: 6 additions & 0 deletions src/internal/monitoring/otel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import { S3Backend } from '@storage/backend'
import { StorageKnexDB } from '@storage/database'
import { TenantConnection } from '@internal/database'
import { S3Store } from '@tus/s3-store'
import { Upload } from '@aws-sdk/lib-storage'

const tracingEnabled = process.env.TRACING_ENABLED === 'true'
const headersEnv = process.env.OTEL_EXPORTER_OTLP_TRACES_HEADERS || ''
Expand Down Expand Up @@ -239,6 +240,11 @@ const sdk = new NodeSDK({
},
setName: (name, attrs) => 'S3.' + attrs.operation,
}),
new ClassInstrumentation({
targetClass: Upload,
enabled: true,
methodsToInstrument: ['done', '__notifyProgress'],
}),
getNodeAutoInstrumentations({
'@opentelemetry/instrumentation-http': {
enabled: false,
Expand Down
6 changes: 4 additions & 2 deletions src/storage/backend/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ export abstract class StorageBackendAdapter {
version: string | undefined,
body: NodeJS.ReadableStream,
contentType: string,
cacheControl: string
cacheControl: string,
signal?: AbortSignal
): Promise<ObjectMetadata> {
throw new Error('uploadObject not implemented')
}
Expand Down Expand Up @@ -172,7 +173,8 @@ export abstract class StorageBackendAdapter {
uploadId: string,
partNumber: number,
body?: string | Uint8Array | Buffer | Readable,
length?: number
length?: number,
signal?: AbortSignal
): Promise<{ ETag?: string }> {
throw new Error('not implemented')
}
Expand Down
103 changes: 91 additions & 12 deletions src/storage/backend/s3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,91 @@ import { ERRORS, StorageBackendError } from '@internal/errors'
import { getConfig } from '../../config'
import Agent, { HttpsAgent } from 'agentkeepalive'
import { Readable } from 'stream'
import {
HttpPoolErrorGauge,
HttpPoolFreeSocketsGauge,
HttpPoolPendingRequestsGauge,
HttpPoolSocketsGauge,
} from '@internal/monitoring/metrics'

const { storageS3MaxSockets, region } = getConfig()

const watchers: NodeJS.Timeout[] = []

const { storageS3MaxSockets } = getConfig()
process.once('SIGTERM', () => {
watchers.forEach((watcher) => {
clearInterval(watcher)
})
})

/**
* Creates an agent for the given protocol
* @param protocol
* @param name
*/
export function createAgent(protocol: 'http' | 'https') {
export function createAgent(name: string) {
const agentOptions = {
maxSockets: storageS3MaxSockets,
keepAlive: true,
keepAliveMsecs: 1000,
freeSocketTimeout: 1000 * 15,
}

const httpAgent = new Agent(agentOptions)
const httpsAgent = new HttpsAgent(agentOptions)

if (httpsAgent) {
const watcher = setInterval(() => {
const httpStatus = httpAgent.getCurrentStatus()
const httpsStatus = httpsAgent.getCurrentStatus()
updateHttpPoolMetrics(name, 'http', httpStatus)
updateHttpPoolMetrics(name, 'https', httpsStatus)
}, 5000)

watchers.push(watcher)
}

return { httpAgent, httpsAgent }
}

// Pushes the current agentkeepalive pool status for one protocol into the
// Prometheus gauges. `status.sockets`, `status.freeSockets` and
// `status.requests` each map host -> count, so totals are summed over hosts.
function updateHttpPoolMetrics(name: string, protocol: string, status: Agent.AgentStatus): void {
  // Sum per-host counts; guard against inherited keys per no-prototype-builtins.
  const sumCounts = (counts?: { [host: string]: number }): number => {
    let total = 0
    for (const host in counts) {
      if (Object.prototype.hasOwnProperty.call(counts, host)) {
        total += counts[host]
      }
    }
    return total
  }

  HttpPoolSocketsGauge.set({ name, region, protocol }, sumCounts(status.sockets))
  HttpPoolFreeSocketsGauge.set({ name, region, protocol }, sumCounts(status.freeSockets))
  // Fix: the pending-requests gauge declares a 'protocol' label; omitting it
  // here created an unlabeled series inconsistent with the other gauges.
  HttpPoolPendingRequestsGauge.set({ name, region, protocol }, sumCounts(status.requests))

  HttpPoolErrorGauge.set({ name, region, type: 'socket_error', protocol }, status.errorSocketCount)
  HttpPoolErrorGauge.set(
    { name, region, type: 'timeout_socket_error', protocol },
    status.timeoutSocketCount
  )
  HttpPoolErrorGauge.set(
    { name, region, type: 'create_socket_error', protocol },
    status.createSocketErrorCount
  )
}

export interface S3ClientOptions {
Expand All @@ -56,7 +124,7 @@ export interface S3ClientOptions {
accessKey?: string
secretKey?: string
role?: string
httpAgent?: { httpAgent: Agent } | { httpsAgent: HttpsAgent }
httpAgent?: { httpAgent: Agent; httpsAgent: HttpsAgent }
requestTimeout?: number
downloadTimeout?: number
uploadTimeout?: number
Expand All @@ -75,18 +143,21 @@ export class S3Backend implements StorageBackendAdapter {
// Default client for API operations
this.client = this.createS3Client({
...options,
name: 's3_default',
requestTimeout: options.requestTimeout,
})

// Upload client exclusively for upload operations
this.uploadClient = this.createS3Client({
...options,
name: 's3_upload',
requestTimeout: options.uploadTimeout,
})

// Download client exclusively for download operations
this.downloadClient = this.createS3Client({
...options,
name: 's3_download',
requestTimeout: options.downloadTimeout,
})
}
Expand Down Expand Up @@ -144,14 +215,16 @@ export class S3Backend implements StorageBackendAdapter {
* @param body
* @param contentType
* @param cacheControl
* @param signal
*/
async uploadObject(
bucketName: string,
key: string,
version: string | undefined,
body: NodeJS.ReadableStream,
contentType: string,
cacheControl: string
cacheControl: string,
signal?: AbortSignal
): Promise<ObjectMetadata> {
try {
const paralellUploadS3 = new Upload({
Expand All @@ -166,6 +239,14 @@ export class S3Backend implements StorageBackendAdapter {
},
})

signal?.addEventListener(
'abort',
() => {
paralellUploadS3.abort()
},
{ once: true }
)

const data = (await paralellUploadS3.done()) as CompleteMultipartUploadCommandOutput

const metadata = await this.headObject(bucketName, key, version)
Expand Down Expand Up @@ -451,10 +532,8 @@ export class S3Backend implements StorageBackendAdapter {
}
}

protected createS3Client(options: S3ClientOptions) {
const storageS3Protocol = options.endpoint?.includes('http://') ? 'http' : 'https'

const agent = options.httpAgent ? options.httpAgent : createAgent(storageS3Protocol)
protected createS3Client(options: S3ClientOptions & { name: string }) {
const agent = options.httpAgent ?? createAgent(options.name)

const params: S3ClientConfig = {
region: options.region,
Expand Down
17 changes: 13 additions & 4 deletions src/storage/events/base-event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ import { createAgent, createStorageBackend } from '../backend'
import { Storage } from '../storage'
import { getConfig } from '../../config'
import { logger } from '@internal/monitoring'
import Agent, { HttpsAgent } from 'agentkeepalive'

const { storageBackendType, storageS3Endpoint, region } = getConfig()
const storageS3Protocol = storageS3Endpoint?.includes('http://') ? 'http' : 'https'
const httpAgent = createAgent(storageS3Protocol)
const { storageBackendType, region } = getConfig()

let httpAgent: { httpAgent: Agent; httpsAgent: HttpsAgent } | undefined

export abstract class BaseEvent<T extends Omit<BasePayload, '$version'>> extends QueueBaseEvent<T> {
/**
Expand Down Expand Up @@ -51,6 +52,14 @@ export abstract class BaseEvent<T extends Omit<BasePayload, '$version'>> extends
}
}

protected static getAgent() {
if (httpAgent) {
return httpAgent
}
httpAgent = createAgent('s3_worker')
return httpAgent
}

protected static async createStorage(payload: BasePayload) {
const adminUser = await getServiceKeyUser(payload.tenant.ref)

Expand All @@ -68,7 +77,7 @@ export abstract class BaseEvent<T extends Omit<BasePayload, '$version'>> extends
})

const storageBackend = createStorageBackend(storageBackendType, {
httpAgent,
httpAgent: BaseEvent.getAgent(),
})

return new Storage(storageBackend, db)
Expand Down
1 change: 1 addition & 0 deletions src/storage/object.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export interface UploadObjectOptions {
owner?: string
isUpsert?: boolean
version?: string
signal?: AbortSignal
}

const { requestUrlLengthLimit, storageS3Bucket } = getConfig()
Expand Down
Loading

0 comments on commit a0fe880

Please sign in to comment.