Skip to content

Commit

Permalink
Merge pull request #2 from SiaFoundation/pj/etag
Browse files Browse the repository at this point in the history
ETag
  • Loading branch information
ChrisSchinnerl authored Sep 26, 2023
2 parents ad9e995 + d69e32b commit 8aab3bc
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 38 deletions.
16 changes: 4 additions & 12 deletions backend.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package gofakes3

import (
"encoding/hex"
"io"
"time"

Expand Down Expand Up @@ -120,13 +119,6 @@ func (p ListBucketPage) IsEmpty() bool {
return p == ListBucketPage{}
}

type PutObjectResult struct {
// If versioning is enabled on the bucket, this should be set to the
// created version ID. If versioning is not enabled, this should be
// empty.
VersionID VersionID
}

// Backend provides a set of operations to be implemented in order to support
// gofakes3.
//
Expand Down Expand Up @@ -320,13 +312,13 @@ type VersionedBackend interface {
// gets finalised and pushed to the backend.
type MultipartBackend interface {
CreateMultipartUpload(bucket, object string, meta map[string]string) (UploadID, error)
UploadPart(bucket, object string, id UploadID, partNumber int, contentLength int64, input io.Reader) (etag string, err error)
UploadPart(bucket, object string, id UploadID, partNumber int, contentLength int64, input io.Reader) (*UploadPartResult, error)

ListMultipartUploads(bucket string, marker *UploadListMarker, prefix Prefix, limit int64) (*ListMultipartUploadsResult, error)
ListParts(bucket, object string, uploadID UploadID, marker int, limit int64) (*ListMultipartUploadPartsResult, error)

AbortMultipartUpload(bucket, object string, id UploadID) error
CompleteMultipartUpload(bucket, object string, id UploadID, input *CompleteMultipartUploadRequest) (versionID VersionID, etag string, err error)
CompleteMultipartUpload(bucket, object string, id UploadID, input *CompleteMultipartUploadRequest) (*CompleteMultipartUploadResult, error)
}

// CopyObject is a helper function useful for quickly implementing CopyObject on
Expand All @@ -339,13 +331,13 @@ func CopyObject(db Backend, srcBucket, srcKey, dstBucket, dstKey string, meta ma
}
defer c.Contents.Close()

_, err = db.PutObject(dstBucket, dstKey, meta, c.Contents, c.Size)
res, err := db.PutObject(dstBucket, dstKey, meta, c.Contents, c.Size)
if err != nil {
return
}

return CopyObjectResult{
ETag: `"` + hex.EncodeToString(c.Hash) + `"`,
ETag: res.ETag,
LastModified: NewContentTime(time.Now()),
}, nil
}
Expand Down
1 change: 1 addition & 0 deletions backend/s3mem/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ func (db *Backend) PutObject(bucketName, objectName string, meta map[string]stri
result.VersionID = item.versionID
}

result.ETag = item.etag
return result, nil
}

Expand Down
37 changes: 28 additions & 9 deletions gofakes3.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,11 @@ func (g *GoFakeS3) createObjectBrowserUpload(bucket string, w http.ResponseWrite
w.Header().Set("x-amz-version-id", string(result.VersionID))
}

w.Header().Set("ETag", `"`+hex.EncodeToString(rdr.Sum(nil))+`"`)
etag := result.ETag
if etag == "" {
etag = formatETag(hex.EncodeToString(rdr.Sum(nil)))
}
w.Header().Set("ETag", etag)
return nil
}

Expand Down Expand Up @@ -725,7 +729,12 @@ func (g *GoFakeS3) createObject(bucket, object string, w http.ResponseWriter, r
g.log.Print(LogInfo, "CREATED VERSION:", bucket, object, result.VersionID)
w.Header().Set("x-amz-version-id", string(result.VersionID))
}
w.Header().Set("ETag", `"`+hex.EncodeToString(rdr.Sum(nil))+`"`)

etag := result.ETag
if etag == "" {
etag = formatETag(hex.EncodeToString(rdr.Sum(nil)))
}
w.Header().Set("ETag", etag)

