Skip to content

Commit

Permalink
fix: add metrics for agentkeepalive (#551)
Browse files Browse the repository at this point in the history
  • Loading branch information
fenos authored Sep 20, 2024
1 parent ddd1558 commit fbdc9d3
Show file tree
Hide file tree
Showing 19 changed files with 622 additions and 200 deletions.
575 changes: 422 additions & 153 deletions package-lock.json

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@
"conventional-changelog-conventionalcommits": "^5.0.0",
"crypto-js": "^4.2.0",
"dotenv": "^16.0.0",
"fastify": "^4.8.1",
"fastify": "^4.28.1",
"fastify-metrics": "^10.2.0",
"fastify-plugin": "^4.0.0",
"fastify-plugin": "^4.5.1",
"fastify-xml-body-parser": "^2.2.0",
"fs-extra": "^10.0.1",
"fs-xattr": "0.3.1",
Expand Down Expand Up @@ -113,7 +113,7 @@
"ts-node-dev": "^1.1.8",
"tsx": "^4.16.0",
"tus-js-client": "^3.1.0",
"typescript": "^4.5.5"
"typescript": "^5.6.2"
},
"bin": "./dist/server.js"
}
2 changes: 1 addition & 1 deletion src/http/plugins/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const storageBackend = createStorageBackend(storageBackendType)

