Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add object tagging stub endpoint #454

Merged
merged 1 commit into the base branch from the source branch on
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 31 additions & 3 deletions src/http/routes/s3/commands/get-object.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { S3ProtocolHandler } from '../../../../storage/protocols/s3/s3-handler'
import { S3Router } from '../router'

const ListObjectsInput = {
const GetObjectInput = {
summary: 'Get Object',
Params: {
type: 'object',
Expand All @@ -22,8 +22,36 @@ const ListObjectsInput = {
Querystring: {},
} as const

export default function ListObjects(s3Router: S3Router) {
s3Router.get('/:Bucket/*', ListObjectsInput, (req, ctx) => {
// Request schema for the S3 GetObjectTagging operation
// (GET /:Bucket/*?tagging). Registered ahead of the plain GetObject
// route so the `tagging` query string selects this handler.
const GetObjectTagging = {
  summary: 'Get Object Tagging',
  Params: {
    type: 'object',
    properties: {
      Bucket: { type: 'string' },
      // '*' is the wildcard path segment carrying the object key
      '*': { type: 'string' },
    },
    required: ['Bucket', '*'],
  },
  Querystring: {
    type: 'object',
    properties: {
      tagging: { type: 'string' },
    },
    // the `tagging` query parameter must be present for this route to match
    required: ['tagging'],
  },
} as const

export default function GetObject(s3Router: S3Router) {
s3Router.get('/:Bucket/*?tagging', GetObjectTagging, (req, ctx) => {
const s3Protocol = new S3ProtocolHandler(ctx.storage, ctx.tenantId, ctx.owner)

return s3Protocol.getObjectTagging({
Bucket: req.Params.Bucket,
Key: req.Params['*'],
})
})

s3Router.get('/:Bucket/*', GetObjectInput, (req, ctx) => {
const s3Protocol = new S3ProtocolHandler(ctx.storage, ctx.tenantId, ctx.owner)
const ifModifiedSince = req.Headers?.['if-modified-since']

Expand Down
9 changes: 9 additions & 0 deletions src/http/routes/s3/commands/upload-part.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ const PutObjectInput = {
'x-amz-content-sha256': { type: 'string' },
'x-amz-date': { type: 'string' },
'content-type': { type: 'string' },
'content-length': { type: 'integer' },
'cache-control': { type: 'string' },
'content-disposition': { type: 'string' },
'content-encoding': { type: 'string' },
expires: { type: 'string' },
},
},
} as const
Expand Down Expand Up @@ -85,6 +90,10 @@ export default function UploadPart(s3Router: S3Router) {
Body: ctx.req as any,
Bucket: req.Params.Bucket,
Key: req.Params['*'],
CacheControl: req.Headers?.['cache-control'],
ContentType: req.Headers?.['content-type'],
Expires: req.Headers?.['expires'] ? new Date(req.Headers?.['expires']) : undefined,
ContentEncoding: req.Headers?.['content-encoding'],
})
},
{ disableContentTypeParser: true }
Expand Down
3 changes: 2 additions & 1 deletion src/storage/backend/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,8 @@ export abstract class StorageBackendAdapter {
UploadId: string,
PartNumber: number,
sourceKey: string,
sourceKeyVersion?: string
sourceKeyVersion?: string,
bytes?: { fromByte: number; toByte: number }
): Promise<{ eTag?: string; lastModified?: Date }> {
throw new Error('not implemented')
}
Expand Down
49 changes: 43 additions & 6 deletions src/storage/backend/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -300,14 +300,16 @@ export class FileBackend implements StorageBackendAdapter {
withOptionalVersion(key, version)
)

const multipartFile = path.join(multiPartFolder, `part-${partNumber}`)
const partPath = path.join(multiPartFolder, `part-${partNumber}`)

const writeStream = fsExtra.createWriteStream(partPath)

const writeStream = fsExtra.createWriteStream(multipartFile)
await pipeline(body, writeStream)

const etag = await fileChecksum(multipartFile)
const etag = await fileChecksum(partPath)

const platform = process.platform == 'darwin' ? 'darwin' : 'linux'
await this.setMetadataAttr(multipartFile, METADATA_ATTR_KEYS[platform]['etag'], etag)
await this.setMetadataAttr(partPath, METADATA_ATTR_KEYS[platform]['etag'], etag)

return { ETag: etag }
}
Expand Down Expand Up @@ -402,9 +404,44 @@ export class FileBackend implements StorageBackendAdapter {
version: string,
UploadId: string,
PartNumber: number,
sourceKey: string
sourceKey: string,
sourceVersion?: string,
rangeBytes?: { fromByte: number; toByte: number }
): Promise<{ eTag?: string; lastModified?: Date }> {
throw new Error('Method not implemented.')
const multiPartFolder = path.join(
this.filePath,
'multiparts',
UploadId,
storageS3Bucket,
withOptionalVersion(key, version)
)

const partFilePath = path.join(multiPartFolder, `part-${PartNumber}`)
const sourceFilePath = path.join(
this.filePath,
storageS3Bucket,
withOptionalVersion(sourceKey, sourceVersion)
)

const platform = process.platform == 'darwin' ? 'darwin' : 'linux'

const readStreamOptions = rangeBytes
? { start: rangeBytes.fromByte, end: rangeBytes.toByte }
: {}
const partStream = fs.createReadStream(sourceFilePath, readStreamOptions)

const writePart = fs.createWriteStream(partFilePath)
await pipeline(partStream, writePart)

const etag = await fileChecksum(partFilePath)
await this.setMetadataAttr(partFilePath, METADATA_ATTR_KEYS[platform]['etag'], etag)

const fileStat = await fs.lstat(partFilePath)

return {
eTag: etag,
lastModified: fileStat.mtime,
}
}

/**
Expand Down
6 changes: 3 additions & 3 deletions src/storage/backend/s3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ import {
GetObjectCommand,
GetObjectCommandInput,
HeadObjectCommand,
ListMultipartUploadsCommand,
ListPartsCommand,
PutObjectCommand,
S3Client,
S3ClientConfig,
UploadPartCommand,
Expand Down Expand Up @@ -417,14 +415,16 @@ export class S3Backend implements StorageBackendAdapter {
UploadId: string,
PartNumber: number,
sourceKey: string,
sourceKeyVersion?: string
sourceKeyVersion?: string,
bytesRange?: { fromByte: number; toByte: number }
) {
const uploadPartCopy = new UploadPartCopyCommand({
Bucket: storageS3Bucket,
Key: withOptionalVersion(key, version),
UploadId,
PartNumber,
CopySource: `${storageS3Bucket}/${withOptionalVersion(sourceKey, sourceKeyVersion)}`,
CopySourceRange: bytesRange ? `bytes=${bytesRange.fromByte}-${bytesRange.toByte}` : undefined,
})

const part = await this.client.send(uploadPartCopy)
Expand Down
77 changes: 58 additions & 19 deletions src/storage/protocols/s3/s3-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
DeleteObjectCommandInput,
DeleteObjectsCommandInput,
GetObjectCommandInput,
GetObjectTaggingCommandInput,
HeadObjectCommandInput,
ListMultipartUploadsCommandInput,
ListObjectsCommandInput,
Expand Down Expand Up @@ -508,7 +509,7 @@ export class S3ProtocolHandler {

if (parts.length === 0) {
const allParts = await this.storage.db.asSuperUser().listParts(UploadId, {
maxParts: 1000,
maxParts: 10000,
})

parts.push(
Expand Down Expand Up @@ -758,13 +759,40 @@ export class S3ProtocolHandler {
'cache-control': (object.metadata?.cacheControl as string) || '',
expires: (object.metadata?.expires as string) || '',
'content-length': (object.metadata?.size as string) || '',
'content-type': (object.metadata?.contentType as string) || '',
'content-type': (object.metadata?.mimetype as string) || '',
etag: (object.metadata?.eTag as string) || '',
'last-modified': object.updated_at ? new Date(object.updated_at).toUTCString() || '' : '',
},
}
}

/**
 * Stub for the S3 GetObjectTagging operation.
 * Validates the request and confirms the object exists, then always
 * reports an empty tag set, since tagging is not yet persisted.
 *
 * @param command - Bucket and Key identifying the object
 * @throws MissingParameter when Bucket or Key is absent
 * @throws NoSuchKey when the object does not exist
 */
async getObjectTagging(command: GetObjectTaggingCommandInput) {
  const { Bucket, Key } = command

  if (!Bucket) throw ERRORS.MissingParameter('Bucket')
  if (!Key) throw ERRORS.MissingParameter('Key')

  // Existence check only — we fetch just the id column.
  const found = await this.storage.from(Bucket).findObject(Key, 'id')
  if (!found) throw ERRORS.NoSuchKey(Key)

  // TODO: implement tagging when supported
  return {
    responseBody: {
      Tagging: { TagSet: null },
    },
  }
}

/**
* Retrieves an object from Amazon S3.
*
Expand All @@ -790,7 +818,7 @@ export class S3ProtocolHandler {
return {
headers: {
'cache-control': response.metadata.cacheControl,
'content-length': response.metadata.contentLength.toString(),
'content-length': response.metadata.contentLength?.toString() || '0',
'content-type': response.metadata.mimetype,
etag: response.metadata.eTag,
'last-modified': response.metadata.lastModified?.toUTCString() || '',
Expand Down Expand Up @@ -1044,16 +1072,33 @@ export class S3ProtocolHandler {
throw ERRORS.NoSuchKey('')
}

const bytes = CopySourceRange.split('=')[1].split('-')
// Check if copy source exists
const copySource = await this.storage.db.findObject(
sourceBucketName,
sourceKey,
'id,name,version,metadata'
)

let copySize = copySource.metadata?.size || 0
let rangeBytes: { fromByte: number; toByte: number } | undefined = undefined

if (bytes.length !== 2) {
throw ERRORS.InvalidRange()
}
if (CopySourceRange) {
const bytes = CopySourceRange.split('=')[1].split('-')

if (bytes.length !== 2) {
throw ERRORS.InvalidRange()
}

const fromByte = BigInt(bytes[0])
const toByte = BigInt(bytes[1])
const fromByte = Number(bytes[0])
const toByte = Number(bytes[1])

const size = toByte - fromByte
if (isNaN(fromByte) || isNaN(toByte)) {
throw ERRORS.InvalidRange()
}

rangeBytes = { fromByte, toByte }
copySize = toByte - fromByte
}

const uploader = new Uploader(this.storage.backend, this.storage.db)

Expand All @@ -1064,13 +1109,6 @@ export class S3ProtocolHandler {
isUpsert: true,
})

// Check if copy source exists
const copySource = await this.storage.db.findObject(
sourceBucketName,
sourceKey,
'id,name,version'
)

const [destinationBucket] = await this.storage.db.asSuperUser().withTransaction(async (db) => {
return Promise.all([
db.findBucketById(Bucket, 'file_size_limit'),
Expand All @@ -1082,7 +1120,7 @@ export class S3ProtocolHandler {
destinationBucket?.file_size_limit
)

const multipart = await this.shouldAllowPartUpload(UploadId, Number(size), maxFileSize)
const multipart = await this.shouldAllowPartUpload(UploadId, Number(copySize), maxFileSize)

const uploadPart = await this.storage.backend.uploadPartCopy(
storageS3Bucket,
Expand All @@ -1091,7 +1129,8 @@ export class S3ProtocolHandler {
UploadId,
PartNumber,
`${this.tenantId}/${sourceBucketName}/${copySource.name}`,
copySource.version
copySource.version,
rangeBytes
)

await this.storage.db.insertUploadPart({
Expand Down
1 change: 0 additions & 1 deletion src/storage/protocols/s3/signature-v4.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ interface Credentials {
*/
export const ALWAYS_UNSIGNABLE_HEADERS = {
authorization: true,
'cache-control': true,
connection: true,
expect: true,
from: true,
Expand Down
9 changes: 6 additions & 3 deletions src/storage/renderer/image.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,10 @@ export class ImageRenderer extends Renderer {
}
} catch (e) {
if (e instanceof AxiosError) {
await this.handleRequestError(e)
const error = await this.handleRequestError(e)
throw error.withMetadata({
transformations,
})
}

throw e
Expand All @@ -241,7 +244,7 @@ export class ImageRenderer extends Renderer {
protected async handleRequestError(error: AxiosError) {
const stream = error.response?.data as Stream
if (!stream) {
throw ERRORS.InternalError(error)
throw ERRORS.InternalError(undefined, error.message)
}

const errorResponse = await new Promise<string>((resolve) => {
Expand All @@ -257,7 +260,7 @@ export class ImageRenderer extends Renderer {
})

const statusCode = error.response?.status || 500
throw ERRORS.ImageProcessingError(statusCode, errorResponse, error)
return ERRORS.ImageProcessingError(statusCode, errorResponse)
}
}

Expand Down
14 changes: 0 additions & 14 deletions src/test/s3-protocol.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1081,20 +1081,6 @@ describe('S3 Protocol', () => {

const parts = await client.send(listPartsCmd)
expect(parts.Parts?.length).toBe(1)

const completeMultiPartUpload = new CompleteMultipartUploadCommand({
Bucket: bucket,
Key: newKey,
UploadId: resp.UploadId,
MultipartUpload: {
Parts: [
{
PartNumber: 1,
ETag: copyResp.CopyPartResult?.ETag,
},
],
},
})
})
})
})
Expand Down
Loading