Skip to content

Commit

Permalink
fix: add metrics for agentkeepalive
Browse files Browse the repository at this point in the history
  • Loading branch information
fenos committed Sep 19, 2024
1 parent ddd1558 commit 16acbdb
Show file tree
Hide file tree
Showing 14 changed files with 579 additions and 178 deletions.
559 changes: 414 additions & 145 deletions package-lock.json

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@
"conventional-changelog-conventionalcommits": "^5.0.0",
"crypto-js": "^4.2.0",
"dotenv": "^16.0.0",
"fastify": "^4.8.1",
"fastify": "^4.28.1",
"fastify-metrics": "^10.2.0",
"fastify-plugin": "^4.0.0",
"fastify-plugin": "^4.5.1",
"fastify-xml-body-parser": "^2.2.0",
"fs-extra": "^10.0.1",
"fs-xattr": "0.3.1",
Expand Down
1 change: 1 addition & 0 deletions src/http/routes/object/updateObject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ export default async function routes(fastify: FastifyInstance) {
.uploadOverridingObject(request, {
owner,
objectName: objectName,
signal: request.signals.body.signal,
})

return response.status(objectMetadata?.httpStatusCode ?? 200).send({
Expand Down
1 change: 1 addition & 0 deletions src/http/routes/object/uploadSignedObject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ export default async function routes(fastify: FastifyInstance) {
owner,
objectName,
isUpsert: upsert,
signal: request.signals.body.signal,
})

return response.status(objectMetadata?.httpStatusCode ?? 200).send({
Expand Down
23 changes: 13 additions & 10 deletions src/http/routes/s3/commands/upload-part.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,19 @@ export default function UploadPart(s3Router: S3Router) {

const metadata = s3Protocol.parseMetadataHeaders(req.Headers)

return s3Protocol.putObject({
Body: ctx.req as any,
Bucket: req.Params.Bucket,
Key: req.Params['*'],
CacheControl: req.Headers?.['cache-control'],
ContentType: req.Headers?.['content-type'],
Expires: req.Headers?.['expires'] ? new Date(req.Headers?.['expires']) : undefined,
ContentEncoding: req.Headers?.['content-encoding'],
Metadata: metadata,
})
return s3Protocol.putObject(
{
Body: ctx.req as any,
Bucket: req.Params.Bucket,
Key: req.Params['*'],
CacheControl: req.Headers?.['cache-control'],
ContentType: req.Headers?.['content-type'],
Expires: req.Headers?.['expires'] ? new Date(req.Headers?.['expires']) : undefined,
ContentEncoding: req.Headers?.['content-encoding'],
Metadata: metadata,
},
ctx.signals.body
)
}
)
}
2 changes: 1 addition & 1 deletion src/http/routes/tus/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type MultiPartRequest = http.IncomingMessage & {

function createTusStore() {
if (storageBackendType === 's3') {
const agent = createAgent(storageS3Endpoint?.includes('http://') ? 'http' : 'https')
const agent = createAgent('s3_tus')
return new S3Store({
partSize: tusPartSize * 1024 * 1024, // Each uploaded part will have ${tusPartSize}MB,
expirationPeriodInMilliseconds: tusUrlExpiryMs,
Expand Down
25 changes: 25 additions & 0 deletions src/internal/monitoring/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,28 @@ export const DbActiveConnection = new client.Gauge({
help: 'Number of database connections',
labelNames: ['region', 'is_external'],
})

// Create Prometheus metrics
export const HttpPoolSocketsGauge = new client.Gauge({
name: 'storage_api_http_pool_busy_sockets',
help: 'Number of busy sockets currently in use',
labelNames: ['name', 'region', 'protocol'],
})

export const HttpPoolFreeSocketsGauge = new client.Gauge({
name: 'storage_api_http_pool_free_sockets',
help: 'Number of free sockets available for reuse',
labelNames: ['name', 'region', 'protocol'],
})

export const HttpPoolPendingRequestsGauge = new client.Gauge({
name: 'storage_api_http_pool_requests',
help: 'Number of pending requests waiting for a socket',
labelNames: ['name', 'region', 'protocol'],
})

export const HttpPoolErrorGauge = new client.Gauge({
name: 'storage_api_http_pool_errors',
help: 'Number of pending requests waiting for a socket',
labelNames: ['name', 'region', 'type', 'protocol'],
})
6 changes: 6 additions & 0 deletions src/internal/monitoring/otel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import { S3Backend } from '@storage/backend'
import { StorageKnexDB } from '@storage/database'
import { TenantConnection } from '@internal/database'
import { S3Store } from '@tus/s3-store'
import { Upload } from '@aws-sdk/lib-storage'

const tracingEnabled = process.env.TRACING_ENABLED === 'true'
const headersEnv = process.env.OTEL_EXPORTER_OTLP_TRACES_HEADERS || ''
Expand Down Expand Up @@ -239,6 +240,11 @@ const sdk = new NodeSDK({
},
setName: (name, attrs) => 'S3.' + attrs.operation,
}),
new ClassInstrumentation({
targetClass: Upload,
enabled: true,
methodsToInstrument: ['done', '__notifyProgress'],
}),
getNodeAutoInstrumentations({
'@opentelemetry/instrumentation-http': {
enabled: false,
Expand Down
6 changes: 4 additions & 2 deletions src/storage/backend/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ export abstract class StorageBackendAdapter {
version: string | undefined,
body: NodeJS.ReadableStream,
contentType: string,
cacheControl: string
cacheControl: string,
signal?: AbortSignal
): Promise<ObjectMetadata> {
throw new Error('uploadObject not implemented')
}
Expand Down Expand Up @@ -172,7 +173,8 @@ export abstract class StorageBackendAdapter {
uploadId: string,
partNumber: number,
body?: string | Uint8Array | Buffer | Readable,
length?: number
length?: number,
signal?: AbortSignal
): Promise<{ ETag?: string }> {
throw new Error('not implemented')
}
Expand Down
101 changes: 90 additions & 11 deletions src/storage/backend/s3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,91 @@ import { ERRORS, StorageBackendError } from '@internal/errors'
import { getConfig } from '../../config'
import Agent, { HttpsAgent } from 'agentkeepalive'
import { Readable } from 'stream'
import {
HttpPoolErrorGauge,
HttpPoolFreeSocketsGauge,
HttpPoolPendingRequestsGauge,
HttpPoolSocketsGauge,
} from '@internal/monitoring/metrics'

const { storageS3MaxSockets, region } = getConfig()

const watchers: NodeJS.Timeout[] = []

const { storageS3MaxSockets } = getConfig()
process.once('SIGTERM', () => {
watchers.forEach((watcher) => {
clearInterval(watcher)
})
})

/**
* Creates an agent for the given protocol
* @param protocol
* @param name
*/
export function createAgent(protocol: 'http' | 'https') {
export function createAgent(name: string) {
const agentOptions = {
maxSockets: storageS3MaxSockets,
keepAlive: true,
keepAliveMsecs: 1000,
freeSocketTimeout: 1000 * 15,
}

const httpAgent = new Agent(agentOptions)
const httpsAgent = new HttpsAgent(agentOptions)

if (httpsAgent) {
const watcher = setInterval(() => {
const httpStatus = httpAgent.getCurrentStatus()
const httpsStatus = httpsAgent.getCurrentStatus()
updateHttpPoolMetrics(name, 'http', httpStatus)
updateHttpPoolMetrics(name, 'https', httpsStatus)
}, 5000)

watchers.push(watcher)
}

return { httpAgent, httpsAgent }
}

// Function to update Prometheus metrics based on the current status of the agent
function updateHttpPoolMetrics(name: string, protocol: string, status: Agent.AgentStatus): void {
// Calculate the number of busy sockets by iterating over the `sockets` object
let busySocketCount = 0
for (const host in status.sockets) {
if (status.sockets.hasOwnProperty(host)) {
busySocketCount += status.sockets[host]
}
}

return protocol === 'http'
? { httpAgent: new Agent(agentOptions) }
: { httpsAgent: new HttpsAgent(agentOptions) }
// Calculate the number of free sockets by iterating over the `freeSockets` object
let freeSocketCount = 0
for (const host in status.freeSockets) {
if (status.freeSockets.hasOwnProperty(host)) {
freeSocketCount += status.freeSockets[host]
}
}

// Calculate the number of pending requests by iterating over the `requests` object
let pendingRequestCount = 0
for (const host in status.requests) {
if (status.requests.hasOwnProperty(host)) {
pendingRequestCount += status.requests[host]
}
}

// Update the metrics with calculated values
HttpPoolSocketsGauge.set({ name, region, protocol }, busySocketCount)
HttpPoolFreeSocketsGauge.set({ name, region, protocol }, freeSocketCount)
HttpPoolPendingRequestsGauge.set({ name, region }, pendingRequestCount)
HttpPoolErrorGauge.set({ name, region, type: 'socket_error', protocol }, status.errorSocketCount)
HttpPoolErrorGauge.set(
{ name, region, type: 'timeout_socket_error', protocol },
status.timeoutSocketCount
)
HttpPoolErrorGauge.set(
{ name, region, type: 'create_socket_error', protocol },
status.createSocketErrorCount
)
}

export interface S3ClientOptions {
Expand Down Expand Up @@ -75,18 +143,21 @@ export class S3Backend implements StorageBackendAdapter {
// Default client for API operations
this.client = this.createS3Client({
...options,
name: 's3_default',
requestTimeout: options.requestTimeout,
})

// Upload client exclusively for upload operations
this.uploadClient = this.createS3Client({
...options,
name: 's3_upload',
requestTimeout: options.uploadTimeout,
})

// Download client exclusively for download operations
this.downloadClient = this.createS3Client({
...options,
name: 's3_download',
requestTimeout: options.downloadTimeout,
})
}
Expand Down Expand Up @@ -144,14 +215,16 @@ export class S3Backend implements StorageBackendAdapter {
* @param body
* @param contentType
* @param cacheControl
* @param signal
*/
async uploadObject(
bucketName: string,
key: string,
version: string | undefined,
body: NodeJS.ReadableStream,
contentType: string,
cacheControl: string
cacheControl: string,
signal?: AbortSignal
): Promise<ObjectMetadata> {
try {
const paralellUploadS3 = new Upload({
Expand All @@ -166,6 +239,14 @@ export class S3Backend implements StorageBackendAdapter {
},
})

signal?.addEventListener(
'abort',
() => {
paralellUploadS3.abort()
},
{ once: true }
)

const data = (await paralellUploadS3.done()) as CompleteMultipartUploadCommandOutput

const metadata = await this.headObject(bucketName, key, version)
Expand Down Expand Up @@ -451,10 +532,8 @@ export class S3Backend implements StorageBackendAdapter {
}
}

protected createS3Client(options: S3ClientOptions) {
const storageS3Protocol = options.endpoint?.includes('http://') ? 'http' : 'https'

const agent = options.httpAgent ? options.httpAgent : createAgent(storageS3Protocol)
protected createS3Client(options: S3ClientOptions & { name: string }) {
const agent = createAgent(options.name)

const params: S3ClientConfig = {
region: options.region,
Expand Down
6 changes: 3 additions & 3 deletions src/storage/events/base-event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import { Storage } from '../storage'
import { getConfig } from '../../config'
import { logger } from '@internal/monitoring'

const { storageBackendType, storageS3Endpoint, region } = getConfig()
const storageS3Protocol = storageS3Endpoint?.includes('http://') ? 'http' : 'https'
const httpAgent = createAgent(storageS3Protocol)
const { storageBackendType, region } = getConfig()

const httpAgent = createAgent('s3_worker')

export abstract class BaseEvent<T extends Omit<BasePayload, '$version'>> extends QueueBaseEvent<T> {
/**
Expand Down
1 change: 1 addition & 0 deletions src/storage/object.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export interface UploadObjectOptions {
owner?: string
isUpsert?: boolean
version?: string
signal?: AbortSignal
}

const { requestUrlLengthLimit, storageS3Bucket } = getConfig()
Expand Down
18 changes: 15 additions & 3 deletions src/storage/protocols/s3/s3-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -579,8 +579,13 @@ export class S3ProtocolHandler {
*
* Reference: https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPart.html
* @param command
* @param signal
*/
async uploadPart(command: UploadPartCommandInput) {
async uploadPart(command: UploadPartCommandInput, signal?: AbortSignal) {
if (signal?.aborted) {
throw ERRORS.Aborted('UploadPart aborted')
}

const { Bucket, PartNumber, UploadId, Key, Body, ContentLength } = command

if (!UploadId) {
Expand Down Expand Up @@ -608,6 +613,10 @@ export class S3ProtocolHandler {

const multipart = await this.shouldAllowPartUpload(UploadId, ContentLength, maxFileSize)

if (signal?.aborted) {
throw ERRORS.Aborted('UploadPart aborted')
}

const proxy = new PassThrough()

if (Body instanceof Readable) {
Expand Down Expand Up @@ -636,7 +645,8 @@ export class S3ProtocolHandler {
UploadId,
PartNumber || 0,
stream as Readable,
ContentLength
ContentLength,
signal
)
}
)
Expand Down Expand Up @@ -677,8 +687,9 @@ export class S3ProtocolHandler {
* Reference: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
*
* @param command
* @param signal
*/
async putObject(command: PutObjectCommandInput) {
async putObject(command: PutObjectCommandInput, signal?: AbortSignal) {
const uploader = new Uploader(this.storage.backend, this.storage.db)

mustBeValidBucketName(command.Bucket)
Expand All @@ -705,6 +716,7 @@ export class S3ProtocolHandler {
fileSizeLimit: bucket.file_size_limit,
allowedMimeTypes: bucket.allowed_mime_types,
metadata: command.Metadata,
signal,
})

return {
Expand Down
4 changes: 3 additions & 1 deletion src/storage/uploader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ interface UploaderOptions extends UploadObjectOptions {
fileSizeLimit?: number | null
allowedMimeTypes?: string[] | null
metadata?: Record<string, any>
signal?: AbortSignal
}

const { storageS3Bucket, uploadFileSizeLimitStandard } = getConfig()
Expand Down Expand Up @@ -106,7 +107,8 @@ export class Uploader {
version,
file.body,
file.mimeType,
file.cacheControl
file.cacheControl,
options.signal
)

if (file.isTruncated()) {
Expand Down

0 comments on commit 16acbdb

Please sign in to comment.