From 2a946cb294a2d04e95f9f880fbafe4d2600a7501 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Mon, 11 Mar 2024 13:38:42 +0100 Subject: [PATCH 1/3] stores: persist user metadata in CompleteMultipartUpload --- api/multipart.go | 5 +++++ bus/bus.go | 6 ++++-- bus/client/multipart-upload.go | 3 ++- go.mod | 2 +- go.sum | 4 ++-- internal/test/e2e/cluster_test.go | 4 ++-- internal/test/e2e/s3_test.go | 32 +++++++++++++++++++++++++++++++ s3/authentication.go | 4 ++-- s3/backend.go | 7 +++++-- s3/s3.go | 2 +- stores/metadata.go | 14 ++++++-------- stores/multipart.go | 10 +++++++++- stores/multipart_test.go | 2 +- 13 files changed, 72 insertions(+), 23 deletions(-) diff --git a/api/multipart.go b/api/multipart.go index a191b2b13..0654386c0 100644 --- a/api/multipart.go +++ b/api/multipart.go @@ -51,6 +51,10 @@ type ( MimeType string Metadata ObjectUserMetadata } + + CompleteMultipartOptions struct { + Metadata ObjectUserMetadata + } ) type ( @@ -76,6 +80,7 @@ type ( MultipartCompleteRequest struct { Bucket string `json:"bucket"` + Metadata ObjectUserMetadata Path string `json:"path"` UploadID string `json:"uploadID"` Parts []MultipartCompletedPart diff --git a/bus/bus.go b/bus/bus.go index 045b8e82a..05770eb96 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -150,7 +150,7 @@ type ( AbortMultipartUpload(ctx context.Context, bucketName, path string, uploadID string) (err error) AddMultipartPart(ctx context.Context, bucketName, path, contractSet, eTag, uploadID string, partNumber int, slices []object.SlabSlice) (err error) - CompleteMultipartUpload(ctx context.Context, bucketName, path, uploadID string, parts []api.MultipartCompletedPart) (_ api.MultipartCompleteResponse, err error) + CompleteMultipartUpload(ctx context.Context, bucketName, path, uploadID string, parts []api.MultipartCompletedPart, opts api.CompleteMultipartOptions) (_ api.MultipartCompleteResponse, err error) CreateMultipartUpload(ctx context.Context, bucketName, path string, ec object.EncryptionKey, mimeType string, metadata api.ObjectUserMetadata) (api.MultipartCreateResponse, error) MultipartUpload(ctx context.Context, uploadID string) (resp api.MultipartUpload, _ error) MultipartUploads(ctx context.Context, bucketName, prefix, keyMarker, uploadIDMarker string, maxUploads int) (resp api.MultipartListUploadsResponse, _ error) @@ -2244,7 +2244,9 @@ func (b *bus) multipartHandlerCompletePOST(jc jape.Context) { if jc.Decode(&req) != nil { return } - resp, err := b.ms.CompleteMultipartUpload(jc.Request.Context(), req.Bucket, req.Path, req.UploadID, req.Parts) + resp, err := b.ms.CompleteMultipartUpload(jc.Request.Context(), req.Bucket, req.Path, req.UploadID, req.Parts, api.CompleteMultipartOptions{ + Metadata: req.Metadata, + }) if jc.Check("failed to complete multipart upload", err) != nil { return } diff --git a/bus/client/multipart-upload.go b/bus/client/multipart-upload.go index 281019487..6fd06204c 100644 --- a/bus/client/multipart-upload.go +++ b/bus/client/multipart-upload.go @@ -33,10 +33,11 @@ func (c *Client) AddMultipartPart(ctx context.Context, bucket, path, contractSet } // CompleteMultipartUpload completes a multipart upload. -func (c *Client) CompleteMultipartUpload(ctx context.Context, bucket, path, uploadID string, parts []api.MultipartCompletedPart) (resp api.MultipartCompleteResponse, err error) { +func (c *Client) CompleteMultipartUpload(ctx context.Context, bucket, path, uploadID string, parts []api.MultipartCompletedPart, opts api.CompleteMultipartOptions) (resp api.MultipartCompleteResponse, err error) { err = c.c.WithContext(ctx).POST("/multipart/complete", api.MultipartCompleteRequest{ Bucket: bucket, Path: path, + Metadata: opts.Metadata, UploadID: uploadID, Parts: parts, }, &resp) diff --git a/go.mod b/go.mod index 237598d4c..e0933ff50 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( gitlab.com/NebulousLabs/encoding v0.0.0-20200604091946-456c3dc907fe go.sia.tech/core v0.2.1 go.sia.tech/coreutils v0.0.3 - go.sia.tech/gofakes3 v0.0.0-20231109151325-e0d47c10dce2 + go.sia.tech/gofakes3 v0.0.0-20240311124002-c206381023db go.sia.tech/hostd v1.0.2 go.sia.tech/jape v0.11.2-0.20240124024603-93559895d640 go.sia.tech/mux v1.2.0 diff --git a/go.sum b/go.sum index 2a3b756ab..f42a8843a 100644 --- a/go.sum +++ b/go.sum @@ -240,8 +240,8 @@ go.sia.tech/core v0.2.1 h1:CqmMd+T5rAhC+Py3NxfvGtvsj/GgwIqQHHVrdts/LqY= go.sia.tech/core v0.2.1/go.mod h1:3EoY+rR78w1/uGoXXVqcYdwSjSJKuEMI5bL7WROA27Q= go.sia.tech/coreutils v0.0.3 h1:ZxuzovRpQMvfy/pCOV4om1cPF6sE15GyJyK36kIrF1Y= go.sia.tech/coreutils v0.0.3/go.mod h1:UBFc77wXiE//eyilO5HLOncIEj7F69j0Nv2OkFujtP0= -go.sia.tech/gofakes3 v0.0.0-20231109151325-e0d47c10dce2 h1:ulzfJNjxN5DjXHClkW2pTiDk+eJ+0NQhX87lFDZ03t0= -go.sia.tech/gofakes3 v0.0.0-20231109151325-e0d47c10dce2/go.mod h1:PlsiVCn6+wssrR7bsOIlZm0DahsVrDydrlbjY4F14sg= +go.sia.tech/gofakes3 v0.0.0-20240311124002-c206381023db h1:t35K7tD79+ZZPHJ8XPaFopQvhGlQ5r1o9UgZnLOTvmc= +go.sia.tech/gofakes3 v0.0.0-20240311124002-c206381023db/go.mod h1:PlsiVCn6+wssrR7bsOIlZm0DahsVrDydrlbjY4F14sg= go.sia.tech/hostd v1.0.2 h1:GjzNIAlwg3/dViF6258Xn5DI3+otQLRqmkoPDugP+9Y= go.sia.tech/hostd v1.0.2/go.mod h1:zGw+AGVmazAp4ydvo7bZLNKTy1J51RI6Mp/oxRtYT6c= go.sia.tech/jape v0.11.2-0.20240124024603-93559895d640 h1:mSaJ622P7T/M97dAK8iPV+IRIC9M5vV28NHeceoWO3M= diff --git a/internal/test/e2e/cluster_test.go b/internal/test/e2e/cluster_test.go index 5ca7141d5..dd3cd6e31 100644 --- a/internal/test/e2e/cluster_test.go +++ b/internal/test/e2e/cluster_test.go @@ -2107,7 +2107,7 @@ func TestMultipartUploads(t *testing.T) { PartNumber: 3, ETag: etag3, }, - }) + }, api.CompleteMultipartOptions{}) tt.OK(err) if ui.ETag == "" { t.Fatal("unexpected response:", ui) @@ -2435,7 +2435,7 @@ func TestMultipartUploadWrappedByPartialSlabs(t *testing.T) { PartNumber: 3, ETag: resp3.ETag, }, - })) + }, api.CompleteMultipartOptions{})) // download the object and verify its integrity dst := new(bytes.Buffer) diff --git a/internal/test/e2e/s3_test.go b/internal/test/e2e/s3_test.go index b25e11871..c8c6bb334 100644 --- a/internal/test/e2e/s3_test.go +++ b/internal/test/e2e/s3_test.go @@ -254,6 +254,38 @@ func TestS3ObjectMetadata(t *testing.T) { head, err = s3.StatObject(context.Background(), api.DefaultBucketName, t.Name(), minio.StatObjectOptions{}) tt.OK(err) assertMetadata(metadata, head.UserMetadata) + + // upload a file using multipart upload + core := cluster.S3Core + uid, err := core.NewMultipartUpload(context.Background(), api.DefaultBucketName, "multi", minio.PutObjectOptions{ + UserMetadata: map[string]string{ + "New": "1", + }, + }) + tt.OK(err) + data := frand.Bytes(3) + + part, err := core.PutObjectPart(context.Background(), api.DefaultBucketName, "foo", uid, 1, bytes.NewReader(data), int64(len(data)), minio.PutObjectPartOptions{}) + tt.OK(err) + _, err = core.CompleteMultipartUpload(context.Background(), api.DefaultBucketName, "multi", uid, []minio.CompletePart{ + { + PartNumber: part.PartNumber, + ETag: part.ETag, + }, + }, minio.PutObjectOptions{ + UserMetadata: map[string]string{ + "Complete": "2", + }, + }) + tt.OK(err) + + // check metadata + head, err = s3.StatObject(context.Background(), api.DefaultBucketName, "multi", minio.StatObjectOptions{}) + tt.OK(err) + assertMetadata(map[string]string{ + "New": "1", + "Complete": "2", + }, head.UserMetadata) } func TestS3Authentication(t *testing.T) { diff --git a/s3/authentication.go b/s3/authentication.go index 9d5da4f1a..215da52b7 100644 --- a/s3/authentication.go +++ b/s3/authentication.go @@ -264,9 +264,9 @@ func (b *authenticatedBackend) AbortMultipartUpload(ctx context.Context, bucket, return b.backend.AbortMultipartUpload(ctx, bucket, object, id) } -func (b *authenticatedBackend) CompleteMultipartUpload(ctx context.Context, bucket, object string, id gofakes3.UploadID, input *gofakes3.CompleteMultipartUploadRequest) (resp *gofakes3.CompleteMultipartUploadResult, err error) { +func (b *authenticatedBackend) CompleteMultipartUpload(ctx context.Context, bucket, object string, id gofakes3.UploadID, meta map[string]string, input *gofakes3.CompleteMultipartUploadRequest) (resp *gofakes3.CompleteMultipartUploadResult, err error) { if !b.permsFromCtx(ctx, bucket).CompleteMultipartUpload { return nil, gofakes3.ErrAccessDenied } - return b.backend.CompleteMultipartUpload(ctx, bucket, object, id, input) + return b.backend.CompleteMultipartUpload(ctx, bucket, object, id, meta, input) } diff --git a/s3/backend.go b/s3/backend.go index c05a3ec98..7b5ea74f9 100644 --- a/s3/backend.go +++ b/s3/backend.go @@ -502,7 +502,8 @@ func (s *s3) AbortMultipartUpload(ctx context.Context, bucket, object string, id return nil } -func (s *s3) CompleteMultipartUpload(ctx context.Context, bucket, object string, id gofakes3.UploadID, input *gofakes3.CompleteMultipartUploadRequest) (*gofakes3.CompleteMultipartUploadResult, error) { +func (s *s3) CompleteMultipartUpload(ctx context.Context, bucket, object string, id gofakes3.UploadID, meta map[string]string, input *gofakes3.CompleteMultipartUploadRequest) (*gofakes3.CompleteMultipartUploadResult, error) { + convertToSiaMetadataHeaders(meta) var parts []api.MultipartCompletedPart for _, part := range input.Parts { parts = append(parts, api.MultipartCompletedPart{ @@ -510,7 +511,9 @@ func (s *s3) CompleteMultipartUpload(ctx context.Context, bucket, object string, PartNumber: part.PartNumber, }) } - resp, err := s.b.CompleteMultipartUpload(ctx, bucket, "/"+object, string(id), parts) + resp, err := s.b.CompleteMultipartUpload(ctx, bucket, "/"+object, string(id), parts, api.CompleteMultipartOptions{ + Metadata: api.ExtractObjectUserMetadataFrom(meta), + }) if err != nil { return nil, gofakes3.ErrorMessage(gofakes3.ErrInternal, err.Error()) } diff --git a/s3/s3.go b/s3/s3.go index dc7ac664b..95c2e98e6 100644 --- a/s3/s3.go +++ b/s3/s3.go @@ -36,7 +36,7 @@ type bus interface { Object(ctx context.Context, bucket, path string, opts api.GetObjectOptions) (res api.ObjectsResponse, err error) AbortMultipartUpload(ctx context.Context, bucket, path string, uploadID string) (err error) - CompleteMultipartUpload(ctx context.Context, bucket, path, uploadID string, parts []api.MultipartCompletedPart) (_ api.MultipartCompleteResponse, err error) + CompleteMultipartUpload(ctx context.Context, bucket, path, uploadID string, parts []api.MultipartCompletedPart, opts api.CompleteMultipartOptions) (_ api.MultipartCompleteResponse, err error) CreateMultipartUpload(ctx context.Context, bucket, path string, opts api.CreateMultipartOptions) (api.MultipartCreateResponse, error) MultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker string, maxUploads int) (resp api.MultipartListUploadsResponse, _ error) MultipartUploadParts(ctx context.Context, bucket, object string, uploadID string, marker int, limit int64) (resp api.MultipartListPartsResponse, _ error) diff --git a/stores/metadata.go b/stores/metadata.go index 529d7ec89..5c35f90ce 100644 --- a/stores/metadata.go +++ b/stores/metadata.go @@ -1724,7 +1724,7 @@ func (s *SQLStore) UpdateObject(ctx context.Context, bucket, path, contractSet, // NOTE: the metadata is not deleted because this delete will cascade, // if we stop recreating the object we have to make sure to delete the // object's metadata before trying to recreate it - _, err := s.deleteObject(tx, bucket, path) + deleted, err := s.deleteObject(tx, bucket, path) if err != nil { return fmt.Errorf("failed to delete object: %w", err) } @@ -1771,6 +1771,10 @@ func (s *SQLStore) UpdateObject(ctx context.Context, bucket, path, contractSet, return fmt.Errorf("failed to create user metadata: %w", err) } + // Delete slabs. + if deleted > 0 { + return pruneSlabs(tx) + } return nil }) } @@ -2727,13 +2731,7 @@ func (s *SQLStore) deleteObject(tx *gorm.DB, bucket string, path string) (int64, if tx.Error != nil { return 0, tx.Error } - numDeleted := tx.RowsAffected - if numDeleted == 0 { - return 0, nil // nothing to prune if no object was deleted - } else if err := pruneSlabs(tx); err != nil { - return numDeleted, err - } - return numDeleted, nil + return tx.RowsAffected, nil } // deleteObjects deletes a batch of objects from the database. The order of diff --git a/stores/multipart.go b/stores/multipart.go index 864503455..be3333077 100644 --- a/stores/multipart.go +++ b/stores/multipart.go @@ -313,7 +313,7 @@ func (s *SQLStore) AbortMultipartUpload(ctx context.Context, bucket, path string }) } -func (s *SQLStore) CompleteMultipartUpload(ctx context.Context, bucket, path string, uploadID string, parts []api.MultipartCompletedPart) (_ api.MultipartCompleteResponse, err error) { +func (s *SQLStore) CompleteMultipartUpload(ctx context.Context, bucket, path string, uploadID string, parts []api.MultipartCompletedPart, opts api.CompleteMultipartOptions) (_ api.MultipartCompleteResponse, err error) { // Sanity check input parts. if !sort.SliceIsSorted(parts, func(i, j int) bool { return parts[i].PartNumber < parts[j].PartNumber @@ -434,6 +434,14 @@ func (s *SQLStore) CompleteMultipartUpload(ctx context.Context, bucket, path str } } + // Create new metadata. + if len(opts.Metadata) > 0 { + err = s.createUserMetadata(tx, obj.ID, opts.Metadata) + if err != nil { + return fmt.Errorf("failed to create metadata: %w", err) + } + } + // Update user metadata. if err := tx. Model(&dbObjectUserMetadata{}). diff --git a/stores/multipart_test.go b/stores/multipart_test.go index 37b294418..50272fcda 100644 --- a/stores/multipart_test.go +++ b/stores/multipart_test.go @@ -91,7 +91,7 @@ func TestMultipartUploadWithUploadPackingRegression(t *testing.T) { t.Fatal(err) } else if nSlicesBefore == 0 { t.Fatal("expected some slices") - } else if _, err = ss.CompleteMultipartUpload(ctx, api.DefaultBucketName, objName, resp.UploadID, parts); err != nil { + } else if _, err = ss.CompleteMultipartUpload(ctx, api.DefaultBucketName, objName, resp.UploadID, parts, api.CompleteMultipartOptions{}); err != nil { t.Fatal(err) } else if err := ss.db.Model(&dbSlice{}).Count(&nSlicesAfter).Error; err != nil { t.Fatal(err) From 894af25816d9a11bcb1384ac1c3068302ad10d01 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Mon, 11 Mar 2024 14:22:42 +0100 Subject: [PATCH 2/3] stores: revert some changes --- stores/metadata.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/stores/metadata.go b/stores/metadata.go index 5c35f90ce..529d7ec89 100644 --- a/stores/metadata.go +++ b/stores/metadata.go @@ -1724,7 +1724,7 @@ func (s *SQLStore) UpdateObject(ctx context.Context, bucket, path, contractSet, // NOTE: the metadata is not deleted because this delete will cascade, // if we stop recreating the object we have to make sure to delete the // object's metadata before trying to recreate it - deleted, err := s.deleteObject(tx, bucket, path) + _, err := s.deleteObject(tx, bucket, path) if err != nil { return fmt.Errorf("failed to delete object: %w", err) } @@ -1771,10 +1771,6 @@ func (s *SQLStore) UpdateObject(ctx context.Context, bucket, path, contractSet, return fmt.Errorf("failed to create user metadata: %w", err) } - // Delete slabs. - if deleted > 0 { - return pruneSlabs(tx) - } return nil }) } @@ -2731,7 +2727,13 @@ func (s *SQLStore) deleteObject(tx *gorm.DB, bucket string, path string) (int64, if tx.Error != nil { return 0, tx.Error } - return tx.RowsAffected, nil + numDeleted := tx.RowsAffected + if numDeleted == 0 { + return 0, nil // nothing to prune if no object was deleted + } else if err := pruneSlabs(tx); err != nil { + return numDeleted, err + } + return numDeleted, nil } // deleteObjects deletes a batch of objects from the database. The order of From 4648806009a5ade5da12015fa3895d601e0d6e52 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Tue, 12 Mar 2024 11:49:01 +0100 Subject: [PATCH 3/3] api: add missing json tags in MultipartCompleteRequest --- api/multipart.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/api/multipart.go b/api/multipart.go index 0654386c0..ee26567b1 100644 --- a/api/multipart.go +++ b/api/multipart.go @@ -79,11 +79,11 @@ type ( } MultipartCompleteRequest struct { - Bucket string `json:"bucket"` - Metadata ObjectUserMetadata - Path string `json:"path"` - UploadID string `json:"uploadID"` - Parts []MultipartCompletedPart + Bucket string `json:"bucket"` + Metadata ObjectUserMetadata `json:"metadata"` + Path string `json:"path"` + UploadID string `json:"uploadID"` + Parts []MultipartCompletedPart `json:"parts"` } MultipartCreateRequest struct {