Skip to content

Commit

Permalink
stores: persist user metadata in CompleteMultipartUpload
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisSchinnerl committed Mar 12, 2024
1 parent cc1c619 commit 2a946cb
Show file tree
Hide file tree
Showing 13 changed files with 72 additions and 23 deletions.
5 changes: 5 additions & 0 deletions api/multipart.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ type (
MimeType string
Metadata ObjectUserMetadata
}

CompleteMultipartOptions struct {
Metadata ObjectUserMetadata
}
)

type (
Expand All @@ -76,6 +80,7 @@ type (

MultipartCompleteRequest struct {
Bucket string `json:"bucket"`
Metadata ObjectUserMetadata
Path string `json:"path"`
UploadID string `json:"uploadID"`
Parts []MultipartCompletedPart
Expand Down
6 changes: 4 additions & 2 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ type (

AbortMultipartUpload(ctx context.Context, bucketName, path string, uploadID string) (err error)
AddMultipartPart(ctx context.Context, bucketName, path, contractSet, eTag, uploadID string, partNumber int, slices []object.SlabSlice) (err error)
CompleteMultipartUpload(ctx context.Context, bucketName, path, uploadID string, parts []api.MultipartCompletedPart) (_ api.MultipartCompleteResponse, err error)
CompleteMultipartUpload(ctx context.Context, bucketName, path, uploadID string, parts []api.MultipartCompletedPart, opts api.CompleteMultipartOptions) (_ api.MultipartCompleteResponse, err error)
CreateMultipartUpload(ctx context.Context, bucketName, path string, ec object.EncryptionKey, mimeType string, metadata api.ObjectUserMetadata) (api.MultipartCreateResponse, error)
MultipartUpload(ctx context.Context, uploadID string) (resp api.MultipartUpload, _ error)
MultipartUploads(ctx context.Context, bucketName, prefix, keyMarker, uploadIDMarker string, maxUploads int) (resp api.MultipartListUploadsResponse, _ error)
Expand Down Expand Up @@ -2244,7 +2244,9 @@ func (b *bus) multipartHandlerCompletePOST(jc jape.Context) {
if jc.Decode(&req) != nil {
return
}
resp, err := b.ms.CompleteMultipartUpload(jc.Request.Context(), req.Bucket, req.Path, req.UploadID, req.Parts)
resp, err := b.ms.CompleteMultipartUpload(jc.Request.Context(), req.Bucket, req.Path, req.UploadID, req.Parts, api.CompleteMultipartOptions{
Metadata: req.Metadata,
})
if jc.Check("failed to complete multipart upload", err) != nil {
return
}
Expand Down
3 changes: 2 additions & 1 deletion bus/client/multipart-upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ func (c *Client) AddMultipartPart(ctx context.Context, bucket, path, contractSet
}

// CompleteMultipartUpload completes a multipart upload.
func (c *Client) CompleteMultipartUpload(ctx context.Context, bucket, path, uploadID string, parts []api.MultipartCompletedPart) (resp api.MultipartCompleteResponse, err error) {
func (c *Client) CompleteMultipartUpload(ctx context.Context, bucket, path, uploadID string, parts []api.MultipartCompletedPart, opts api.CompleteMultipartOptions) (resp api.MultipartCompleteResponse, err error) {
err = c.c.WithContext(ctx).POST("/multipart/complete", api.MultipartCompleteRequest{
Bucket: bucket,
Path: path,
Metadata: opts.Metadata,
UploadID: uploadID,
Parts: parts,
}, &resp)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
gitlab.com/NebulousLabs/encoding v0.0.0-20200604091946-456c3dc907fe
go.sia.tech/core v0.2.1
go.sia.tech/coreutils v0.0.3
go.sia.tech/gofakes3 v0.0.0-20231109151325-e0d47c10dce2
go.sia.tech/gofakes3 v0.0.0-20240311124002-c206381023db
go.sia.tech/hostd v1.0.2
go.sia.tech/jape v0.11.2-0.20240124024603-93559895d640
go.sia.tech/mux v1.2.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,8 @@ go.sia.tech/core v0.2.1 h1:CqmMd+T5rAhC+Py3NxfvGtvsj/GgwIqQHHVrdts/LqY=
go.sia.tech/core v0.2.1/go.mod h1:3EoY+rR78w1/uGoXXVqcYdwSjSJKuEMI5bL7WROA27Q=
go.sia.tech/coreutils v0.0.3 h1:ZxuzovRpQMvfy/pCOV4om1cPF6sE15GyJyK36kIrF1Y=
go.sia.tech/coreutils v0.0.3/go.mod h1:UBFc77wXiE//eyilO5HLOncIEj7F69j0Nv2OkFujtP0=
go.sia.tech/gofakes3 v0.0.0-20231109151325-e0d47c10dce2 h1:ulzfJNjxN5DjXHClkW2pTiDk+eJ+0NQhX87lFDZ03t0=
go.sia.tech/gofakes3 v0.0.0-20231109151325-e0d47c10dce2/go.mod h1:PlsiVCn6+wssrR7bsOIlZm0DahsVrDydrlbjY4F14sg=
go.sia.tech/gofakes3 v0.0.0-20240311124002-c206381023db h1:t35K7tD79+ZZPHJ8XPaFopQvhGlQ5r1o9UgZnLOTvmc=
go.sia.tech/gofakes3 v0.0.0-20240311124002-c206381023db/go.mod h1:PlsiVCn6+wssrR7bsOIlZm0DahsVrDydrlbjY4F14sg=
go.sia.tech/hostd v1.0.2 h1:GjzNIAlwg3/dViF6258Xn5DI3+otQLRqmkoPDugP+9Y=
go.sia.tech/hostd v1.0.2/go.mod h1:zGw+AGVmazAp4ydvo7bZLNKTy1J51RI6Mp/oxRtYT6c=
go.sia.tech/jape v0.11.2-0.20240124024603-93559895d640 h1:mSaJ622P7T/M97dAK8iPV+IRIC9M5vV28NHeceoWO3M=
Expand Down
4 changes: 2 additions & 2 deletions internal/test/e2e/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2107,7 +2107,7 @@ func TestMultipartUploads(t *testing.T) {
PartNumber: 3,
ETag: etag3,
},
})
}, api.CompleteMultipartOptions{})
tt.OK(err)
if ui.ETag == "" {
t.Fatal("unexpected response:", ui)
Expand Down Expand Up @@ -2435,7 +2435,7 @@ func TestMultipartUploadWrappedByPartialSlabs(t *testing.T) {
PartNumber: 3,
ETag: resp3.ETag,
},
}))
}, api.CompleteMultipartOptions{}))

