Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Watch upload stream + fix duplicate logs #559

Merged
merged 1 commit into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/docs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
fail-fast: false
matrix:
platform: [ubuntu-22.04]
node: ['16']
node: ['20']

runs-on: ${{ matrix.platform }}

Expand Down
14 changes: 13 additions & 1 deletion src/http/error-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,18 @@
? 500
: 400

if (renderableError.code === ErrorCode.AbortedTerminate) {
reply.header('Connection', 'close')

reply.raw.once('finish', () => {
setTimeout(() => {
if (!request.raw.closed) {
request.raw.destroy()
}
}, 3000)
})
}

return reply.status(statusCode).send({
...renderableError,
error: error.error || renderableError.code,
Expand All @@ -52,14 +64,14 @@
// Fastify errors
if ('statusCode' in error) {
const err = error as FastifyError
return reply.status((error as any).statusCode || 500).send({

Check warning on line 67 in src/http/error-handler.ts

View workflow job for this annotation

GitHub Actions / Test / OS ubuntu-20.04 / Node 20

Unexpected any. Specify a different type
statusCode: `${err.statusCode}`,
error: err.name,
message: err.message,
})
}

reply.status(500).send({
return reply.status(500).send({
statusCode: '500',
error: 'Internal',
message: 'Internal Server Error',
Expand Down
21 changes: 10 additions & 11 deletions src/http/plugins/log-request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

interface FastifyContextConfig {
operation?: { type: string }
resources?: (req: FastifyRequest<any>) => string[]

Check warning on line 21 in src/http/plugins/log-request.ts

View workflow job for this annotation

GitHub Actions / Test / OS ubuntu-20.04 / Node 20

Unexpected any. Specify a different type
}
}

Expand All @@ -32,10 +32,17 @@
fastify.addHook('onRequest', async (req, res) => {
req.startTime = Date.now()

// Request was aborted before the server finishes to return a response
res.raw.once('close', () => {
const aborted = !res.raw.writableFinished
if (aborted) {
if (req.raw.aborted) {
doRequestLog(req, {
excludeUrls: options.excludeUrls,
statusCode: 'ABORTED REQ',
responseTime: (Date.now() - req.startTime) / 1000,
})
return
}

if (!res.raw.writableFinished) {
doRequestLog(req, {
excludeUrls: options.excludeUrls,
statusCode: 'ABORTED RES',
Expand All @@ -53,7 +60,7 @@
const resources = getFirstDefined<string[]>(
req.resources,
req.routeConfig.resources?.(req),
(req.raw as any).resources,

Check warning on line 63 in src/http/plugins/log-request.ts

View workflow job for this annotation

GitHub Actions / Test / OS ubuntu-20.04 / Node 20

Unexpected any. Specify a different type
resourceFromParams ? [resourceFromParams] : ([] as string[])
)

Expand All @@ -73,14 +80,6 @@
}
})

fastify.addHook('onRequestAbort', async (req) => {
doRequestLog(req, {
excludeUrls: options.excludeUrls,
statusCode: 'ABORTED REQ',
responseTime: (Date.now() - req.startTime) / 1000,
})
})

fastify.addHook('onResponse', async (req, reply) => {
doRequestLog(req, {
reply,
Expand Down Expand Up @@ -116,7 +115,7 @@
const rId = req.id
const cIP = req.ip
const statusCode = options.statusCode
const error = (req.raw as any).executionError || req.executionError

Check warning on line 118 in src/http/plugins/log-request.ts

View workflow job for this annotation

GitHub Actions / Test / OS ubuntu-20.04 / Node 20

Unexpected any. Specify a different type
const tenantId = req.tenantId

const buildLogMessage = `${tenantId} | ${rMeth} | ${statusCode} | ${cIP} | ${rId} | ${rUrl} | ${uAgent}`
Expand Down
12 changes: 11 additions & 1 deletion src/http/plugins/signals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,17 @@ export const signals = fastifyPlugin(
disconnect: new AbortController(),
}

// Client terminated the request before the body was fully sent
req.raw.once('close', () => {
if (req.raw.aborted) {
req.signals.body.abort()

if (!req.signals.disconnect.signal.aborted) {
req.signals.disconnect.abort()
}
}
})

// Client terminated the request before server finished sending the response
res.raw.once('close', () => {
const aborted = !res.raw.writableFinished
Expand All @@ -33,7 +44,6 @@ export const signals = fastifyPlugin(
})
})

// Client terminated the request before the body was fully sent
fastify.addHook('onRequestAbort', async (req) => {
req.signals.body.abort()

Expand Down
2 changes: 1 addition & 1 deletion src/http/plugins/tracing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export const tracing = fastifyPlugin(
if (
tracingEnabled &&
request.tracingMode &&
!['full', 'logs', 'debug'].includes(request.tracingMode)
!['logs', 'debug'].includes(request.tracingMode)
) {
traceCollector.clearTrace(span.spanContext().traceId)
}
Expand Down
1 change: 1 addition & 0 deletions src/internal/concurrency/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './mutex'
export * from './stream'
export * from './async-abort-controller'
40 changes: 40 additions & 0 deletions src/internal/concurrency/stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { Transform, TransformCallback } from 'stream'

interface ByteCounterStreamOptions {
maxHistory?: number
onMaxHistory?: (history: Date[]) => void
rewriteHistoryOnMax?: boolean
}

export const createByteCounterStream = (options: ByteCounterStreamOptions) => {
const { maxHistory = 100 } = options

let bytes = 0
let history: Date[] = []

const transformStream = new Transform({
transform(chunk: Buffer, encoding: string, callback: TransformCallback) {
bytes += chunk.length
history.push(new Date())

if (history.length === maxHistory) {
if (options.rewriteHistoryOnMax) {
options.onMaxHistory?.(history)
history = []
}
}

callback(null, chunk)
},
})

return {
transformStream,
get bytes() {
return bytes
},
get history() {
return history
},
}
}
8 changes: 8 additions & 0 deletions src/internal/errors/codes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ export enum ErrorCode {
SlowDown = 'SlowDown',
TusError = 'TusError',
Aborted = 'Aborted',
AbortedTerminate = 'AbortedTerminate',
}

export const ERRORS = {
Expand Down Expand Up @@ -372,6 +373,13 @@ export const ERRORS = {
message: message,
originalError,
}),
AbortedTerminate: (message: string, originalError?: unknown) =>
new StorageBackendError({
code: ErrorCode.AbortedTerminate,
httpStatusCode: 500,
message: message,
originalError,
}),
}

export function isStorageError(errorType: ErrorCode, error: any): error is StorageBackendError {
Expand Down
160 changes: 126 additions & 34 deletions src/storage/backend/s3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@ import { getSignedUrl } from '@aws-sdk/s3-request-presigner'
import { ERRORS, StorageBackendError } from '@internal/errors'
import { getConfig } from '../../config'
import Agent, { HttpsAgent } from 'agentkeepalive'
import { Readable } from 'stream'
import { addAbortSignal, PassThrough, Readable } from 'node:stream'
import {
HttpPoolErrorGauge,
HttpPoolFreeSocketsGauge,
HttpPoolPendingRequestsGauge,
HttpPoolSocketsGauge,
} from '@internal/monitoring/metrics'
import stream from 'stream/promises'
import { trace } from '@opentelemetry/api'
import { createByteCounterStream } from '@internal/concurrency'

const { storageS3MaxSockets, region } = getConfig()

Expand Down Expand Up @@ -201,34 +204,83 @@ export class S3Backend implements StorageBackendAdapter {
bucketName: string,
key: string,
version: string | undefined,
body: NodeJS.ReadableStream,
body: Readable,
contentType: string,
cacheControl: string,
signal?: AbortSignal
): Promise<ObjectMetadata> {
if (signal?.aborted) {
throw ERRORS.Aborted('Upload was aborted')
}

const passThrough = new PassThrough()

if (signal) {
addAbortSignal(signal, passThrough)
}

passThrough.on('error', () => {
body.unpipe(passThrough)
})

body.on('error', (err) => {
if (!passThrough.closed) {
passThrough.destroy(err)
}
})

const currentAverage: number[] = []
const byteReader = createByteCounterStream({
maxHistory: 100,
rewriteHistoryOnMax: true,
onMaxHistory: (history) => {
currentAverage.push(averageTimeBetweenDates(history))
},
})
const bodyStream = body.pipe(passThrough)

let upload: Upload | undefined = undefined
let bytesUploaded = 0

try {
const paralellUploadS3 = new Upload({
client: this.client,
params: {
Bucket: bucketName,
Key: withOptionalVersion(key, version),
/* @ts-expect-error: https://github.com/aws/aws-sdk-js-v3/issues/2085 */
Body: body,
ContentType: contentType,
CacheControl: cacheControl,
},
})
const data = await stream.pipeline(
bodyStream,
byteReader.transformStream,
async (bodyStream) => {
if (signal?.aborted) {
throw ERRORS.Aborted('Upload was aborted')
}

upload = new Upload({
client: this.client,
params: {
Bucket: bucketName,
Key: withOptionalVersion(key, version),
Body: bodyStream as Readable,
ContentType: contentType,
CacheControl: cacheControl,
},
})

upload.on('httpUploadProgress', (progress) => {
if (typeof progress.loaded !== 'undefined') {
bytesUploaded = progress.loaded
}
})

signal?.addEventListener(
'abort',
() => {
upload?.abort()
},
{ once: true }
)

signal?.addEventListener(
'abort',
() => {
paralellUploadS3.abort()
return await upload.done()
},
{ once: true }
{ signal }
)

const data = await paralellUploadS3.done()

const metadata = await this.headObject(bucketName, key, version)

return {
Expand All @@ -242,6 +294,22 @@ export class S3Backend implements StorageBackendAdapter {
contentRange: metadata.contentRange,
}
} catch (err: any) {
if (err instanceof Error && err.name === 'AbortError') {
const span = trace.getActiveSpan()
if (span) {
// Print how far we got uploading the file
span.setAttributes({
byteRead: byteReader.bytes,
bytesUploaded,
chunkTimes: JSON.stringify([
...currentAverage,
averageTimeBetweenDates(byteReader.history),
]),
})
}

throw ERRORS.AbortedTerminate('Upload was aborted', err)
}
throw StorageBackendError.fromError(err)
}
}
Expand Down Expand Up @@ -409,22 +477,30 @@ export class S3Backend implements StorageBackendAdapter {
length?: number,
signal?: AbortSignal
) {
const paralellUploadS3 = new UploadPartCommand({
Bucket: bucketName,
Key: `${key}/${version}`,
UploadId: uploadId,
PartNumber: partNumber,
Body: body,
ContentLength: length,
})
try {
const paralellUploadS3 = new UploadPartCommand({
Bucket: bucketName,
Key: `${key}/${version}`,
UploadId: uploadId,
PartNumber: partNumber,
Body: body,
ContentLength: length,
})

const resp = await this.client.send(paralellUploadS3, {
abortSignal: signal,
})
const resp = await this.client.send(paralellUploadS3, {
abortSignal: signal,
})

return {
version,
ETag: resp.ETag,
return {
version,
ETag: resp.ETag,
}
} catch (e) {
if (e instanceof Error && e.name === 'AbortError') {
throw ERRORS.AbortedTerminate('Upload was aborted', e)
}

throw StorageBackendError.fromError(e)
}
}

Expand Down Expand Up @@ -531,3 +607,19 @@ export class S3Backend implements StorageBackendAdapter {
return new S3Client(params)
}
}

function averageTimeBetweenDates(dates: Date[]): number {
if (dates.length < 2) {
throw new Error('At least two dates are required to calculate the average time between them.')
}

let totalDifference = 0

for (let i = 1; i < dates.length; i++) {
const diff = dates[i].getTime() - dates[i - 1].getTime()
totalDifference += diff
}

const averageDifference = totalDifference / (dates.length - 1)
return averageDifference
}
Loading
Loading