diff --git a/src/http/error-handler.ts b/src/http/error-handler.ts
index c4f6081a..344390eb 100644
--- a/src/http/error-handler.ts
+++ b/src/http/error-handler.ts
@@ -43,6 +43,18 @@ export const setErrorHandler = (app: FastifyInstance) => {
          ? 500
          : 400

+    if (renderableError.code === ErrorCode.AbortedTerminate) {
+      reply.header('Connection', 'close')
+
+      reply.raw.once('finish', () => {
+        setTimeout(() => {
+          if (!request.raw.closed) {
+            request.raw.destroy()
+          }
+        }, 3000)
+      })
+    }
+
     return reply.status(statusCode).send({
       ...renderableError,
       error: error.error || renderableError.code,
@@ -59,7 +71,7 @@ export const setErrorHandler = (app: FastifyInstance) => {
      })
    }

-    reply.status(500).send({
+    return reply.status(500).send({
      statusCode: '500',
      error: 'Internal',
      message: 'Internal Server Error',
diff --git a/src/http/plugins/log-request.ts b/src/http/plugins/log-request.ts
index 15117e89..b5693ce9 100644
--- a/src/http/plugins/log-request.ts
+++ b/src/http/plugins/log-request.ts
@@ -32,10 +32,17 @@ export const logRequest = (options: RequestLoggerOptions) =>
    fastify.addHook('onRequest', async (req, res) => {
      req.startTime = Date.now()

-      // Request was aborted before the server finishes to return a response
      res.raw.once('close', () => {
-        const aborted = !res.raw.writableFinished
-        if (aborted) {
+        if (req.raw.aborted) {
+          doRequestLog(req, {
+            excludeUrls: options.excludeUrls,
+            statusCode: 'ABORTED REQ',
+            responseTime: (Date.now() - req.startTime) / 1000,
+          })
+          return
+        }
+
+        if (!res.raw.writableFinished) {
          doRequestLog(req, {
            excludeUrls: options.excludeUrls,
            statusCode: 'ABORTED RES',
@@ -73,14 +80,6 @@ export const logRequest = (options: RequestLoggerOptions) =>
      }
    })

-    fastify.addHook('onRequestAbort', async (req) => {
-      doRequestLog(req, {
-        excludeUrls: options.excludeUrls,
-        statusCode: 'ABORTED REQ',
-        responseTime: (Date.now() - req.startTime) / 1000,
-      })
-    })
-
    fastify.addHook('onResponse', async (req, reply) => {
      doRequestLog(req, {
        reply,
diff --git a/src/http/plugins/signals.ts b/src/http/plugins/signals.ts
index 930474e6..a4734621 100644
--- a/src/http/plugins/signals.ts
+++ b/src/http/plugins/signals.ts
@@ -20,6 +20,17 @@ export const signals = fastifyPlugin(
        disconnect: new AbortController(),
      }

+      // Client terminated the request before the body was fully sent
+      req.raw.once('close', () => {
+        if (req.raw.aborted) {
+          req.signals.body.abort()
+
+          if (!req.signals.disconnect.signal.aborted) {
+            req.signals.disconnect.abort()
+          }
+        }
+      })
+
      // Client terminated the request before server finished sending the response
      res.raw.once('close', () => {
        const aborted = !res.raw.writableFinished
@@ -33,7 +44,6 @@ export const signals = fastifyPlugin(
        })
      })

-      // Client terminated the request before the body was fully sent
      fastify.addHook('onRequestAbort', async (req) => {
        req.signals.body.abort()

diff --git a/src/http/plugins/tracing.ts b/src/http/plugins/tracing.ts
index c29f3ab4..4dc7d72d 100644
--- a/src/http/plugins/tracing.ts
+++ b/src/http/plugins/tracing.ts
@@ -47,7 +47,7 @@ export const tracing = fastifyPlugin(
      if (
        tracingEnabled &&
        request.tracingMode &&
-        !['full', 'logs', 'debug'].includes(request.tracingMode)
+        !['logs', 'debug'].includes(request.tracingMode)
      ) {
        traceCollector.clearTrace(span.spanContext().traceId)
      }
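Note on the two abort cases above: Fastify's `req.raw` and `res.raw` are plain Node `IncomingMessage` and `ServerResponse` objects, so the distinction the `close` handler draws can be reproduced without Fastify. A minimal sketch, not part of this diff (the port and log strings are placeholders):

import http from 'node:http'

const server = http.createServer((req, res) => {
  res.once('close', () => {
    // Mirrors 'ABORTED REQ' in log-request.ts: the client tore down the
    // connection before the request was fully received.
    if (req.aborted) {
      console.log('ABORTED REQ')
      return
    }
    // Mirrors 'ABORTED RES': the socket closed before the response
    // finished flushing.
    if (!res.writableFinished) {
      console.log('ABORTED RES')
    }
  })

  // Respond slowly so an impatient client can disconnect first.
  setTimeout(() => res.end('ok'), 1_000)
})

server.listen(8080)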
diff --git a/src/internal/concurrency/index.ts b/src/internal/concurrency/index.ts
index 86792b45..c2cbc510 100644
--- a/src/internal/concurrency/index.ts
+++ b/src/internal/concurrency/index.ts
@@ -1,2 +1,3 @@
 export * from './mutex'
+export * from './stream'
 export * from './async-abort-controller'
diff --git a/src/internal/concurrency/stream.ts b/src/internal/concurrency/stream.ts
new file mode 100644
index 00000000..31653c28
--- /dev/null
+++ b/src/internal/concurrency/stream.ts
@@ -0,0 +1,40 @@
+import { Transform, TransformCallback } from 'stream'
+
+interface ByteCounterStreamOptions {
+  maxHistory?: number
+  onMaxHistory?: (history: Date[]) => void
+  rewriteHistoryOnMax?: boolean
+}
+
+export const createByteCounterStream = (options: ByteCounterStreamOptions) => {
+  const { maxHistory = 100 } = options
+
+  let bytes = 0
+  let history: Date[] = []
+
+  const transformStream = new Transform({
+    transform(chunk: Buffer, encoding: string, callback: TransformCallback) {
+      bytes += chunk.length
+      history.push(new Date())
+
+      if (history.length === maxHistory) {
+        if (options.rewriteHistoryOnMax) {
+          options.onMaxHistory?.(history)
+          history = []
+        }
+      }
+
+      callback(null, chunk)
+    },
+  })
+
+  return {
+    transformStream,
+    get bytes() {
+      return bytes
+    },
+    get history() {
+      return history
+    },
+  }
+}
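The counter above only fires `onMaxHistory` when `rewriteHistoryOnMax` is set, and it resets `history` each time the window fills. A minimal usage sketch, not part of this diff (file names are placeholders):

import { createReadStream, createWriteStream } from 'node:fs'
import { pipeline } from 'node:stream/promises'
import { createByteCounterStream } from '@internal/concurrency'

const byteCounter = createByteCounterStream({
  maxHistory: 100,
  rewriteHistoryOnMax: true,
  // Receives the 100 accumulated chunk timestamps; history resets afterwards.
  onMaxHistory: (history) => console.log(`window of ${history.length} chunks`),
})

// The transform passes every chunk through unchanged while counting it.
await pipeline(
  createReadStream('input.bin'),
  byteCounter.transformStream,
  createWriteStream('output.bin')
)

console.log(`${byteCounter.bytes} bytes passed through`)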
diff --git a/src/internal/errors/codes.ts b/src/internal/errors/codes.ts
index 6f1b9383..1274b7d5 100644
--- a/src/internal/errors/codes.ts
+++ b/src/internal/errors/codes.ts
@@ -37,6 +37,7 @@ export enum ErrorCode {
   SlowDown = 'SlowDown',
   TusError = 'TusError',
   Aborted = 'Aborted',
+  AbortedTerminate = 'AbortedTerminate',
 }

 export const ERRORS = {
@@ -372,6 +373,13 @@ export const ERRORS = {
      message: message,
      originalError,
    }),
+  AbortedTerminate: (message: string, originalError?: unknown) =>
+    new StorageBackendError({
+      code: ErrorCode.AbortedTerminate,
+      httpStatusCode: 500,
+      message: message,
+      originalError,
+    }),
 }

 export function isStorageError(errorType: ErrorCode, error: any): error is StorageBackendError {
diff --git a/src/storage/backend/s3.ts b/src/storage/backend/s3.ts
index 88914689..4c353f1a 100644
--- a/src/storage/backend/s3.ts
+++ b/src/storage/backend/s3.ts
@@ -28,13 +28,16 @@
 import { getSignedUrl } from '@aws-sdk/s3-request-presigner'
 import { ERRORS, StorageBackendError } from '@internal/errors'
 import { getConfig } from '../../config'
 import Agent, { HttpsAgent } from 'agentkeepalive'
-import { Readable } from 'stream'
+import { addAbortSignal, PassThrough, Readable } from 'node:stream'
 import {
  HttpPoolErrorGauge,
  HttpPoolFreeSocketsGauge,
  HttpPoolPendingRequestsGauge,
  HttpPoolSocketsGauge,
 } from '@internal/monitoring/metrics'
+import stream from 'stream/promises'
+import { trace } from '@opentelemetry/api'
+import { createByteCounterStream } from '@internal/concurrency'

 const { storageS3MaxSockets, region } = getConfig()
@@ -201,34 +204,83 @@
    bucketName: string,
    key: string,
    version: string | undefined,
-    body: NodeJS.ReadableStream,
+    body: Readable,
    contentType: string,
    cacheControl: string,
    signal?: AbortSignal
  ): Promise<ObjectMetadata> {
+    if (signal?.aborted) {
+      throw ERRORS.Aborted('Upload was aborted')
+    }
+
+    const passThrough = new PassThrough()
+
+    if (signal) {
+      addAbortSignal(signal, passThrough)
+    }
+
+    passThrough.on('error', () => {
+      body.unpipe(passThrough)
+    })
+
+    body.on('error', (err) => {
+      if (!passThrough.closed) {
+        passThrough.destroy(err)
+      }
+    })
+
+    const currentAverage: number[] = []
+    const byteReader = createByteCounterStream({
+      maxHistory: 100,
+      rewriteHistoryOnMax: true,
+      onMaxHistory: (history) => {
+        currentAverage.push(averageTimeBetweenDates(history))
+      },
+    })
+    const bodyStream = body.pipe(passThrough)
+
+    let upload: Upload | undefined = undefined
+    let bytesUploaded = 0
+
    try {
-      const paralellUploadS3 = new Upload({
-        client: this.client,
-        params: {
-          Bucket: bucketName,
-          Key: withOptionalVersion(key, version),
-          /* @ts-expect-error: https://github.com/aws/aws-sdk-js-v3/issues/2085 */
-          Body: body,
-          ContentType: contentType,
-          CacheControl: cacheControl,
-        },
-      })
-
+      const data = await stream.pipeline(
+        bodyStream,
+        byteReader.transformStream,
+        async (bodyStream) => {
+          if (signal?.aborted) {
+            throw ERRORS.Aborted('Upload was aborted')
+          }
+
+          upload = new Upload({
+            client: this.client,
+            params: {
+              Bucket: bucketName,
+              Key: withOptionalVersion(key, version),
+              Body: bodyStream as Readable,
+              ContentType: contentType,
+              CacheControl: cacheControl,
+            },
+          })
+
+          upload.on('httpUploadProgress', (progress) => {
+            if (typeof progress.loaded !== 'undefined') {
+              bytesUploaded = progress.loaded
+            }
+          })
+
+          signal?.addEventListener(
+            'abort',
+            () => {
+              upload?.abort()
+            },
+            { once: true }
+          )
+
-      signal?.addEventListener(
-        'abort',
-        () => {
-          paralellUploadS3.abort()
+          return await upload.done()
        },
-        { once: true }
+        { signal }
      )

-      const data = await paralellUploadS3.done()
-
      const metadata = await this.headObject(bucketName, key, version)

      return {
@@ -242,6 +294,22 @@
        contentRange: metadata.contentRange,
      }
    } catch (err: any) {
+      if (err instanceof Error && err.name === 'AbortError') {
+        const span = trace.getActiveSpan()
+        if (span) {
+          // Print how far we got uploading the file
+          span.setAttributes({
+            byteRead: byteReader.bytes,
+            bytesUploaded,
+            chunkTimes: JSON.stringify([
+              ...currentAverage,
+              averageTimeBetweenDates(byteReader.history),
+            ]),
+          })
+        }
+
+        throw ERRORS.AbortedTerminate('Upload was aborted', err)
+      }
      throw StorageBackendError.fromError(err)
    }
  }
@@ -409,22 +477,30 @@
    length?: number,
    signal?: AbortSignal
  ) {
-    const paralellUploadS3 = new UploadPartCommand({
-      Bucket: bucketName,
-      Key: `${key}/${version}`,
-      UploadId: uploadId,
-      PartNumber: partNumber,
-      Body: body,
-      ContentLength: length,
-    })
+    try {
+      const paralellUploadS3 = new UploadPartCommand({
+        Bucket: bucketName,
+        Key: `${key}/${version}`,
+        UploadId: uploadId,
+        PartNumber: partNumber,
+        Body: body,
+        ContentLength: length,
+      })

-    const resp = await this.client.send(paralellUploadS3, {
-      abortSignal: signal,
-    })
+      const resp = await this.client.send(paralellUploadS3, {
+        abortSignal: signal,
+      })

-    return {
-      version,
-      ETag: resp.ETag,
+      return {
+        version,
+        ETag: resp.ETag,
+      }
+    } catch (e) {
+      if (e instanceof Error && e.name === 'AbortError') {
+        throw ERRORS.AbortedTerminate('Upload was aborted', e)
+      }
+
+      throw StorageBackendError.fromError(e)
    }
  }

@@ -531,3 +607,19 @@
    return new S3Client(params)
  }
 }
+
+function averageTimeBetweenDates(dates: Date[]): number {
+  if (dates.length < 2) {
+    throw new Error('At least two dates are required to calculate the average time between them.')
+  }
+
+  let totalDifference = 0
+
+  for (let i = 1; i < dates.length; i++) {
+    const diff = dates[i].getTime() - dates[i - 1].getTime()
+    totalDifference += diff
+  }
+
+  const averageDifference = totalDifference / (dates.length - 1)
+  return averageDifference
+}
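`averageTimeBetweenDates` feeds the `chunkTimes` span attribute in the `AbortError` branch above: it reports the mean gap, in milliseconds, between consecutive chunk timestamps. Note that it throws when given fewer than two dates, so the call in the catch path assumes at least two chunks arrived before the abort. A worked example with hypothetical values, assuming the module-private helper is in scope:

// Chunks observed at t = 0 ms, 100 ms, and 300 ms.
const dates = [new Date(0), new Date(100), new Date(300)]

// Two gaps: 100 ms and 200 ms, so (100 + 200) / 2 = 150 ms.
averageTimeBetweenDates(dates) // => 150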
diff --git a/src/storage/protocols/s3/s3-handler.ts b/src/storage/protocols/s3/s3-handler.ts
index 902c8099..e3b05e80 100644
--- a/src/storage/protocols/s3/s3-handler.ts
+++ b/src/storage/protocols/s3/s3-handler.ts
@@ -27,6 +27,7 @@
 import { ERRORS } from '@internal/errors'
 import { S3MultipartUpload, Obj } from '../../schemas'
 import { decrypt, encrypt } from '@internal/auth'
 import { ByteLimitTransformStream } from './byte-limit-stream'
+import { logger, logSchema } from '@internal/monitoring'

 const { storageS3Region, storageS3Bucket } = getConfig()
@@ -583,7 +584,7 @@
   */
  async uploadPart(command: UploadPartCommandInput, signal?: AbortSignal) {
    if (signal?.aborted) {
-      throw ERRORS.Aborted('UploadPart aborted')
+      throw ERRORS.AbortedTerminate('UploadPart aborted')
    }

    const { Bucket, PartNumber, UploadId, Key, Body, ContentLength } = command
@@ -614,7 +615,7 @@
    const multipart = await this.shouldAllowPartUpload(UploadId, ContentLength, maxFileSize)

    if (signal?.aborted) {
-      throw ERRORS.Aborted('UploadPart aborted')
+      throw ERRORS.AbortedTerminate('UploadPart aborted')
    }

    const proxy = new PassThrough()
@@ -667,15 +668,26 @@
        },
      }
    } catch (e) {
-      await this.storage.db.asSuperUser().withTransaction(async (db) => {
-        const multipart = await db.findMultipartUpload(UploadId, 'in_progress_size', {
-          forUpdate: true,
+      try {
+        await this.storage.db.asSuperUser().withTransaction(async (db) => {
+          const multipart = await db.findMultipartUpload(UploadId, 'in_progress_size', {
+            forUpdate: true,
+          })
+
+          const diff = multipart.in_progress_size - ContentLength
+          const signature = this.uploadSignature({ in_progress_size: diff })
+          await db.updateMultipartUploadProgress(UploadId, diff, signature)
        })
+      } catch (e) {
+        logSchema.error(logger, 'Failed to update multipart upload progress', {
+          type: 's3',
+          error: e,
+        })
+      }

-      const diff = multipart.in_progress_size - ContentLength
-      const signature = this.uploadSignature({ in_progress_size: diff })
-      await db.updateMultipartUploadProgress(UploadId, diff, signature)
-      })
+      if (e instanceof Error && e.name === 'AbortError') {
+        throw ERRORS.AbortedTerminate('UploadPart aborted')
+      }

      throw e
    }
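The abort plumbing in `uploadObject` combines three pieces: `addAbortSignal` destroys the `PassThrough` when the signal fires, the `{ signal }` option makes `stream.pipeline` reject with an `AbortError`, and the catch block converts that into `AbortedTerminate`. A stripped-down sketch of the same pattern, not code from this PR (the source data and consumer are placeholders):

import { addAbortSignal, PassThrough, Readable } from 'node:stream'
import { pipeline } from 'node:stream/promises'

const controller = new AbortController()
const body = Readable.from(['hello ', 'world']) // placeholder body stream

const passThrough = new PassThrough()
addAbortSignal(controller.signal, passThrough) // destroys passThrough on abort

try {
  await pipeline(
    body.pipe(passThrough),
    async (source) => {
      for await (const _chunk of source) {
        // Consumer stands in for the S3 Upload; call controller.abort()
        // while this loop runs to trigger the abort path.
      }
    },
    { signal: controller.signal }
  )
} catch (err) {
  if (err instanceof Error && err.name === 'AbortError') {
    // uploadObject maps this case to ERRORS.AbortedTerminate
  }
  throw err
}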