return nil
}
Expand Down Expand Up @@ -780,6 +789,12 @@ func (g *GoFakeS3) copyObject(bucket, object string, meta map[string]string, w h
w.Header().Set("x-amz-version-id", string(srcObj.VersionID))
}

etag := result.ETag
if etag == "" {
etag = formatETag(hex.EncodeToString(srcObj.Hash))
}
w.Header().Set("ETag", etag)

return g.xmlEncoder(w).Encode(result)
}

Expand Down Expand Up @@ -940,12 +955,12 @@ func (g *GoFakeS3) putMultipartUploadPart(bucket, object string, uploadID Upload
}
}

etag, err := g.uploader.UploadPart(bucket, object, uploadID, int(partNumber), r.ContentLength, rdr)
res, err := g.uploader.UploadPart(bucket, object, uploadID, int(partNumber), r.ContentLength, rdr)
if err != nil {
return err
}

w.Header().Add("ETag", etag)
w.Header().Add("ETag", res.ETag)
return nil
}

Expand All @@ -966,17 +981,17 @@ func (g *GoFakeS3) completeMultipartUpload(bucket, object string, uploadID Uploa
return err
}

versionID, etag, err := g.uploader.CompleteMultipartUpload(bucket, object, uploadID, &in)
res, err := g.uploader.CompleteMultipartUpload(bucket, object, uploadID, &in)
if err != nil {
return err
}

