diff --git a/pkg/block/gs/adapter.go b/pkg/block/gs/adapter.go index 4ae38b67df8..43328cddeb4 100644 --- a/pkg/block/gs/adapter.go +++ b/pkg/block/gs/adapter.go @@ -7,6 +7,7 @@ import ( "io" "net/http" "net/url" + "sort" "strings" "time" @@ -25,7 +26,10 @@ const ( ) var ( + ErrMismatchPartETag = errors.New("mismatch part ETag") + ErrMismatchPartName = errors.New("mismatch part name") ErrMaxMultipartObjects = errors.New("maximum multipart object reached") + ErrPartListMismatch = errors.New("multipart part list mismatch") ErrMissingTargetAttrs = errors.New("missing target attributes") ) @@ -499,7 +503,7 @@ func (a *Adapter) CompleteMultiPartUpload(ctx context.Context, obj block.ObjectP }) lg.Debug("started multipart upload") - parts, err := a.getPartNames(bucketName, uploadID, multipartList) + parts, err := a.getPartNamesWithValidation(ctx, bucketName, uploadID, multipartList) if err != nil { return nil, err } @@ -524,18 +528,89 @@ func (a *Adapter) CompleteMultiPartUpload(ctx context.Context, obj block.ObjectP }, nil } -func (a *Adapter) getPartNames(bucketName, uploadID string, multipartList *block.MultipartUploadCompletion) ([]string, error) { - if len(multipartList.Part) > MaxMultipartObjects { - return nil, fmt.Errorf("listing bucket '%s' upload '%s': %w", bucketName, uploadID, ErrMaxMultipartObjects) +func (a *Adapter) getPartNamesWithValidation(ctx context.Context, bucketName, uploadID string, multipartList *block.MultipartUploadCompletion) ([]string, error) { + // list bucket parts and validate request match + bucketParts, err := a.listMultipartUploadParts(ctx, bucketName, uploadID) + if err != nil { + return nil, err + } + // validate bucketParts match the request multipartList + err = a.validateMultipartUploadParts(uploadID, multipartList, bucketParts) + if err != nil { + return nil, err } - parts := make([]string, len(multipartList.Part)) - for i := 0; i < len(parts); i++ { - parts[i] = formatMultipartFilename(uploadID, i+1) + // prepare names + parts := make([]string, len(bucketParts)) + for i, part := range bucketParts { + parts[i] = part.Name } return parts, nil } +func (a *Adapter) validateMultipartUploadParts(uploadID string, multipartList *block.MultipartUploadCompletion, bucketParts []*storage.ObjectAttrs) error { + if len(multipartList.Part) != len(bucketParts) { + return fmt.Errorf("part list mismatch - expected %d parts, got %d: %w", len(bucketParts), len(multipartList.Part), ErrPartListMismatch) + } + for i, p := range multipartList.Part { + objName := formatMultipartFilename(uploadID, p.PartNumber) + if objName != bucketParts[i].Name { + return fmt.Errorf("invalid part at position %d: %w", i, ErrMismatchPartName) + } + if p.ETag != bucketParts[i].Etag { + return fmt.Errorf("invalid part at position %d: %w", i, ErrMismatchPartETag) + } + } + return nil +} + +func (a *Adapter) listMultipartUploadParts(ctx context.Context, bucketName string, uploadID string) ([]*storage.ObjectAttrs, error) { + bucket := a.client.Bucket(bucketName) + var bucketParts []*storage.ObjectAttrs + query := &storage.Query{ + Delimiter: delimiter, + Prefix: uploadID + partSuffix, + } + err := query.SetAttrSelection([]string{"Name", "Etag"}) + if err != nil { + return nil, err + } + it := bucket.Objects(ctx, query) + for { + attrs, err := it.Next() + if errors.Is(err, iterator.Done) { + break + } + if err != nil { + return nil, fmt.Errorf("listing bucket '%s' upload '%s': %w", bucketName, uploadID, err) + } + + // filter out invalid part names + if !a.isPartName(attrs.Name) { + continue + } + + bucketParts = append(bucketParts, attrs) + if len(bucketParts) > MaxMultipartObjects { + return nil, fmt.Errorf("listing bucket '%s' upload '%s': %w", bucketName, uploadID, ErrMaxMultipartObjects) + } + } + // sort by name - assume natual sort order + sort.Slice(bucketParts, func(i, j int) bool { + return bucketParts[i].Name < bucketParts[j].Name + }) + return bucketParts, nil +} + +// isPartName checks it's a valid part name, as opposed to an already merged group of parts +func (a *Adapter) isPartName(name string) bool { + if len(name) < len(partSuffix)+5 { + return false + } + suffixSubstring := name[len(name)-5-len(partSuffix) : len(name)-5] + return partSuffix == suffixSubstring +} + func (a *Adapter) composeMultipartUploadParts(ctx context.Context, bucketName string, uploadID string, parts []string) (*storage.ObjectAttrs, error) { // compose target from all parts bucket := a.client.Bucket(bucketName)