fix: add metrics for agentkeepalive #551

Merged: 1 commit, Sep 20, 2024

575 changes: 422 additions & 153 deletions package-lock.json

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions package.json
@@ -55,9 +55,9 @@
"conventional-changelog-conventionalcommits": "^5.0.0",
"crypto-js": "^4.2.0",
"dotenv": "^16.0.0",
"fastify": "^4.8.1",
"fastify": "^4.28.1",
"fastify-metrics": "^10.2.0",
"fastify-plugin": "^4.0.0",
"fastify-plugin": "^4.5.1",
"fastify-xml-body-parser": "^2.2.0",
"fs-extra": "^10.0.1",
"fs-xattr": "0.3.1",
@@ -113,7 +113,7 @@
"ts-node-dev": "^1.1.8",
"tsx": "^4.16.0",
"tus-js-client": "^3.1.0",
"typescript": "^4.5.5"
"typescript": "^5.6.2"
},
"bin": "./dist/server.js"
}
2 changes: 1 addition & 1 deletion src/http/plugins/storage.ts
@@ -17,7 +17,7 @@ const storageBackend = createStorageBackend(storageBackendType)

export const storage = fastifyPlugin(
async function storagePlugin(fastify) {
fastify.decorateRequest('storage', undefined)
fastify.decorateRequest('storage', null)
fastify.addHook('preHandler', async (request) => {
const database = new StorageKnexDB(request.db, {
tenantId: request.tenantId,
15 changes: 10 additions & 5 deletions src/http/plugins/tracing.ts
@@ -42,11 +42,11 @@ export const tracing = fastifyPlugin(
const span = trace.getSpan(context.active())

if (span) {
// We collect logs only in full and logs mode
// We collect logs only in full,logs,debug mode
if (
tracingEnabled &&
request.tracingMode &&
!['full', 'logs'].includes(request.tracingMode)
!['full', 'logs', 'debug'].includes(request.tracingMode)
) {
traceCollector.clearTrace(span.spanContext().traceId)
}
@@ -68,7 +68,7 @@ export const traceServerTime = fastifyPlugin(
const spans = traceCollector.getSpansForTrace(traceId)
if (spans) {
try {
const serverTimingHeaders = spansToServerTimings(spans)
const serverTimingHeaders = spansToServerTimings(spans, reply.statusCode >= 500)

request.serverTimings = serverTimingHeaders

@@ -94,12 +94,15 @@
})

fastify.addHook('onRequestAbort', async (req) => {
const traceId = trace.getSpan(context.active())?.spanContext().traceId
const span = trace.getSpan(context.active())
const traceId = span?.spanContext().traceId

span?.setAttribute('req_aborted', true)

if (traceId) {
const spans = traceCollector.getSpansForTrace(traceId)
if (spans) {
req.serverTimings = spansToServerTimings(spans)
req.serverTimings = spansToServerTimings(spans, true)
}
traceCollector.clearTrace(traceId)
}
@@ -155,6 +158,8 @@ function spansToServerTimings(
spanName,
duration,
action: span.item.attributes['db.statement'],
error: span.item.attributes.error,
status: span.item.status,
host: hostName
? isIP(hostName)
? hostName
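Note on the Server-Timing changes above: each collected span is mapped to one timing entry (span name, duration, and now error/status details), and the second argument added here (reply.statusCode >= 500, or true on abort) marks the trace as coming from a failed or aborted request. A purely illustrative sketch of the standard Server-Timing header format these entries feed into; the entry names and durations below are made up, not taken from this PR:

    Server-Timing: pg_query;dur=12.4, S3.PutObject;dur=85.1;desc="error"
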
2 changes: 1 addition & 1 deletion src/http/routes/object/deleteObject.ts
@@ -1,5 +1,5 @@
import { FastifyInstance } from 'fastify'
import { FromSchema, JSONSchema } from 'json-schema-to-ts'
import { FromSchema } from 'json-schema-to-ts'
import { createDefaultSchema, createResponse } from '../../routes-helper'
import { AuthenticatedRequest } from '../../types'
import { ROUTE_OPERATIONS } from '../operations'
1 change: 1 addition & 0 deletions src/http/routes/object/updateObject.ts
@@ -77,6 +77,7 @@ export default async function routes(fastify: FastifyInstance) {
.uploadOverridingObject(request, {
owner,
objectName: objectName,
signal: request.signals.body.signal,
})

return response.status(objectMetadata?.httpStatusCode ?? 200).send({
1 change: 1 addition & 0 deletions src/http/routes/object/uploadSignedObject.ts
@@ -94,6 +94,7 @@ export default async function routes(fastify: FastifyInstance) {
owner,
objectName,
isUpsert: upsert,
signal: request.signals.body.signal,
})

return response.status(objectMetadata?.httpStatusCode ?? 200).send({
23 changes: 13 additions & 10 deletions src/http/routes/s3/commands/upload-part.ts
@@ -97,16 +97,19 @@ export default function UploadPart(s3Router: S3Router) {

const metadata = s3Protocol.parseMetadataHeaders(req.Headers)

return s3Protocol.putObject({
Body: ctx.req as any,
Bucket: req.Params.Bucket,
Key: req.Params['*'],
CacheControl: req.Headers?.['cache-control'],
ContentType: req.Headers?.['content-type'],
Expires: req.Headers?.['expires'] ? new Date(req.Headers?.['expires']) : undefined,
ContentEncoding: req.Headers?.['content-encoding'],
Metadata: metadata,
})
return s3Protocol.putObject(
{
Body: ctx.req as any,
Bucket: req.Params.Bucket,
Key: req.Params['*'],
CacheControl: req.Headers?.['cache-control'],
ContentType: req.Headers?.['content-type'],
Expires: req.Headers?.['expires'] ? new Date(req.Headers?.['expires']) : undefined,
ContentEncoding: req.Headers?.['content-encoding'],
Metadata: metadata,
},
ctx.signals.body
)
}
)
}
2 changes: 1 addition & 1 deletion src/http/routes/tus/index.ts
@@ -59,7 +59,7 @@ type MultiPartRequest = http.IncomingMessage & {

function createTusStore() {
if (storageBackendType === 's3') {
const agent = createAgent(storageS3Endpoint?.includes('http://') ? 'http' : 'https')
const agent = createAgent('s3_tus')
return new S3Store({
partSize: tusPartSize * 1024 * 1024, // Each uploaded part will have ${tusPartSize}MB,
expirationPeriodInMilliseconds: tusUrlExpiryMs,
25 changes: 25 additions & 0 deletions src/internal/monitoring/metrics.ts
@@ -68,3 +68,28 @@ export const DbActiveConnection = new client.Gauge({
help: 'Number of database connections',
labelNames: ['region', 'is_external'],
})

// Create Prometheus metrics
export const HttpPoolSocketsGauge = new client.Gauge({
name: 'storage_api_http_pool_busy_sockets',
help: 'Number of busy sockets currently in use',
labelNames: ['name', 'region', 'protocol'],
})

export const HttpPoolFreeSocketsGauge = new client.Gauge({
name: 'storage_api_http_pool_free_sockets',
help: 'Number of free sockets available for reuse',
labelNames: ['name', 'region', 'protocol'],
})

export const HttpPoolPendingRequestsGauge = new client.Gauge({
name: 'storage_api_http_pool_requests',
help: 'Number of pending requests waiting for a socket',
labelNames: ['name', 'region', 'protocol'],
})

export const HttpPoolErrorGauge = new client.Gauge({
name: 'storage_api_http_pool_errors',
help: 'Number of socket errors in the HTTP pool',
labelNames: ['name', 'region', 'type', 'protocol'],
})
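
The four gauges above are created with prom-client's client.Gauge, so they register in the default registry next to the existing metrics in this file and are exposed wherever that registry is scraped. A minimal sketch of reading them directly; the label values in the comment are examples only:

    import client from 'prom-client'

    // Render every metric registered with the default registry in Prometheus
    // text format; the new pool gauges appear as series such as
    //   storage_api_http_pool_busy_sockets{name="s3_upload",region="ap-southeast-1",protocol="https"} 3
    async function renderMetrics(): Promise<string> {
      return client.register.metrics()
    }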
7 changes: 7 additions & 0 deletions src/internal/monitoring/otel-instrumentation.ts
@@ -108,6 +108,13 @@ class ClassInstrumentation implements Instrumentation {
span.setStatus({ code: SpanStatusCode.OK })
return result
} catch (error) {
if (error instanceof Error) {
span.setAttributes({
error: JSON.stringify({ message: error.message, stack: error.stack }),
stack: error.stack,
})
}

span.setStatus({
code: SpanStatusCode.ERROR,
message: error instanceof Error ? error.message : String(error),
6 changes: 6 additions & 0 deletions src/internal/monitoring/otel.ts
@@ -31,6 +31,7 @@ import { S3Backend } from '@storage/backend'
import { StorageKnexDB } from '@storage/database'
import { TenantConnection } from '@internal/database'
import { S3Store } from '@tus/s3-store'
import { Upload } from '@aws-sdk/lib-storage'

const tracingEnabled = process.env.TRACING_ENABLED === 'true'
const headersEnv = process.env.OTEL_EXPORTER_OTLP_TRACES_HEADERS || ''
@@ -239,6 +240,11 @@ const sdk = new NodeSDK({
},
setName: (name, attrs) => 'S3.' + attrs.operation,
}),
new ClassInstrumentation({
targetClass: Upload,
enabled: true,
methodsToInstrument: ['done', '__notifyProgress'],
}),
getNodeAutoInstrumentations({
'@opentelemetry/instrumentation-http': {
enabled: false,
6 changes: 4 additions & 2 deletions src/storage/backend/adapter.ts
@@ -82,7 +82,8 @@ export abstract class StorageBackendAdapter {
version: string | undefined,
body: NodeJS.ReadableStream,
contentType: string,
cacheControl: string
cacheControl: string,
signal?: AbortSignal
): Promise<ObjectMetadata> {
throw new Error('uploadObject not implemented')
}
@@ -172,7 +173,8 @@ export abstract class StorageBackendAdapter {
uploadId: string,
partNumber: number,
body?: string | Uint8Array | Buffer | Readable,
length?: number
length?: number,
signal?: AbortSignal
): Promise<{ ETag?: string }> {
throw new Error('not implemented')
}
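The new optional signal parameters let a caller cancel an in-flight upload when the HTTP request goes away; the route changes above pass request.signals.body.signal for exactly this. A rough sketch of the flow, where the helper name, import path, and argument values are assumptions for illustration only:

    import { StorageBackendAdapter } from '@storage/backend' // import path assumed

    // Illustrative: abort a backend upload if the caller signals that the
    // client disconnected before the body finished streaming.
    async function uploadWithAbort(
      backend: StorageBackendAdapter,
      body: NodeJS.ReadableStream,
      onClientGone: (cb: () => void) => void
    ) {
      const controller = new AbortController()
      onClientGone(() => controller.abort())

      return backend.uploadObject(
        'example-bucket',            // hypothetical bucket
        'uploads/file.bin',          // hypothetical object key
        undefined,                   // version
        body,
        'application/octet-stream',  // contentType
        'no-cache',                  // cacheControl
        controller.signal            // S3Backend wires this to Upload.abort()
      )
    }
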
103 changes: 91 additions & 12 deletions src/storage/backend/s3.ts
@@ -30,23 +30,91 @@ import { ERRORS, StorageBackendError } from '@internal/errors'
import { getConfig } from '../../config'
import Agent, { HttpsAgent } from 'agentkeepalive'
import { Readable } from 'stream'
import {
HttpPoolErrorGauge,
HttpPoolFreeSocketsGauge,
HttpPoolPendingRequestsGauge,
HttpPoolSocketsGauge,
} from '@internal/monitoring/metrics'

const { storageS3MaxSockets, region } = getConfig()

const watchers: NodeJS.Timeout[] = []

const { storageS3MaxSockets } = getConfig()
process.once('SIGTERM', () => {
watchers.forEach((watcher) => {
clearInterval(watcher)
})
})

/**
* Creates an agent for the given protocol
* @param protocol
* @param name
*/
export function createAgent(protocol: 'http' | 'https') {
export function createAgent(name: string) {
const agentOptions = {
maxSockets: storageS3MaxSockets,
keepAlive: true,
keepAliveMsecs: 1000,
freeSocketTimeout: 1000 * 15,
}

const httpAgent = new Agent(agentOptions)
const httpsAgent = new HttpsAgent(agentOptions)

if (httpsAgent) {
const watcher = setInterval(() => {
const httpStatus = httpAgent.getCurrentStatus()
const httpsStatus = httpsAgent.getCurrentStatus()
updateHttpPoolMetrics(name, 'http', httpStatus)
updateHttpPoolMetrics(name, 'https', httpsStatus)
}, 5000)

watchers.push(watcher)
}

return { httpAgent, httpsAgent }
}

// Function to update Prometheus metrics based on the current status of the agent
function updateHttpPoolMetrics(name: string, protocol: string, status: Agent.AgentStatus): void {
// Calculate the number of busy sockets by iterating over the `sockets` object
let busySocketCount = 0
for (const host in status.sockets) {
if (status.sockets.hasOwnProperty(host)) {
busySocketCount += status.sockets[host]
}
}

return protocol === 'http'
? { httpAgent: new Agent(agentOptions) }
: { httpsAgent: new HttpsAgent(agentOptions) }
// Calculate the number of free sockets by iterating over the `freeSockets` object
let freeSocketCount = 0
for (const host in status.freeSockets) {
if (status.freeSockets.hasOwnProperty(host)) {
freeSocketCount += status.freeSockets[host]
}
}

// Calculate the number of pending requests by iterating over the `requests` object
let pendingRequestCount = 0
for (const host in status.requests) {
if (status.requests.hasOwnProperty(host)) {
pendingRequestCount += status.requests[host]
}
}

// Update the metrics with calculated values
HttpPoolSocketsGauge.set({ name, region, protocol }, busySocketCount)
HttpPoolFreeSocketsGauge.set({ name, region, protocol }, freeSocketCount)
HttpPoolPendingRequestsGauge.set({ name, region, protocol }, pendingRequestCount)
HttpPoolErrorGauge.set({ name, region, type: 'socket_error', protocol }, status.errorSocketCount)
HttpPoolErrorGauge.set(
{ name, region, type: 'timeout_socket_error', protocol },
status.timeoutSocketCount
)
HttpPoolErrorGauge.set(
{ name, region, type: 'create_socket_error', protocol },
status.createSocketErrorCount
)
}

export interface S3ClientOptions {
@@ -56,7 +124,7 @@ export interface S3ClientOptions {
accessKey?: string
secretKey?: string
role?: string
httpAgent?: { httpAgent: Agent } | { httpsAgent: HttpsAgent }
httpAgent?: { httpAgent: Agent; httpsAgent: HttpsAgent }
requestTimeout?: number
downloadTimeout?: number
uploadTimeout?: number
@@ -75,18 +143,21 @@ export class S3Backend implements StorageBackendAdapter {
// Default client for API operations
this.client = this.createS3Client({
...options,
name: 's3_default',
requestTimeout: options.requestTimeout,
})

// Upload client exclusively for upload operations
this.uploadClient = this.createS3Client({
...options,
name: 's3_upload',
requestTimeout: options.uploadTimeout,
})

// Download client exclusively for download operations
this.downloadClient = this.createS3Client({
...options,
name: 's3_download',
requestTimeout: options.downloadTimeout,
})
}
@@ -144,14 +215,16 @@
* @param body
* @param contentType
* @param cacheControl
* @param signal
*/
async uploadObject(
bucketName: string,
key: string,
version: string | undefined,
body: NodeJS.ReadableStream,
contentType: string,
cacheControl: string
cacheControl: string,
signal?: AbortSignal
): Promise<ObjectMetadata> {
try {
const paralellUploadS3 = new Upload({
@@ -166,6 +239,14 @@
},
})

signal?.addEventListener(
'abort',
() => {
paralellUploadS3.abort()
},
{ once: true }
)

const data = (await paralellUploadS3.done()) as CompleteMultipartUploadCommandOutput

const metadata = await this.headObject(bucketName, key, version)
@@ -451,10 +532,8 @@
}
}

protected createS3Client(options: S3ClientOptions) {
const storageS3Protocol = options.endpoint?.includes('http://') ? 'http' : 'https'

const agent = options.httpAgent ? options.httpAgent : createAgent(storageS3Protocol)
protected createS3Client(options: S3ClientOptions & { name: string }) {
const agent = options.httpAgent ?? createAgent(options.name)

const params: S3ClientConfig = {
region: options.region,
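
For context on updateHttpPoolMetrics above: agentkeepalive exposes getCurrentStatus(), which returns cumulative socket counters plus per-host maps of busy sockets, free sockets, and queued requests, and the gauges are filled by summing those maps every 5 seconds. A minimal standalone sketch; the host name and numbers in the comment are examples only:

    import Agent from 'agentkeepalive'

    const agent = new Agent({ maxSockets: 100, keepAlive: true })

    // getCurrentStatus() returns something like:
    // {
    //   createSocketCount: 10, createSocketErrorCount: 0, closeSocketCount: 5,
    //   errorSocketCount: 0, timeoutSocketCount: 1, requestCount: 50,
    //   sockets: { 'storage.example.com:443': 2 },      // busy, per host
    //   freeSockets: { 'storage.example.com:443': 3 },  // idle keep-alive, per host
    //   requests: { 'storage.example.com:443': 1 },     // queued, per host
    // }
    const status = agent.getCurrentStatus()
    const busy = Object.values(status.sockets).reduce((a, b) => a + b, 0)
    console.log('busy sockets:', busy, 'socket errors:', status.errorSocketCount)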