export const storage = fastifyPlugin(
async function storagePlugin(fastify) {
fastify.decorateRequest('storage', undefined)
fastify.decorateRequest('storage', null)
fastify.addHook('preHandler', async (request) => {
const database = new StorageKnexDB(request.db, {
tenantId: request.tenantId,
Expand Down
15 changes: 10 additions & 5 deletions src/http/plugins/tracing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ export const tracing = fastifyPlugin(
const span = trace.getSpan(context.active())

if (span) {
// We collect logs only in full and logs mode
// We collect logs only in full,logs,debug mode
if (
tracingEnabled &&
request.tracingMode &&
!['full', 'logs'].includes(request.tracingMode)
!['full', 'logs', 'debug'].includes(request.tracingMode)
) {
traceCollector.clearTrace(span.spanContext().traceId)
}
Expand All @@ -68,7 +68,7 @@ export const traceServerTime = fastifyPlugin(
const spans = traceCollector.getSpansForTrace(traceId)
if (spans) {
try {
const serverTimingHeaders = spansToServerTimings(spans)
const serverTimingHeaders = spansToServerTimings(spans, reply.statusCode >= 500)

request.serverTimings = serverTimingHeaders

Expand All @@ -94,12 +94,15 @@ export const traceServerTime = fastifyPlugin(
})

fastify.addHook('onRequestAbort', async (req) => {
const traceId = trace.getSpan(context.active())?.spanContext().traceId
const span = trace.getSpan(context.active())
const traceId = span?.spanContext().traceId

span?.setAttribute('req_aborted', true)

if (traceId) {
const spans = traceCollector.getSpansForTrace(traceId)
if (spans) {
req.serverTimings = spansToServerTimings(spans)
req.serverTimings = spansToServerTimings(spans, true)
}
traceCollector.clearTrace(traceId)
}
Expand Down Expand Up @@ -155,6 +158,8 @@ function spansToServerTimings(
spanName,
duration,
action: span.item.attributes['db.statement'],
error: span.item.attributes.error,
status: span.item.status,
host: hostName
? isIP(hostName)
? hostName
Expand Down
2 changes: 1 addition & 1 deletion src/http/routes/object/deleteObject.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { FastifyInstance } from 'fastify'
import { FromSchema, JSONSchema } from 'json-schema-to-ts'
import { FromSchema } from 'json-schema-to-ts'
import { createDefaultSchema, createResponse } from '../../routes-helper'
import { AuthenticatedRequest } from '../../types'
import { ROUTE_OPERATIONS } from '../operations'
Expand Down
1 change: 1 addition & 0 deletions src/http/routes/object/updateObject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ export default async function routes(fastify: FastifyInstance) {
.uploadOverridingObject(request, {
owner,
objectName: objectName,
signal: request.signals.body.signal,
})

return response.status(objectMetadata?.httpStatusCode ?? 200).send({
Expand Down
1 change: 1 addition & 0 deletions src/http/routes/object/uploadSignedObject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ export default async function routes(fastify: FastifyInstance) {
owner,
objectName,
isUpsert: upsert,
signal: request.signals.body.signal,
})

return response.status(objectMetadata?.httpStatusCode ?? 200).send({
Expand Down
23 changes: 13 additions & 10 deletions src/http/routes/s3/commands/upload-part.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,19 @@ export default function UploadPart(s3Router: S3Router) {

const metadata = s3Protocol.parseMetadataHeaders(req.Headers)

return s3Protocol.putObject({
Body: ctx.req as any,
Bucket: req.Params.Bucket,
Key: req.Params['*'],
CacheControl: req.Headers?.['cache-control'],
ContentType: req.Headers?.['content-type'],
Expires: req.Headers?.['expires'] ? new Date(req.Headers?.['expires']) : undefined,
ContentEncoding: req.Headers?.['content-encoding'],
Metadata: metadata,
})
return s3Protocol.putObject(
{
Body: ctx.req as any,
Bucket: req.Params.Bucket,
Key: req.Params['*'],
CacheControl: req.Headers?.['cache-control'],
ContentType: req.Headers?.['content-type'],
Expires: req.Headers?.['expires'] ? new Date(req.Headers?.['expires']) : undefined,
ContentEncoding: req.Headers?.['content-encoding'],
Metadata: metadata,
},
ctx.signals.body
)
}
)
}
2 changes: 1 addition & 1 deletion src/http/routes/tus/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type MultiPartRequest = http.IncomingMessage & {

function createTusStore() {
if (storageBackendType === 's3') {
const agent = createAgent(storageS3Endpoint?.includes('http://') ? 'http' : 'https')
const agent = createAgent('s3_tus')
return new S3Store({
partSize: tusPartSize * 1024 * 1024, // Each uploaded part will have ${tusPartSize}MB,
expirationPeriodInMilliseconds: tusUrlExpiryMs,
Expand Down
25 changes: 25 additions & 0 deletions src/internal/monitoring/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,28 @@ export const DbActiveConnection = new client.Gauge({
help: 'Number of database connections',
labelNames: ['region', 'is_external'],
})

// Create Prometheus metrics
export const HttpPoolSocketsGauge = new client.Gauge({
name: 'storage_api_http_pool_busy_sockets',
help: 'Number of busy sockets currently in use',
labelNames: ['name', 'region', 'protocol'],
})

export const HttpPoolFreeSocketsGauge = new client.Gauge({
name: 'storage_api_http_pool_free_sockets',
help: 'Number of free sockets available for reuse',
labelNames: ['name', 'region', 'protocol'],
})

export const HttpPoolPendingRequestsGauge = new client.Gauge({
name: 'storage_api_http_pool_requests',
help: 'Number of pending requests waiting for a socket',
labelNames: ['name', 'region', 'protocol'],
})

export const HttpPoolErrorGauge = new client.Gauge({
name: 'storage_api_http_pool_errors',
help: 'Number of pending requests waiting for a socket',
labelNames: ['name', 'region', 'type', 'protocol'],
})
7 changes: 7 additions & 0 deletions src/internal/monitoring/otel-instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,13 @@ class ClassInstrumentation implements Instrumentation {
span.setStatus({ code: SpanStatusCode.OK })
return result
} catch (error) {
if (error instanceof Error) {
span.setAttributes({
error: JSON.stringify({ message: error.message, stack: error.stack }),
stack: error.stack,
})
}

span.setStatus({
code: SpanStatusCode.ERROR,
message: error instanceof Error ? error.message : String(error),
Expand Down
6 changes: 6 additions & 0 deletions src/internal/monitoring/otel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import { S3Backend } from '@storage/backend'
import { StorageKnexDB } from '@storage/database'
import { TenantConnection } from '@internal/database'
import { S3Store } from '@tus/s3-store'
import { Upload } from '@aws-sdk/lib-storage'

const tracingEnabled = process.env.TRACING_ENABLED === 'true'
const headersEnv = process.env.OTEL_EXPORTER_OTLP_TRACES_HEADERS || ''
Expand Down Expand Up @@ -239,6 +240,11 @@ const sdk = new NodeSDK({
},
setName: (name, attrs) => 'S3.' + attrs.operation,
}),
new ClassInstrumentation({
targetClass: Upload,
enabled: true,
methodsToInstrument: ['done', '__notifyProgress'],
}),
getNodeAutoInstrumentations({
'@opentelemetry/instrumentation-http': {
enabled: false,
Expand Down
6 changes: 4 additions & 2 deletions src/storage/backend/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ export abstract class StorageBackendAdapter {
version: string | undefined,
body: NodeJS.ReadableStream,
contentType: string,
cacheControl: string
cacheControl: string,
signal?: AbortSignal
): Promise<ObjectMetadata> {
throw new Error('uploadObject not implemented')
}
Expand Down Expand Up @@ -172,7 +173,8 @@ export abstract class StorageBackendAdapter {
uploadId: string,
partNumber: number,
body?: string | Uint8Array | Buffer | Readable,
length?: number
length?: number,
signal?: AbortSignal
): Promise<{ ETag?: string }> {
throw new Error('not implemented')
}
Expand Down
103 changes: 91 additions & 12 deletions src/storage/backend/s3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,91 @@ import { ERRORS, StorageBackendError } from '@internal/errors'
import { getConfig } from '../../config'
import Agent, { HttpsAgent } from 'agentkeepalive'
import { Readable } from 'stream'
import {
HttpPoolErrorGauge,
HttpPoolFreeSocketsGauge,
HttpPoolPendingRequestsGauge,
HttpPoolSocketsGauge,
} from '@internal/monitoring/metrics'

const { storageS3MaxSockets, region } = getConfig()

const watchers: NodeJS.Timeout[] = []

const { storageS3MaxSockets } = getConfig()
process.once('SIGTERM', () => {
watchers.forEach((watcher) => {
clearInterval(watcher)
})
})

/**
* Creates an agent for the given protocol
* @param protocol
* @param name
*/
export function createAgent(protocol: 'http' | 'https') {
export function createAgent(name: string) {
const agentOptions = {
maxSockets: storageS3MaxSockets,
keepAlive: true,
keepAliveMsecs: 1000,
freeSocketTimeout: 1000 * 15,
}

const httpAgent = new Agent(agentOptions)
const httpsAgent = new HttpsAgent(agentOptions)

if (httpsAgent) {
const watcher = setInterval(() => {
const httpStatus = httpAgent.getCurrentStatus()
const httpsStatus = httpsAgent.getCurrentStatus()
updateHttpPoolMetrics(name, 'http', httpStatus)
updateHttpPoolMetrics(name, 'https', httpsStatus)
}, 5000)

watchers.push(watcher)
}

return { httpAgent, httpsAgent }
}

// Function to update Prometheus metrics based on the current status of the agent
function updateHttpPoolMetrics(name: string, protocol: string, status: Agent.AgentStatus): void {
// Calculate the number of busy sockets by iterating over the `sockets` object
let busySocketCount = 0
for (const host in status.sockets) {
if (status.sockets.hasOwnProperty(host)) {
busySocketCount += status.sockets[host]
}
}

return protocol === 'http'
? { httpAgent: new Agent(agentOptions) }
: { httpsAgent: new HttpsAgent(agentOptions) }
// Calculate the number of free sockets by iterating over the `freeSockets` object
let freeSocketCount = 0
for (const host in status.freeSockets) {
if (status.freeSockets.hasOwnProperty(host)) {
freeSocketCount += status.freeSockets[host]
}
}

// Calculate the number of pending requests by iterating over the `requests` object
let pendingRequestCount = 0
for (const host in status.requests) {
if (status.requests.hasOwnProperty(host)) {
pendingRequestCount += status.requests[host]
}
}

// Update the metrics with calculated values
HttpPoolSocketsGauge.set({ name, region, protocol }, busySocketCount)
HttpPoolFreeSocketsGauge.set({ name, region, protocol }, freeSocketCount)
HttpPoolPendingRequestsGauge.set({ name, region }, pendingRequestCount)
HttpPoolErrorGauge.set({ name, region, type: 'socket_error', protocol }, status.errorSocketCount)
HttpPoolErrorGauge.set(
{ name, region, type: 'timeout_socket_error', protocol },
status.timeoutSocketCount
)
HttpPoolErrorGauge.set(
{ name, region, type: 'create_socket_error', protocol },
status.createSocketErrorCount
)
}

export interface S3ClientOptions {
Expand All @@ -56,7 +124,7 @@ export interface S3ClientOptions {
accessKey?: string
secretKey?: string
role?: string
httpAgent?: { httpAgent: Agent } | { httpsAgent: HttpsAgent }
httpAgent?: { httpAgent: Agent; httpsAgent: HttpsAgent }
requestTimeout?: number
downloadTimeout?: number
uploadTimeout?: number
Expand All @@ -75,18 +143,21 @@ export class S3Backend implements StorageBackendAdapter {
// Default client for API operations
this.client = this.createS3Client({
...options,
name: 's3_default',
requestTimeout: options.requestTimeout,
})

// Upload client exclusively for upload operations
this.uploadClient = this.createS3Client({
...options,
name: 's3_upload',
requestTimeout: options.uploadTimeout,
})

// Download client exclusively for download operations
this.downloadClient = this.createS3Client({
...options,
name: 's3_download',
requestTimeout: options.downloadTimeout,
})
}
Expand Down Expand Up @@ -144,14 +215,16 @@ export class S3Backend implements StorageBackendAdapter {
* @param body
* @param contentType
* @param cacheControl
* @param signal
*/
async uploadObject(
bucketName: string,
key: string,
version: string | undefined,
body: NodeJS.ReadableStream,
contentType: string,
cacheControl: string
cacheControl: string,
signal?: AbortSignal
): Promise<ObjectMetadata> {
try {
const paralellUploadS3 = new Upload({
Expand All @@ -166,6 +239,14 @@ export class S3Backend implements StorageBackendAdapter {
},
})

signal?.addEventListener(
'abort',
() => {
paralellUploadS3.abort()
},
{ once: true }
)

const data = (await paralellUploadS3.done()) as CompleteMultipartUploadCommandOutput

const metadata = await this.headObject(bucketName, key, version)
Expand Down Expand Up @@ -451,10 +532,8 @@ export class S3Backend implements StorageBackendAdapter {
}
}

protected createS3Client(options: S3ClientOptions) {
const storageS3Protocol = options.endpoint?.includes('http://') ? 'http' : 'https'

const agent = options.httpAgent ? options.httpAgent : createAgent(storageS3Protocol)
protected createS3Client(options: S3ClientOptions & { name: string }) {
const agent = options.httpAgent ?? createAgent(options.name)

const params: S3ClientConfig = {
region: options.region,
Expand Down
Loading

0 comments on commit fbdc9d3

Please sign in to comment.