// download the object and verify its integrity
dst := new(bytes.Buffer)
Expand Down
32 changes: 32 additions & 0 deletions internal/test/e2e/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,38 @@ func TestS3ObjectMetadata(t *testing.T) {
head, err = s3.StatObject(context.Background(), api.DefaultBucketName, t.Name(), minio.StatObjectOptions{})
tt.OK(err)
assertMetadata(metadata, head.UserMetadata)

// upload a file using multipart upload
core := cluster.S3Core
uid, err := core.NewMultipartUpload(context.Background(), api.DefaultBucketName, "multi", minio.PutObjectOptions{
UserMetadata: map[string]string{
"New": "1",
},
})
tt.OK(err)
data := frand.Bytes(3)

part, err := core.PutObjectPart(context.Background(), api.DefaultBucketName, "foo", uid, 1, bytes.NewReader(data), int64(len(data)), minio.PutObjectPartOptions{})
tt.OK(err)
_, err = core.CompleteMultipartUpload(context.Background(), api.DefaultBucketName, "multi", uid, []minio.CompletePart{
{
PartNumber: part.PartNumber,
ETag: part.ETag,
},
}, minio.PutObjectOptions{
UserMetadata: map[string]string{
"Complete": "2",
},
})
tt.OK(err)

// check metadata
head, err = s3.StatObject(context.Background(), api.DefaultBucketName, "multi", minio.StatObjectOptions{})
tt.OK(err)
assertMetadata(map[string]string{
"New": "1",
"Complete": "2",
}, head.UserMetadata)
}