if versionID != "" {
w.Header().Set("x-amz-version-id", string(versionID))
if res.VersionID != "" {
w.Header().Set("x-amz-version-id", string(res.VersionID))
}

return g.xmlEncoder(w).Encode(&CompleteMultipartUploadResult{
ETag: etag,
return g.xmlEncoder(w).Encode(&CompleteMultipartUploadResponse{
ETag: res.ETag,
Bucket: bucket,
Key: object,
})
Expand Down Expand Up @@ -1202,3 +1217,7 @@ func listBucketVersionsPageFromQuery(query url.Values) (page ListBucketVersionsP

return page, nil
}

func formatETag(etag string) string {
return fmt.Sprintf("\"%s\"", etag)
}
27 changes: 26 additions & 1 deletion messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (c CompleteMultipartUploadRequest) partIDs() []int {
return inParts
}

type CompleteMultipartUploadResult struct {
type CompleteMultipartUploadResponse struct {
Location string `xml:"Location"`
Bucket string `xml:"Bucket"`
Key string `xml:"Key"`
Expand Down Expand Up @@ -365,6 +365,20 @@ func (b *ListBucketVersionsResult) AddPrefix(prefix string) {
b.CommonPrefixes = append(b.CommonPrefixes, CommonPrefix{Prefix: prefix})
}

type UploadPartResult struct {
ETag string `xml:"ETag,omitempty"`
}

type CompleteMultipartUploadResult struct {
// If versioning is enabled on the bucket, this should be set to the
// created version ID. If versioning is not enabled, this should be
// empty.
VersionID VersionID `xml:"VersionId,omitempty"`

// ETag is the value of the ETag header returned by the backend.
ETag string `xml:"ETag,omitempty"`
}

type ListMultipartUploadsResult struct {
Bucket string `xml:"Bucket"`

Expand Down Expand Up @@ -430,6 +444,17 @@ type ListMultipartUploadPartItem struct {
Size int64 `xml:"Size"`
}

// PutObjectResult contains the response from a PutObject operation.
type PutObjectResult struct {
// If versioning is enabled on the bucket, this should be set to the
// created version ID. If versioning is not enabled, this should be
// empty.
VersionID VersionID `xml:"VersionId,omitempty"`

// ETag is the value of the ETag header returned by the backend.
ETag string `xml:"ETag,omitempty"`
}

// CopyObjectResult contains the response from a CopyObject operation.
type CopyObjectResult struct {
XMLName xml.Name `xml:"CopyObjectResult"`
Expand Down
39 changes: 23 additions & 16 deletions uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,17 +367,17 @@ func (u *uploader) AbortMultipartUpload(bucket, object string, id UploadID) erro
return nil
}

func (u *uploader) UploadPart(bucket, object string, id UploadID, partNumber int, contentLength int64, input io.Reader) (etag string, err error) {
func (u *uploader) UploadPart(bucket, object string, id UploadID, partNumber int, contentLength int64, input io.Reader) (*UploadPartResult, error) {
body, err := io.ReadAll(input)
if err != nil {
return "", err
return nil, err
}
if len(body) != int(contentLength) {
return "", ErrIncompleteBody
return nil, ErrIncompleteBody
}
mpu, err := u.getUnlocked(bucket, object, id)
if err != nil {
return "", err
return nil, err
}

mpu.mu.Lock()
Expand All @@ -387,7 +387,7 @@ func (u *uploader) UploadPart(bucket, object string, id UploadID, partNumber int
// from guaranteed unique input:
hash := md5.New()
hash.Write([]byte(body))
etag = fmt.Sprintf(`"%s"`, hex.EncodeToString(hash.Sum(nil)))
etag := formatETag(hex.EncodeToString(hash.Sum(nil)))

part := multipartUploadPart{
PartNumber: partNumber,
Expand All @@ -399,13 +399,14 @@ func (u *uploader) UploadPart(bucket, object string, id UploadID, partNumber int
mpu.parts = append(mpu.parts, make([]*multipartUploadPart, partNumber-len(mpu.parts)+1)...)
}
mpu.parts[partNumber] = &part
return etag, nil

return &UploadPartResult{ETag: etag}, nil
}

func (u *uploader) CompleteMultipartUpload(bucket, object string, id UploadID, input *CompleteMultipartUploadRequest) (version VersionID, etag string, err error) {
func (u *uploader) CompleteMultipartUpload(bucket, object string, id UploadID, input *CompleteMultipartUploadRequest) (*CompleteMultipartUploadResult, error) {
mpu, err := u.getUnlocked(bucket, object, id)
if err != nil {
return "", "", err
return nil, err
}

mpu.mu.Lock()
Expand All @@ -417,23 +418,23 @@ func (u *uploader) CompleteMultipartUpload(bucket, object string, id UploadID, i
// end up uploading more parts than you need to assemble, so it should
// probably just ignore that?
if len(input.Parts) > mpuPartsLen {
return "", "", ErrInvalidPart
return nil, ErrInvalidPart
}

if !input.partsAreSorted() {
return "", "", ErrInvalidPartOrder
return nil, ErrInvalidPartOrder
}

var size int64

for _, inPart := range input.Parts {
if inPart.PartNumber >= mpuPartsLen || mpu.parts[inPart.PartNumber] == nil {
return "", "", ErrorMessagef(ErrInvalidPart, "unexpected part number %d in complete request", inPart.PartNumber)
return nil, ErrorMessagef(ErrInvalidPart, "unexpected part number %d in complete request", inPart.PartNumber)
}

upPart := mpu.parts[inPart.PartNumber]
if strings.Trim(inPart.ETag, "\"") != strings.Trim(upPart.ETag, "\"") {
return "", "", ErrorMessagef(ErrInvalidPart, "unexpected part etag for number %d in complete request", inPart.PartNumber)
return nil, ErrorMessagef(ErrInvalidPart, "unexpected part etag for number %d in complete request", inPart.PartNumber)
}

size += int64(len(upPart.Body))
Expand All @@ -444,16 +445,22 @@ func (u *uploader) CompleteMultipartUpload(bucket, object string, id UploadID, i
body = append(body, mpu.parts[part.PartNumber].Body...)
}

hash := fmt.Sprintf("%x", md5.Sum(body))

result, err := u.storage.PutObject(bucket, object, mpu.Meta, bytes.NewReader(body), int64(len(body)))
if err != nil {
return "", "", err
return nil, err
}

etag := result.ETag
if etag == "" {
etag = formatETag(fmt.Sprintf("%x", md5.Sum(body)))
}

// if getUnlocked succeeded, so will this:
u.buckets[bucket].remove(id)
return result.VersionID, hash, nil
return &CompleteMultipartUploadResult{
VersionID: result.VersionID,
ETag: etag,
}, nil
}

func (u *uploader) getUnlocked(bucket, object string, id UploadID) (mu *multipartUpload, err error) {
Expand Down

0 comments on commit 8aab3bc

Please sign in to comment.