Skip to content

Commit

Permalink
fix: deps upgrade, custom tus options (#440)
Browse files Browse the repository at this point in the history
  • Loading branch information
fenos authored Feb 22, 2024
1 parent 65b0297 commit 21e9bde
Show file tree
Hide file tree
Showing 14 changed files with 1,802 additions and 3,102 deletions.
1 change: 1 addition & 0 deletions .env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ UPLOAD_SIGNED_URL_EXPIRATION_TIME=60

TUS_URL_PATH=/upload/resumable
TUS_URL_EXPIRY_MS=3600000
TUS_PART_SIZE=50


#######################################
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Base stage for shared environment setup
FROM node:18-alpine as base
FROM node:20-alpine as base
RUN apk add --no-cache g++ make python3
WORKDIR /app
COPY package.json package-lock.json ./
Expand Down
4,783 changes: 1,720 additions & 3,063 deletions package-lock.json

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@
"node": ">= 14.0.0"
},
"dependencies": {
"@aws-sdk/client-s3": "3.398.0",
"@aws-sdk/lib-storage": "3.421.0",
"@aws-sdk/node-http-handler": "3.374.0",
"@aws-sdk/client-s3": "3.515.0",
"@aws-sdk/lib-storage": "3.515.0",
"@aws-sdk/s3-request-presigner": "3.421.0",
"@fastify/multipart": "^7.6.0",
"@fastify/rate-limit": "^7.6.0",
"@fastify/swagger": "^8.3.1",
"@fastify/swagger-ui": "^1.7.0",
"@isaacs/ttlcache": "^1.4.1",
"@shopify/semaphore": "^3.0.2",
"@smithy/node-http-handler": "^2.3.1",
"@tus/file-store": "1.3.1",
"@tus/s3-store": "1.4.1",
"@tus/server": "1.4.1",
Expand Down Expand Up @@ -72,7 +72,7 @@
"@types/js-yaml": "^4.0.5",
"@types/jsonwebtoken": "^9.0.5",
"@types/mustache": "^4.2.2",
"@types/node": "^18.14.6",
"@types/node": "^20.11.5",
"@types/pg": "^8.6.4",
"@typescript-eslint/eslint-plugin": "^5.12.1",
"@typescript-eslint/parser": "^5.12.1",
Expand Down
4 changes: 4 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type StorageConfigType = {
dbSuperUser: string
dbSearchPath: string
dbMigrationStrategy: MultitenantMigrationStrategy
dbPostgresVersion?: string
databaseURL: string
databaseSSLRootCert?: string
databasePoolURL?: string
Expand Down Expand Up @@ -92,6 +93,7 @@ type StorageConfigType = {
uploadSignedUrlExpirationTime: number
tusUrlExpiryMs: number
tusPath: string
tusPartSize: number
tusUseFileVersionSeparator: boolean
defaultMetricsEnabled: boolean
}
Expand Down Expand Up @@ -205,6 +207,7 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType {

// Upload - TUS
tusPath: getOptionalConfigFromEnv('TUS_URL_PATH') || '/upload/resumable',
tusPartSize: parseInt(getOptionalConfigFromEnv('TUS_PART_SIZE') || '50', 10),
tusUrlExpiryMs: parseInt(
getOptionalConfigFromEnv('TUS_URL_EXPIRY_MS') || (1000 * 60 * 60).toString(),
10
Expand Down Expand Up @@ -246,6 +249,7 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType {

// Database - Connection
dbSearchPath: getOptionalConfigFromEnv('DATABASE_SEARCH_PATH', 'DB_SEARCH_PATH') || '',
dbPostgresVersion: getOptionalConfigFromEnv('DATABASE_POSTGRES_VERSION'),
multitenantDatabaseUrl: getOptionalConfigFromEnv(
'DATABASE_MULTITENANT_URL',
'MULTITENANT_DATABASE_URL'
Expand Down
2 changes: 2 additions & 0 deletions src/database/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const {
databaseFreePoolAfterInactivity,
databaseConnectionTimeout,
dbSearchPath,
dbPostgresVersion,
} = getConfig()

interface TenantConnectionOptions {
Expand Down Expand Up @@ -91,6 +92,7 @@ export class TenantConnection {

knexPool = knex({
client: 'pg',
version: dbPostgresVersion,
searchPath: isExternalPool ? undefined : searchPath,
pool: {
min: 0,
Expand Down
2 changes: 1 addition & 1 deletion src/database/migrations/progressive.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { RunMigrationsOnTenants } from '../../queue'
export class ProgressiveMigrations {
protected tenants: string[] = []
protected emittingJobs = false
protected watchInterval: NodeJS.Timer | undefined
protected watchInterval: NodeJS.Timeout | undefined

constructor(protected readonly options: { maxSize: number; interval: number }) {}

Expand Down
23 changes: 19 additions & 4 deletions src/http/routes/tus/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
import { FastifyBaseLogger, FastifyInstance } from 'fastify'
import * as http from 'http'
import { Server, ServerOptions, DataStore } from '@tus/server'
import { ServerOptions, DataStore } from '@tus/server'
import { jwt, storage, db, dbSuperUser } from '../../plugins'
import { getConfig } from '../../../config'
import { getFileSizeLimit } from '../../../storage/limits'
import { Storage } from '../../../storage'
import { FileStore, LockNotifier, PgLocker, UploadId, AlsMemoryKV } from '../../../storage/tus'
import {
TusServer,
FileStore,
LockNotifier,
PgLocker,
UploadId,
AlsMemoryKV,
} from '../../../storage/tus'
import {
namingFunction,
onCreate,
Expand All @@ -18,6 +25,8 @@ import {
import { TenantConnection } from '../../../database/connection'
import { PubSub } from '../../../database/pubsub'
import { S3Store } from '@tus/s3-store'
import { NodeHttpHandler } from '@smithy/node-http-handler'
import { createAgent } from '../../../storage/backend'

const {
storageS3Bucket,
Expand All @@ -26,6 +35,7 @@ const {
storageS3Region,
tusUrlExpiryMs,
tusPath,
tusPartSize,
storageBackendType,
storageFilePath,
} = getConfig()
Expand All @@ -42,11 +52,16 @@ type MultiPartRequest = http.IncomingMessage & {

function createTusStore() {
if (storageBackendType === 's3') {
const agent = createAgent(storageS3Endpoint?.includes('http://') ? 'http' : 'https')
return new S3Store({
partSize: 6 * 1024 * 1024, // Each uploaded part will have ~6MB,
partSize: tusPartSize * 1024 * 1024, // Each uploaded part will have ${tusPartSize}MB,
expirationPeriodInMilliseconds: tusUrlExpiryMs,
cache: new AlsMemoryKV(),
maxConcurrentPartUploads: 100,
s3ClientConfig: {
requestHandler: new NodeHttpHandler({
...agent,
}),
bucket: storageS3Bucket,
region: storageS3Region,
endpoint: storageS3Endpoint,
Expand Down Expand Up @@ -104,7 +119,7 @@ function createTusServer(lockNotifier: LockNotifier) {
return fileSizeLimit
},
}
return new Server(serverOptions)
return new TusServer(serverOptions)
}

export default async function routes(fastify: FastifyInstance) {
Expand Down
6 changes: 3 additions & 3 deletions src/http/routes/tus/lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { randomUUID } from 'crypto'
import { isRenderableError, Storage, StorageBackendError } from '../../../storage'
import { getConfig } from '../../../config'
import { Uploader } from '../../../storage/uploader'
import { TenantConnection } from '../../../database/connection'
import { TenantConnection } from '../../../database'
import { UploadId } from '../../../storage/tus'

const { storageS3Bucket, tusPath } = getConfig()
Expand All @@ -31,19 +31,19 @@ export async function onIncomingRequest(
id: string
) {
const req = rawReq as MultiPartRequest
const uploadID = UploadId.fromString(id)

res.on('finish', () => {
req.upload.db.dispose().catch((e) => {
req.log.error({ error: e }, 'Error disposing db connection')
})
})

if (rawReq.method === 'OPTIONS') {
if (rawReq.method === 'OPTIONS' || req.method === 'HEAD') {
return
}

const isUpsert = req.headers['x-upsert'] === 'true'
const uploadID = UploadId.fromString(id)

const uploader = new Uploader(req.upload.storage.backend, req.upload.storage.db)

Expand Down
23 changes: 1 addition & 22 deletions src/storage/backend/s3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
S3ClientConfig,
} from '@aws-sdk/client-s3'
import { Upload } from '@aws-sdk/lib-storage'
import { NodeHttpHandler } from '@aws-sdk/node-http-handler'
import { NodeHttpHandler } from '@smithy/node-http-handler'
import {
StorageBackendAdapter,
BrowserCacheHeaders,
Expand Down Expand Up @@ -166,27 +166,6 @@ export class S3Backend implements StorageBackendAdapter {
}
}

async setMetadataToCompleted(bucketName: string, key: string) {
const headObject = new HeadObjectCommand({
Bucket: bucketName,
Key: `${key}.info`,
})
const findObjResp = await this.client.send(headObject)

const copyCmd = new CopyObjectCommand({
Bucket: bucketName,
CopySource: `${bucketName}/${key}.info`,
Key: `${key}.info`,
Metadata: {
...findObjResp.Metadata,
tus_completed: 'true',
},
MetadataDirective: 'REPLACE',
})

return this.client.send(copyCmd)
}

/**
* Deletes an object
* @param bucket
Expand Down
1 change: 1 addition & 0 deletions src/storage/tus/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ export * from './file-store'
export * from './postgres-locker'
export * from './upload-id'
export * from './als-memory-kv'
export * from './server'
4 changes: 2 additions & 2 deletions src/storage/tus/postgres-locker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export class LockNotifier {
this.events.once(`release:${id}`, callback)
}

removeListeners(id: string) {
unsubscribe(id: string) {
this.events.removeAllListeners(`release:${id}`)
}

Expand Down Expand Up @@ -77,7 +77,7 @@ export class PgLock implements Lock {
}

async unlock(): Promise<void> {
this.notifier.removeListeners(this.id)
this.notifier.unsubscribe(this.id)
this.tnxResolver?.()
this.tnxResolver = undefined
}
Expand Down
41 changes: 41 additions & 0 deletions src/storage/tus/server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { CancellationContext, ERRORS, Server } from '@tus/server'
import http from 'node:http'

export class TusServer extends Server {
protected createContext(req: http.IncomingMessage) {
// Initialize two AbortControllers:
// 1. `requestAbortController` for instant request termination, particularly useful for stopping clients to upload when errors occur.
// 2. `abortWithDelayController` to introduce a delay before aborting, allowing the server time to complete ongoing operations.
// This is particularly useful when a future request may need to acquire a lock currently held by this request.
const requestAbortController = new AbortController()
const abortWithDelayController = new AbortController()

const onDelayedAbort = (err: unknown) => {
abortWithDelayController.signal.removeEventListener('abort', onDelayedAbort)
setTimeout(() => {
requestAbortController.abort(err)
}, 200)
}
abortWithDelayController.signal.addEventListener('abort', onDelayedAbort)

req.on('close', () => {
abortWithDelayController.signal.removeEventListener('abort', onDelayedAbort)
})

return {
signal: requestAbortController.signal,
abort: () => {
// abort the request immediately
if (!requestAbortController.signal.aborted) {
requestAbortController.abort(ERRORS.ABORTED)
}
},
cancel: () => {
// Initiates the delayed abort sequence unless it's already in progress.
if (!abortWithDelayController.signal.aborted) {
abortWithDelayController.abort(ERRORS.ABORTED)
}
},
}
}
}
4 changes: 2 additions & 2 deletions src/test/rls.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ describe('RLS policies', () => {
const content = fs.readFileSync(path.resolve(__dirname, 'rls_tests.yaml'), 'utf8')

const runId = randomUUID()
let bucketName = randomUUID()
let objectName = randomUUID()
let bucketName: string = randomUUID()
let objectName: string = randomUUID()
const originalBucketName = bucketName

const testScopedSpec = yaml.load(
Expand Down

0 comments on commit 21e9bde

Please sign in to comment.