diff --git a/src/config.ts b/src/config.ts index ca9622a3..8b0f6deb 100644 --- a/src/config.ts +++ b/src/config.ts @@ -101,6 +101,7 @@ type StorageConfigType = { rateLimiterRedisCommandTimeout: number uploadSignedUrlExpirationTime: number tusUrlExpiryMs: number + tusMaxConcurrentUploads: number tusPath: string tusPartSize: number tusUseFileVersionSeparator: boolean @@ -231,6 +232,10 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType { getOptionalConfigFromEnv('TUS_URL_EXPIRY_MS') || (1000 * 60 * 60).toString(), 10 ), + tusMaxConcurrentUploads: parseInt( + getOptionalConfigFromEnv('TUS_MAX_CONCURRENT_UPLOADS') || '500', + 10 + ), tusUseFileVersionSeparator: getOptionalConfigFromEnv('TUS_USE_FILE_VERSION_SEPARATOR') === 'true', diff --git a/src/http/routes/tus/index.ts b/src/http/routes/tus/index.ts index ed6f64a9..988f724b 100644 --- a/src/http/routes/tus/index.ts +++ b/src/http/routes/tus/index.ts @@ -61,7 +61,7 @@ function createTusStore() { partSize: tusPartSize * 1024 * 1024, // Each uploaded part will have ${tusPartSize}MB, expirationPeriodInMilliseconds: tusUrlExpiryMs, cache: new AlsMemoryKV(), - maxConcurrentPartUploads: 100, + maxConcurrentPartUploads: 500, s3ClientConfig: { requestHandler: new NodeHttpHandler({ ...agent, diff --git a/src/storage/object.ts b/src/storage/object.ts index 58738145..ff9441d4 100644 --- a/src/storage/object.ts +++ b/src/storage/object.ts @@ -33,6 +33,7 @@ interface CopyObjectParams { destinationKey: string owner?: string copyMetadata?: boolean + upsert?: boolean conditions?: { ifMatch?: string ifNoneMatch?: string @@ -280,6 +281,7 @@ export class ObjectStorage { owner, conditions, copyMetadata, + upsert, }: CopyObjectParams) { mustBeValidKey(destinationKey) @@ -310,7 +312,7 @@ export class ObjectStorage { bucketId: destinationBucket, objectName: destinationKey, owner, - isUpsert: false, + isUpsert: upsert, }) try { @@ -325,14 +327,42 @@ export class ObjectStorage { const metadata = await this.backend.headObject(storageS3Bucket, s3DestinationKey, newVersion) - const destObject = await this.db.createObject({ - ...originObject, - bucket_id: destinationBucket, - name: destinationKey, - owner, - metadata, - user_metadata: copyMetadata ? originObject.user_metadata : undefined, - version: newVersion, + const destinationObject = await this.db.asSuperUser().withTransaction(async (db) => { + await db.waitObjectLock(destinationBucket, destinationKey, undefined, { + timeout: 3000, + }) + + const existingDestObject = await db.findObject( + this.bucketId, + destinationKey, + 'id,name,metadata,version,bucket_id', + { + dontErrorOnEmpty: true, + forUpdate: true, + } + ) + + const destinationObject = await db.upsertObject({ + ...originObject, + bucket_id: destinationBucket, + name: destinationKey, + owner, + metadata, + user_metadata: copyMetadata ? originObject.user_metadata : undefined, + version: newVersion, + }) + + if (existingDestObject) { + await ObjectAdminDelete.send({ + name: existingDestObject.name, + bucketId: existingDestObject.bucket_id, + tenant: this.db.tenant(), + version: existingDestObject.version, + reqId: this.db.reqId, + }) + } + + return destinationObject }) await ObjectCreatedCopyEvent.sendWebhook({ @@ -345,7 +375,7 @@ export class ObjectStorage { }) return { - destObject, + destObject: destinationObject, httpStatusCode: copyResult.httpStatusCode, eTag: copyResult.eTag, lastModified: copyResult.lastModified, diff --git a/src/storage/protocols/s3/s3-handler.ts b/src/storage/protocols/s3/s3-handler.ts index c6133aca..c6340529 100644 --- a/src/storage/protocols/s3/s3-handler.ts +++ b/src/storage/protocols/s3/s3-handler.ts @@ -996,6 +996,7 @@ export class S3ProtocolHandler { destinationBucket: Bucket, destinationKey: Key, owner: this.owner, + upsert: true, conditions: { ifMatch: command.CopySourceIfMatch, ifNoneMatch: command.CopySourceIfNoneMatch, diff --git a/src/storage/protocols/tus/postgres-locker.ts b/src/storage/protocols/tus/postgres-locker.ts index fac00567..a3d3a2d6 100644 --- a/src/storage/protocols/tus/postgres-locker.ts +++ b/src/storage/protocols/tus/postgres-locker.ts @@ -53,21 +53,24 @@ export class PgLock implements Lock { this.db .withTransaction(async (db) => { const abortController = new AbortController() - const acquired = await Promise.race([ - this.waitTimeout(15000, abortController.signal), - this.acquireLock(db, this.id, abortController.signal), - ]) - abortController.abort() - - if (!acquired) { - throw ERRORS.LockTimeout() + try { + const acquired = await Promise.race([ + this.waitTimeout(5000, abortController.signal), + this.acquireLock(db, this.id, abortController.signal), + ]) + + if (!acquired) { + throw ERRORS.LockTimeout() + } + + await new Promise((innerResolve) => { + this.tnxResolver = innerResolve + resolve() + }) + } finally { + abortController.abort() } - - await new Promise((innerResolve) => { - this.tnxResolver = innerResolve - resolve() - }) }) .catch(reject) })