func TestS3Authentication(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions s3/authentication.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,9 @@ func (b *authenticatedBackend) AbortMultipartUpload(ctx context.Context, bucket,
return b.backend.AbortMultipartUpload(ctx, bucket, object, id)
}

func (b *authenticatedBackend) CompleteMultipartUpload(ctx context.Context, bucket, object string, id gofakes3.UploadID, input *gofakes3.CompleteMultipartUploadRequest) (resp *gofakes3.CompleteMultipartUploadResult, err error) {
func (b *authenticatedBackend) CompleteMultipartUpload(ctx context.Context, bucket, object string, id gofakes3.UploadID, meta map[string]string, input *gofakes3.CompleteMultipartUploadRequest) (resp *gofakes3.CompleteMultipartUploadResult, err error) {
if !b.permsFromCtx(ctx, bucket).CompleteMultipartUpload {
return nil, gofakes3.ErrAccessDenied
}
return b.backend.CompleteMultipartUpload(ctx, bucket, object, id, input)
return b.backend.CompleteMultipartUpload(ctx, bucket, object, id, meta, input)
}
7 changes: 5 additions & 2 deletions s3/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,15 +502,18 @@ func (s *s3) AbortMultipartUpload(ctx context.Context, bucket, object string, id
return nil
}

func (s *s3) CompleteMultipartUpload(ctx context.Context, bucket, object string, id gofakes3.UploadID, input *gofakes3.CompleteMultipartUploadRequest) (*gofakes3.CompleteMultipartUploadResult, error) {
func (s *s3) CompleteMultipartUpload(ctx context.Context, bucket, object string, id gofakes3.UploadID, meta map[string]string, input *gofakes3.CompleteMultipartUploadRequest) (*gofakes3.CompleteMultipartUploadResult, error) {
convertToSiaMetadataHeaders(meta)
var parts []api.MultipartCompletedPart
for _, part := range input.Parts {
parts = append(parts, api.MultipartCompletedPart{
ETag: part.ETag,
PartNumber: part.PartNumber,
})
}
resp, err := s.b.CompleteMultipartUpload(ctx, bucket, "/"+object, string(id), parts)
resp, err := s.b.CompleteMultipartUpload(ctx, bucket, "/"+object, string(id), parts, api.CompleteMultipartOptions{
Metadata: api.ExtractObjectUserMetadataFrom(meta),
})
if err != nil {
return nil, gofakes3.ErrorMessage(gofakes3.ErrInternal, err.Error())
}
Expand Down
2 changes: 1 addition & 1 deletion s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type bus interface {
Object(ctx context.Context, bucket, path string, opts api.GetObjectOptions) (res api.ObjectsResponse, err error)

AbortMultipartUpload(ctx context.Context, bucket, path string, uploadID string) (err error)
CompleteMultipartUpload(ctx context.Context, bucket, path, uploadID string, parts []api.MultipartCompletedPart) (_ api.MultipartCompleteResponse, err error)
CompleteMultipartUpload(ctx context.Context, bucket, path, uploadID string, parts []api.MultipartCompletedPart, opts api.CompleteMultipartOptions) (_ api.MultipartCompleteResponse, err error)
CreateMultipartUpload(ctx context.Context, bucket, path string, opts api.CreateMultipartOptions) (api.MultipartCreateResponse, error)
MultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker string, maxUploads int) (resp api.MultipartListUploadsResponse, _ error)
MultipartUploadParts(ctx context.Context, bucket, object string, uploadID string, marker int, limit int64) (resp api.MultipartListPartsResponse, _ error)
Expand Down
14 changes: 6 additions & 8 deletions stores/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -1724,7 +1724,7 @@ func (s *SQLStore) UpdateObject(ctx context.Context, bucket, path, contractSet,
// NOTE: the metadata is not deleted because this delete will cascade,
// if we stop recreating the object we have to make sure to delete the
// object's metadata before trying to recreate it
_, err := s.deleteObject(tx, bucket, path)
deleted, err := s.deleteObject(tx, bucket, path)
if err != nil {
return fmt.Errorf("failed to delete object: %w", err)
}
Expand Down Expand Up @@ -1771,6 +1771,10 @@ func (s *SQLStore) UpdateObject(ctx context.Context, bucket, path, contractSet,
return fmt.Errorf("failed to create user metadata: %w", err)
}

// Delete slabs.
if deleted > 0 {
return pruneSlabs(tx)
}
return nil
})
}
Expand Down Expand Up @@ -2727,13 +2731,7 @@ func (s *SQLStore) deleteObject(tx *gorm.DB, bucket string, path string) (int64,
if tx.Error != nil {
return 0, tx.Error
}
numDeleted := tx.RowsAffected
if numDeleted == 0 {
return 0, nil // nothing to prune if no object was deleted
} else if err := pruneSlabs(tx); err != nil {
return numDeleted, err
}
return numDeleted, nil
return tx.RowsAffected, nil
}

// deleteObjects deletes a batch of objects from the database. The order of
Expand Down
10 changes: 9 additions & 1 deletion stores/multipart.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func (s *SQLStore) AbortMultipartUpload(ctx context.Context, bucket, path string
})
}

func (s *SQLStore) CompleteMultipartUpload(ctx context.Context, bucket, path string, uploadID string, parts []api.MultipartCompletedPart) (_ api.MultipartCompleteResponse, err error) {
func (s *SQLStore) CompleteMultipartUpload(ctx context.Context, bucket, path string, uploadID string, parts []api.MultipartCompletedPart, opts api.CompleteMultipartOptions) (_ api.MultipartCompleteResponse, err error) {
// Sanity check input parts.
if !sort.SliceIsSorted(parts, func(i, j int) bool {
return parts[i].PartNumber < parts[j].PartNumber
Expand Down Expand Up @@ -434,6 +434,14 @@ func (s *SQLStore) CompleteMultipartUpload(ctx context.Context, bucket, path str
}
}

// Create new metadata.
if len(opts.Metadata) > 0 {
err = s.createUserMetadata(tx, obj.ID, opts.Metadata)
if err != nil {
return fmt.Errorf("failed to create metadata: %w", err)
}
}

// Update user metadata.
if err := tx.
Model(&dbObjectUserMetadata{}).
Expand Down
2 changes: 1 addition & 1 deletion stores/multipart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestMultipartUploadWithUploadPackingRegression(t *testing.T) {
t.Fatal(err)
} else if nSlicesBefore == 0 {
t.Fatal("expected some slices")
} else if _, err = ss.CompleteMultipartUpload(ctx, api.DefaultBucketName, objName, resp.UploadID, parts); err != nil {
} else if _, err = ss.CompleteMultipartUpload(ctx, api.DefaultBucketName, objName, resp.UploadID, parts, api.CompleteMultipartOptions{}); err != nil {
t.Fatal(err)
} else if err := ss.db.Model(&dbSlice{}).Count(&nSlicesAfter).Error; err != nil {
t.Fatal(err)
Expand Down

0 comments on commit 2a946cb

Please sign in to comment.