Skip to content

Commit

Permalink
feat: custom metadata on upload
Browse files Browse the repository at this point in the history
  • Loading branch information
fenos committed Jul 9, 2024
1 parent 8347d13 commit 763da08
Show file tree
Hide file tree
Showing 28 changed files with 416 additions and 98 deletions.
2 changes: 2 additions & 0 deletions migrations/tenant/0025-custom-metadata.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Migration 0025: add custom (user-supplied) metadata columns.
-- Nullable jsonb so existing rows need no backfill and the ALTERs are
-- metadata-only (no table rewrite on PostgreSQL).

-- Arbitrary key/value metadata attached to a stored object at upload time
-- (the commit wires this from x-amz-meta-* S3 headers and the TUS
-- userMetadata field -- see the accompanying route/lifecycle changes).
ALTER TABLE storage.objects ADD COLUMN user_metadata jsonb NULL;
-- Same metadata staged on an in-progress S3 multipart upload, so it can be
-- carried over to storage.objects when the upload completes.
ALTER TABLE storage.s3_multipart_uploads ADD COLUMN metadata jsonb NULL;
78 changes: 39 additions & 39 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@
"@opentelemetry/instrumentation-pino": "^0.39.0",
"@shopify/semaphore": "^3.0.2",
"@smithy/node-http-handler": "^2.3.1",
"@tus/file-store": "1.3.1",
"@tus/s3-store": "1.4.1",
"@tus/server": "1.4.1",
"@tus/file-store": "1.4.0",
"@tus/s3-store": "1.5.0",
"@tus/server": "1.7.0",
"agentkeepalive": "^4.5.0",
"ajv": "^8.12.0",
"async-retry": "^1.3.3",
Expand Down
21 changes: 14 additions & 7 deletions src/http/routes/object/getObjectInfo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ async function requestHandler(
getObjectRequestInterface,
unknown
>,
publicRoute = false
publicRoute = false,
method: 'head' | 'info' = 'head'
) {
const { bucketName } = request.params
const objectName = request.params['*']
Expand All @@ -42,15 +43,21 @@ async function requestHandler(
await request.storage.asSuperUser().findBucket(bucketName, 'id', {
isPublic: true,
})
obj = await request.storage.asSuperUser().from(bucketName).findObject(objectName, 'id,version')
obj = await request.storage
.asSuperUser()
.from(bucketName)
.findObject(objectName, 'id,version,metadata,user_metadata,created_at')
} else {
obj = await request.storage.from(bucketName).findObject(objectName, 'id,version')
obj = await request.storage
.from(bucketName)
.findObject(objectName, 'id,version,metadata,user_metadata,created_at')
}

return request.storage.renderer('head').render(request, response, {
return request.storage.renderer(method).render(request, response, {
bucket: storageS3Bucket,
key: s3Key,
version: obj.version,
object: obj,
})
}

Expand Down Expand Up @@ -90,7 +97,7 @@ export async function publicRoutes(fastify: FastifyInstance) {
},
},
async (request, response) => {
return requestHandler(request, response, true)
return requestHandler(request, response, true, 'info')
}
)
}
Expand Down Expand Up @@ -131,7 +138,7 @@ export async function authenticatedRoutes(fastify: FastifyInstance) {
},
},
async (request, response) => {
return requestHandler(request, response)
return requestHandler(request, response, false, 'info')
}
)

Expand All @@ -151,7 +158,7 @@ export async function authenticatedRoutes(fastify: FastifyInstance) {
},
},
async (request, response) => {
return requestHandler(request, response)
return requestHandler(request, response, false, 'info')
}
)

