fix: prevent duplicate logs
fenos committed Oct 1, 2024
1 parent da3ce61 commit a08713b
Showing 9 changed files with 231 additions and 57 deletions.
14 changes: 13 additions & 1 deletion src/http/error-handler.ts
@@ -43,6 +43,18 @@ export const setErrorHandler = (app: FastifyInstance) => {
? 500
: 400

if (renderableError.code === ErrorCode.AbortedTerminate) {
reply.header('Connection', 'close')

reply.raw.once('finish', () => {
setTimeout(() => {
if (!request.raw.closed) {
request.raw.destroy()
}
}, 3000)
})
}

return reply.status(statusCode).send({
...renderableError,
error: error.error || renderableError.code,
@@ -59,7 +71,7 @@ export const setErrorHandler = (app: FastifyInstance) => {
})
}

reply.status(500).send({
return reply.status(500).send({
statusCode: '500',
error: 'Internal',
message: 'Internal Server Error',
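The new AbortedTerminate branch tells the client the connection will not be reused, then force-closes the socket if it is still open three seconds after the error response is flushed. A minimal standalone sketch of the same pattern, assuming a plain Fastify app (the handler body and error shape are illustrative, not the project's exact wiring):

import fastify from 'fastify'

const app = fastify()

app.setErrorHandler((err, request, reply) => {
  // signal that this connection should not be reused
  reply.header('Connection', 'close')

  // once the error response is flushed, give the client 3s to hang up,
  // then tear the socket down ourselves
  reply.raw.once('finish', () => {
    setTimeout(() => {
      if (!request.raw.closed) {
        request.raw.destroy()
      }
    }, 3000)
  })

  return reply.status(500).send({ error: 'AbortedTerminate', message: err.message })
})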
21 changes: 10 additions & 11 deletions src/http/plugins/log-request.ts
@@ -32,10 +32,17 @@ export const logRequest = (options: RequestLoggerOptions) =>
fastify.addHook('onRequest', async (req, res) => {
req.startTime = Date.now()

// Request was aborted before the server finished returning a response
res.raw.once('close', () => {
const aborted = !res.raw.writableFinished
if (aborted) {
if (req.raw.aborted) {
doRequestLog(req, {
excludeUrls: options.excludeUrls,
statusCode: 'ABORTED REQ',
responseTime: (Date.now() - req.startTime) / 1000,
})
return
}

if (!res.raw.writableFinished) {
doRequestLog(req, {
excludeUrls: options.excludeUrls,
statusCode: 'ABORTED RES',
@@ -73,14 +80,6 @@
}
})

fastify.addHook('onRequestAbort', async (req) => {
doRequestLog(req, {
excludeUrls: options.excludeUrls,
statusCode: 'ABORTED REQ',
responseTime: (Date.now() - req.startTime) / 1000,
})
})

fastify.addHook('onResponse', async (req, reply) => {
doRequestLog(req, {
reply,
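The duplicate logs came from recording aborts twice: once in the removed onRequestAbort hook and once in the response 'close' listener. The fix folds both cases into the single 'close' listener, branching on req.raw.aborted. A rough sketch of the same ordering in plain Node, assuming a deliberately slow handler (names and timings are illustrative):

import http from 'node:http'

const server = http.createServer((req, res) => {
  res.once('close', () => {
    if (req.aborted) {
      console.log('ABORTED REQ') // client dropped the request itself
    } else if (!res.writableFinished) {
      console.log('ABORTED RES') // we never flushed a full response
    }
    // exactly one log line per aborted request either way
  })

  setTimeout(() => res.end('ok'), 1000)
})

server.listen(8080)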
12 changes: 11 additions & 1 deletion src/http/plugins/signals.ts
@@ -20,6 +20,17 @@ export const signals = fastifyPlugin(
disconnect: new AbortController(),
}

// Client terminated the request before the body was fully sent
req.raw.once('close', () => {
if (req.raw.aborted) {
req.signals.body.abort()

if (!req.signals.disconnect.signal.aborted) {
req.signals.disconnect.abort()
}
}
})

// Client terminated the request before the server finished sending the response
res.raw.once('close', () => {
const aborted = !res.raw.writableFinished
@@ -33,7 +44,6 @@
})
})

// Client terminated the request before the body was fully sent
fastify.addHook('onRequestAbort', async (req) => {
req.signals.body.abort()

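Detecting the abort on the request's own 'close' event means req.signals fires even when the response side never gets that far. A sketch of how a route might consume these signals downstream (doUpload is a hypothetical helper, not part of this commit):

app.post('/upload', async (req, reply) => {
  // aborts as soon as the client stops sending the body
  const { signal } = req.signals.body

  signal.addEventListener('abort', () => {
    // stop reading, release buffers, etc.
  })

  await doUpload(req.raw, { signal })
  return reply.send({ ok: true })
})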
2 changes: 1 addition & 1 deletion src/http/plugins/tracing.ts
@@ -47,7 +47,7 @@ export const tracing = fastifyPlugin(
if (
tracingEnabled &&
request.tracingMode &&
!['full', 'logs', 'debug'].includes(request.tracingMode)
!['logs', 'debug'].includes(request.tracingMode)
) {
traceCollector.clearTrace(span.spanContext().traceId)
}
1 change: 1 addition & 0 deletions src/internal/concurrency/index.ts
@@ -1,2 +1,3 @@
export * from './mutex'
export * from './stream'
export * from './async-abort-controller'
40 changes: 40 additions & 0 deletions src/internal/concurrency/stream.ts
@@ -0,0 +1,40 @@
import { Transform, TransformCallback } from 'stream'

interface ByteCounterStreamOptions {
maxHistory?: number
onMaxHistory?: (history: Date[]) => void
rewriteHistoryOnMax?: boolean
}

export const createByteCounterStream = (options: ByteCounterStreamOptions) => {
const { maxHistory = 100 } = options

let bytes = 0
let history: Date[] = []

const transformStream = new Transform({
transform(chunk: Buffer, encoding: string, callback: TransformCallback) {
bytes += chunk.length
history.push(new Date())

if (history.length === maxHistory) {
if (options.rewriteHistoryOnMax) {
options.onMaxHistory?.(history)
history = []
}
}

callback(null, chunk)
},
})

return {
transformStream,
get bytes() {
return bytes
},
get history() {
return history
},
}
}
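A quick usage sketch for the new byte counter, assuming a file-to-file pipeline (the paths and sink are illustrative):

import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { createByteCounterStream } from '@internal/concurrency'

const counter = createByteCounterStream({
  maxHistory: 100,
  rewriteHistoryOnMax: true,
  onMaxHistory: (history) => {
    // receives the last 100 chunk timestamps, then the window resets
    console.log('chunks in window:', history.length)
  },
})

await pipeline(
  createReadStream('input.bin'),
  counter.transformStream,
  createWriteStream('output.bin')
)

console.log(`saw ${counter.bytes} bytes`) // running total across all chunks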
8 changes: 8 additions & 0 deletions src/internal/errors/codes.ts
@@ -37,6 +37,7 @@ export enum ErrorCode {
SlowDown = 'SlowDown',
TusError = 'TusError',
Aborted = 'Aborted',
AbortedTerminate = 'AbortedTerminate',
}

export const ERRORS = {
@@ -372,6 +373,13 @@ export const ERRORS = {
message: message,
originalError,
}),
AbortedTerminate: (message: string, originalError?: unknown) =>
new StorageBackendError({
code: ErrorCode.AbortedTerminate,
httpStatusCode: 500,
message: message,
originalError,
}),
}

export function isStorageError(errorType: ErrorCode, error: any): error is StorageBackendError {
160 changes: 126 additions & 34 deletions src/storage/backend/s3.ts
@@ -28,13 +28,16 @@ import { getSignedUrl } from '@aws-sdk/s3-request-presigner'
import { ERRORS, StorageBackendError } from '@internal/errors'
import { getConfig } from '../../config'
import Agent, { HttpsAgent } from 'agentkeepalive'
import { Readable } from 'stream'
import { addAbortSignal, PassThrough, Readable } from 'node:stream'
import {
HttpPoolErrorGauge,
HttpPoolFreeSocketsGauge,
HttpPoolPendingRequestsGauge,
HttpPoolSocketsGauge,
} from '@internal/monitoring/metrics'
import stream from 'stream/promises'
import { trace } from '@opentelemetry/api'
import { createByteCounterStream } from '@internal/concurrency'

const { storageS3MaxSockets, region } = getConfig()

@@ -201,34 +204,83 @@
bucketName: string,
key: string,
version: string | undefined,
body: NodeJS.ReadableStream,
body: Readable,
contentType: string,
cacheControl: string,
signal?: AbortSignal
): Promise<ObjectMetadata> {
if (signal?.aborted) {
throw ERRORS.Aborted('Upload was aborted')
}

const passThrough = new PassThrough()

if (signal) {
addAbortSignal(signal, passThrough)
}

passThrough.on('error', () => {
body.unpipe(passThrough)
})

body.on('error', (err) => {
if (!passThrough.closed) {
passThrough.destroy(err)
}
})

const currentAverage: number[] = []
const byteReader = createByteCounterStream({
maxHistory: 100,
rewriteHistoryOnMax: true,
onMaxHistory: (history) => {
currentAverage.push(averageTimeBetweenDates(history))
},
})
const bodyStream = body.pipe(passThrough)

let upload: Upload | undefined = undefined
let bytesUploaded = 0

try {
const paralellUploadS3 = new Upload({
client: this.client,
params: {
Bucket: bucketName,
Key: withOptionalVersion(key, version),
/* @ts-expect-error: https://github.com/aws/aws-sdk-js-v3/issues/2085 */
Body: body,
ContentType: contentType,
CacheControl: cacheControl,
},
})
const data = await stream.pipeline(
bodyStream,
byteReader.transformStream,
async (bodyStream) => {
if (signal?.aborted) {
throw ERRORS.Aborted('Upload was aborted')
}

upload = new Upload({
client: this.client,
params: {
Bucket: bucketName,
Key: withOptionalVersion(key, version),
Body: bodyStream as Readable,
ContentType: contentType,
CacheControl: cacheControl,
},
})

upload.on('httpUploadProgress', (progress) => {
if (typeof progress.loaded !== 'undefined') {
bytesUploaded = progress.loaded
}
})

signal?.addEventListener(
'abort',
() => {
upload?.abort()
},
{ once: true }
)

signal?.addEventListener(
'abort',
() => {
paralellUploadS3.abort()
},
{ once: true }
)

const data = await paralellUploadS3.done()

return await upload.done()
},
{ signal }
)

const metadata = await this.headObject(bucketName, key, version)

return {
Expand All @@ -242,6 +294,22 @@ export class S3Backend implements StorageBackendAdapter {
contentRange: metadata.contentRange,
}
} catch (err: any) {
if (err instanceof Error && err.name === 'AbortError') {
const span = trace.getActiveSpan()
if (span) {
// Record how far the upload got before it was aborted
span.setAttributes({
byteRead: byteReader.bytes,
bytesUploaded,
chunkTimes: JSON.stringify([
...currentAverage,
averageTimeBetweenDates(byteReader.history),
]),
})
}

throw ERRORS.AbortedTerminate('Upload was aborted', err)
}
throw StorageBackendError.fromError(err)
}
}
@@ -409,22 +477,30 @@
length?: number,
signal?: AbortSignal
) {
const paralellUploadS3 = new UploadPartCommand({
Bucket: bucketName,
Key: `${key}/${version}`,
UploadId: uploadId,
PartNumber: partNumber,
Body: body,
ContentLength: length,
})
try {
const paralellUploadS3 = new UploadPartCommand({
Bucket: bucketName,
Key: `${key}/${version}`,
UploadId: uploadId,
PartNumber: partNumber,
Body: body,
ContentLength: length,
})

const resp = await this.client.send(paralellUploadS3, {
abortSignal: signal,
})
const resp = await this.client.send(paralellUploadS3, {
abortSignal: signal,
})

return {
version,
ETag: resp.ETag,
return {
version,
ETag: resp.ETag,
}
} catch (e) {
if (e instanceof Error && e.name === 'AbortError') {
throw ERRORS.AbortedTerminate('Upload was aborted', e)
}

throw StorageBackendError.fromError(e)
}
}

@@ -531,3 +607,19 @@
return new S3Client(params)
}
}

function averageTimeBetweenDates(dates: Date[]): number {
if (dates.length < 2) {
throw new Error('At least two dates are required to calculate the average time between them.')
}

let totalDifference = 0

for (let i = 1; i < dates.length; i++) {
const diff = dates[i].getTime() - dates[i - 1].getTime()
totalDifference += diff
}

const averageDifference = totalDifference / (dates.length - 1)
return averageDifference
}
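For reference, a small worked example of the helper (the timestamps are illustrative):

const stamps = [
  new Date('2024-10-01T00:00:00.000Z'),
  new Date('2024-10-01T00:00:00.250Z'),
  new Date('2024-10-01T00:00:00.750Z'),
]

// gaps of 250ms and 500ms across 2 intervals -> (250 + 500) / 2
console.log(averageTimeBetweenDates(stamps)) // 375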