Skip to content

Commit

Permalink
fix: use a single client
Browse files Browse the repository at this point in the history
  • Loading branch information
fenos committed Sep 23, 2024
1 parent 4281329 commit 385d44b
Show file tree
Hide file tree
Showing 9 changed files with 1,993 additions and 2,011 deletions.
3,846 changes: 1,925 additions & 1,921 deletions package-lock.json

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
"node": ">= 14.0.0"
},
"dependencies": {
"@aws-sdk/client-s3": "3.633.0",
"@aws-sdk/lib-storage": "3.633.0",
"@aws-sdk/s3-request-presigner": "3.633.0",
"@aws-sdk/client-s3": "3.654.0",
"@aws-sdk/lib-storage": "3.654.0",
"@aws-sdk/s3-request-presigner": "3.654.0",
"@fastify/accepts": "^4.3.0",
"@fastify/multipart": "^8.3.0",
"@fastify/rate-limit": "^7.6.0",
Expand Down Expand Up @@ -113,7 +113,7 @@
"ts-node-dev": "^1.1.8",
"tsx": "^4.16.0",
"tus-js-client": "^3.1.0",
"typescript": "^5.6.2"
"typescript": "5.2.2"
},
"bin": "./dist/server.js"
}
12 changes: 1 addition & 11 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ type StorageConfigType = {
storageS3ForcePathStyle?: boolean
storageS3Region: string
storageS3ClientTimeout: number
storageS3UploadTimeout: number
storageS3DownloadTimeout: number
isMultitenant: boolean
jwtSecret: string
jwtAlgorithm: string
Expand Down Expand Up @@ -277,15 +275,7 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType {
getOptionalConfigFromEnv('STORAGE_S3_FORCE_PATH_STYLE', 'GLOBAL_S3_FORCE_PATH_STYLE') ===
'true',
storageS3Region: getOptionalConfigFromEnv('STORAGE_S3_REGION', 'REGION') as string,
storageS3ClientTimeout: Number(
getOptionalConfigFromEnv('STORAGE_S3_CLIENT_TIMEOUT') || `${1000 * 600}` // 10m
),
storageS3DownloadTimeout: Number(
getOptionalConfigFromEnv('STORAGE_S3_DOWNLOAD_TIMEOUT') || `${1000 * 43200}` //12h
),
storageS3UploadTimeout: Number(
getOptionalConfigFromEnv('STORAGE_S3_UPLOAD_TIMEOUT') || `${1000 * 1200}` // 20m
),
storageS3ClientTimeout: Number(getOptionalConfigFromEnv('STORAGE_S3_CLIENT_TIMEOUT') || `0`),

// DB - Migrations
dbAnonRole: getOptionalConfigFromEnv('DB_ANON_ROLE') || 'anon',
Expand Down
9 changes: 6 additions & 3 deletions src/http/plugins/signals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@ export const signals = fastifyPlugin(

// Client terminated the request before the body was fully received
res.raw.once('close', () => {
req.signals.response.abort()
const aborted = !res.raw.writableFinished
if (aborted) {
req.signals.response.abort()

if (!req.signals.disconnect.signal.aborted) {
req.signals.disconnect.abort()
if (!req.signals.disconnect.signal.aborted) {
req.signals.disconnect.abort()
}
}
})
})
Expand Down
11 changes: 9 additions & 2 deletions src/http/plugins/tenant-id.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,16 @@ declare module 'fastify' {
}
}

const {
version,
isMultitenant,
tenantId: defaultTenantId,
requestXForwardedHostRegExp,
} = getConfig()

export const tenantId = fastifyPlugin(
async (fastify) => {
const { isMultitenant, tenantId, requestXForwardedHostRegExp } = getConfig()
fastify.decorateRequest('tenantId', tenantId)
fastify.decorateRequest('tenantId', defaultTenantId)
fastify.addHook('onRequest', async (request) => {
if (!isMultitenant || !requestXForwardedHostRegExp) return
const xForwardedHost = request.headers['x-forwarded-host']
Expand All @@ -26,6 +32,7 @@ export const tenantId = fastifyPlugin(
tenantId: request.tenantId,
project: request.tenantId,
reqId: request.id,
appVersion: version,
})
})
},
Expand Down
71 changes: 39 additions & 32 deletions src/http/plugins/tracing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,28 @@ export const tracing = fastifyPlugin(
fastify.register(traceServerTime)

fastify.addHook('onRequest', async (request) => {
if (isMultitenant && request.tenantId) {
const tenantConfig = await getTenantConfig(request.tenantId)
request.tracingMode = tenantConfig.tracingMode
} else {
request.tracingMode = defaultTracingMode
}
try {
if (isMultitenant && request.tenantId) {
const tenantConfig = await getTenantConfig(request.tenantId)
request.tracingMode = tenantConfig.tracingMode
} else {
request.tracingMode = defaultTracingMode
}

const span = trace.getSpan(context.active())
const span = trace.getSpan(context.active())

if (span) {
// We collect logs only in full,logs,debug mode
if (
tracingEnabled &&
request.tracingMode &&
!['full', 'logs', 'debug'].includes(request.tracingMode)
) {
traceCollector.clearTrace(span.spanContext().traceId)
if (span) {
// We collect logs only in full,logs,debug mode
if (
tracingEnabled &&
request.tracingMode &&
!['full', 'logs', 'debug'].includes(request.tracingMode)
) {
traceCollector.clearTrace(span.spanContext().traceId)
}
}
} catch (e) {
logSchema.error(request.log, 'failed setting tracing mode', { error: e, type: 'tracing' })
}
})
},
Expand All @@ -62,12 +66,12 @@ export const traceServerTime = fastifyPlugin(
return
}
fastify.addHook('onResponse', async (request, reply) => {
const traceId = trace.getSpan(context.active())?.spanContext().traceId
try {
const traceId = trace.getSpan(context.active())?.spanContext().traceId

if (traceId) {
const spans = traceCollector.getSpansForTrace(traceId)
if (spans) {
try {
if (traceId) {
const spans = traceCollector.getSpansForTrace(traceId)
if (spans) {
const serverTimingHeaders = spansToServerTimings(spans, reply.statusCode >= 500)

request.serverTimings = serverTimingHeaders
Expand All @@ -84,27 +88,30 @@ export const traceServerTime = fastifyPlugin(
.join(',')
reply.header('Server-Timing', httpServerTimes)
}
} catch (e) {
logSchema.error(logger, 'failed parsing server times', { error: e, type: 'otel' })
traceCollector.clearTrace(traceId)
}

traceCollector.clearTrace(traceId)
}
} catch (e) {
logSchema.error(request.log, 'failed tracing on response', { error: e, type: 'tracing' })
}
})

fastify.addHook('onRequestAbort', async (req) => {
const span = trace.getSpan(context.active())
const traceId = span?.spanContext().traceId
try {
const span = trace.getSpan(context.active())
const traceId = span?.spanContext().traceId

span?.setAttribute('req_aborted', true)
span?.setAttribute('req_aborted', true)

if (traceId) {
const spans = traceCollector.getSpansForTrace(traceId)
if (spans) {
req.serverTimings = spansToServerTimings(spans, true)
if (traceId) {
const spans = traceCollector.getSpansForTrace(traceId)
if (spans) {
req.serverTimings = spansToServerTimings(spans, true)
}
traceCollector.clearTrace(traceId)
}
traceCollector.clearTrace(traceId)
} catch (e) {
logSchema.error(logger, 'failed parsing server times on abort', { error: e, type: 'otel' })
}
})
},
Expand Down
4 changes: 2 additions & 2 deletions src/http/routes/tus/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const {
storageS3Endpoint,
storageS3ForcePathStyle,
storageS3Region,
storageS3UploadTimeout,
storageS3ClientTimeout,
tusUrlExpiryMs,
tusPath,
tusPartSize,
Expand Down Expand Up @@ -69,7 +69,7 @@ function createTusStore() {
requestHandler: new NodeHttpHandler({
...agent,
connectionTimeout: 5000,
requestTimeout: storageS3UploadTimeout,
requestTimeout: storageS3ClientTimeout,
}),
bucket: storageS3Bucket,
region: storageS3Region,
Expand Down
12 changes: 2 additions & 10 deletions src/storage/backend/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,8 @@ export * from './s3'
export * from './file'
export * from './adapter'

const {
storageS3Region,
storageS3Endpoint,
storageS3ForcePathStyle,
storageS3ClientTimeout,
storageS3UploadTimeout,
storageS3DownloadTimeout,
} = getConfig()
const { storageS3Region, storageS3Endpoint, storageS3ForcePathStyle, storageS3ClientTimeout } =
getConfig()

type ConfigForStorage<Type extends StorageBackendType> = Type extends 's3'
? S3ClientOptions
Expand All @@ -34,8 +28,6 @@ export function createStorageBackend<Type extends StorageBackendType>(
endpoint: storageS3Endpoint,
forcePathStyle: storageS3ForcePathStyle,
requestTimeout: storageS3ClientTimeout,
uploadTimeout: storageS3UploadTimeout,
downloadTimeout: storageS3DownloadTimeout,
...(config ? config : {}),
}
storageBackend = new S3Backend(defaultOptions)
Expand Down
31 changes: 5 additions & 26 deletions src/storage/backend/s3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,6 @@ export interface S3ClientOptions {
role?: string
httpAgent?: { httpAgent: Agent; httpsAgent: HttpsAgent }
requestTimeout?: number
downloadTimeout?: number
uploadTimeout?: number
}

/**
Expand All @@ -136,29 +134,12 @@ export interface S3ClientOptions {
*/
export class S3Backend implements StorageBackendAdapter {
client: S3Client
uploadClient: S3Client
downloadClient: S3Client

constructor(options: S3ClientOptions) {
// Default client for API operations
this.client = this.createS3Client({
...options,
name: 's3_default',
requestTimeout: options.requestTimeout,
})

// Upload client exclusively for upload operations
this.uploadClient = this.createS3Client({
...options,
name: 's3_upload',
requestTimeout: options.uploadTimeout,
})

// Download client exclusively for download operations
this.downloadClient = this.createS3Client({
...options,
name: 's3_download',
requestTimeout: options.downloadTimeout,
})
}

Expand Down Expand Up @@ -187,7 +168,7 @@ export class S3Backend implements StorageBackendAdapter {
input.IfModifiedSince = new Date(headers.ifModifiedSince)
}
const command = new GetObjectCommand(input)
const data = await this.downloadClient.send(command, {
const data = await this.client.send(command, {
abortSignal: signal,
})

Expand Down Expand Up @@ -228,7 +209,7 @@ export class S3Backend implements StorageBackendAdapter {
): Promise<ObjectMetadata> {
try {
const paralellUploadS3 = new Upload({
client: this.uploadClient,
client: this.client,
params: {
Bucket: bucketName,
Key: withOptionalVersion(key, version),
Expand Down Expand Up @@ -312,7 +293,7 @@ export class S3Backend implements StorageBackendAdapter {
CopySourceIfModifiedSince: conditions?.ifModifiedSince,
CopySourceIfUnmodifiedSince: conditions?.ifUnmodifiedSince,
})
const data = await this.uploadClient.send(command)
const data = await this.client.send(command)
return {
httpStatusCode: data.$metadata.httpStatusCode || 200,
eTag: data.CopyObjectResult?.ETag || '',
Expand Down Expand Up @@ -438,9 +419,7 @@ export class S3Backend implements StorageBackendAdapter {
ContentLength: length,
})

const resp = await this.uploadClient.send(paralellUploadS3, {
// overwriting the requestTimeout here to avoid the request being cancelled, as the upload can take a long time for a max 5GB upload
requestTimeout: 0,
const resp = await this.client.send(paralellUploadS3, {
abortSignal: signal,
})

Expand Down Expand Up @@ -524,7 +503,7 @@ export class S3Backend implements StorageBackendAdapter {
CopySourceRange: bytesRange ? `bytes=${bytesRange.fromByte}-${bytesRange.toByte}` : undefined,
})

const part = await this.uploadClient.send(uploadPartCopy)
const part = await this.client.send(uploadPartCopy)

return {
eTag: part.CopyPartResult?.ETag,
Expand Down

0 comments on commit 385d44b

Please sign in to comment.