Expand Down
11 changes: 11 additions & 0 deletions src/http/routes/s3/commands/create-multipart-upload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const CreateMultiPartUploadInput = {
},
Headers: {
type: 'object',
additionalProperties: true,
properties: {
authorization: { type: 'string' },
'content-type': { type: 'string' },
Expand All @@ -39,13 +40,23 @@ export default function CreateMultipartUpload(s3Router: S3Router) {
(req, ctx) => {
const s3Protocol = new S3ProtocolHandler(ctx.storage, ctx.tenantId, ctx.owner)

let metadata: undefined | Record<string, any> = undefined

Object.keys(req.Headers)
.filter((key) => key.startsWith('x-amz-meta-'))
.forEach((key) => {
if (!metadata) metadata = {}
metadata[key.replace('x-amz-meta-', '')] = req.Headers[key]
})

return s3Protocol.createMultiPartUpload({
Bucket: req.Params.Bucket,
Key: req.Params['*'],
ContentType: req.Headers?.['content-type'],
CacheControl: req.Headers?.['cache-control'],
ContentDisposition: req.Headers?.['content-disposition'],
ContentEncoding: req.Headers?.['content-encoding'],
Metadata: metadata,
})
}
)
Expand Down
11 changes: 11 additions & 0 deletions src/http/routes/s3/commands/upload-part.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,16 @@ export default function UploadPart(s3Router: S3Router) {
},
(req, ctx) => {
const s3Protocol = new S3ProtocolHandler(ctx.storage, ctx.tenantId, ctx.owner)

let metadata: undefined | Record<string, any> = undefined

Object.keys(req.Headers)
.filter((key) => key.startsWith('x-amz-meta-'))
.forEach((key) => {
if (!metadata) metadata = {}
metadata[key.replace('x-amz-meta-', '')] = req.Headers[key]
})

return s3Protocol.putObject({
Body: ctx.req as any,
Bucket: req.Params.Bucket,
Expand All @@ -102,6 +112,7 @@ export default function UploadPart(s3Router: S3Router) {
ContentType: req.Headers?.['content-type'],
Expires: req.Headers?.['expires'] ? new Date(req.Headers?.['expires']) : undefined,
ContentEncoding: req.Headers?.['content-encoding'],
Metadata: metadata,
})
}
)
Expand Down
29 changes: 21 additions & 8 deletions src/http/routes/tus/lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ export async function onCreate(
rawReq: http.IncomingMessage,
res: http.ServerResponse,
upload: Upload
): Promise<http.ServerResponse> {
): Promise<{ res: http.ServerResponse; metadata?: Upload['metadata'] }> {
const uploadID = UploadId.fromString(upload.id)

const req = rawReq as MultiPartRequest
Expand All @@ -166,17 +166,21 @@ export async function onCreate(

const uploader = new Uploader(storage.backend, storage.db)

if (upload.metadata && /^-?\d+$/.test(upload.metadata.cacheControl || '')) {
upload.metadata.cacheControl = `max-age=${upload.metadata.cacheControl}`
} else if (upload.metadata) {
upload.metadata.cacheControl = 'no-cache'
const metadata = {
...(upload.metadata ? upload.metadata : {}),
}

if (upload.metadata?.contentType && bucket.allowed_mime_types) {
uploader.validateMimeType(upload.metadata.contentType, bucket.allowed_mime_types)
if (/^-?\d+$/.test(metadata.cacheControl || '')) {
metadata.cacheControl = `max-age=${metadata.cacheControl}`
} else if (metadata) {
metadata.cacheControl = 'no-cache'
}

return res
if (metadata?.contentType && bucket.allowed_mime_types) {
uploader.validateMimeType(metadata.contentType, bucket.allowed_mime_types)
}

return { res, metadata }
}

/**
Expand All @@ -199,6 +203,14 @@ export async function onUploadFinish(
)

const uploader = new Uploader(req.upload.storage.backend, req.upload.storage.db)
let customMd: undefined | Record<string, string> = undefined
if (upload.metadata?.userMetadata) {
try {
customMd = JSON.parse(upload.metadata.userMetadata)
} catch (e) {
// no-op
}
}

await uploader.completeUpload({
version: resourceId.version,
Expand All @@ -208,6 +220,7 @@ export async function onUploadFinish(
isUpsert: req.upload.isUpsert,
uploadType: 'resumable',
owner: req.upload.owner,
userMetadata: customMd,
})

res.setHeader('Tus-Complete', '1')
Expand Down
8 changes: 7 additions & 1 deletion src/internal/database/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,13 @@ export class TenantConnection {
// This should never be reached, since the above promise is always rejected in this edge case.
throw ERRORS.DatabaseError('Transaction already completed')
}
await tnx.raw(`SELECT set_config('search_path', ?, true)`, [searchPath.join(', ')])

try {
await tnx.raw(`SELECT set_config('search_path', ?, true)`, [searchPath.join(', ')])
} catch (e) {
await tnx.rollback()
throw e
}
}

return tnx
Expand Down
4 changes: 2 additions & 2 deletions src/internal/errors/codes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,12 @@ export const ERRORS = {
message: `invalid range provided`,
}),

EntityTooLarge: (e?: Error) =>
EntityTooLarge: (e?: Error, entity = 'object') =>
new StorageBackendError({
error: 'Payload too large',
code: ErrorCode.EntityTooLarge,
httpStatusCode: 413,
message: 'The object exceeded the maximum allowed size',
message: `The ${entity} exceeded the maximum allowed size`,
originalError: e,
}),

Expand Down
4 changes: 0 additions & 4 deletions src/internal/queue/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,6 @@ export abstract class Queue {
url = multitenantDatabaseUrl
}

console.log({
deleteAfterDays: pgQueueDeleteAfterDays,
deleteAfterHours: pgQueueDeleteAfterHours,
})
Queue.pgBoss = new PgBoss({
connectionString: url,
db: new QueueDB({
Expand Down
Loading

0 comments on commit 763da08

Please sign in to comment.