Skip to content

Commit

Permalink
[bug] Azure copyPartRange should stage block on destination
Browse files Browse the repository at this point in the history
copyPartRange should stage a block on the destination.  This matters When
source and destination containers are different.
  • Loading branch information
arielshaqed committed Feb 18, 2024
1 parent 767ebc3 commit 2d1fdf6
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 19 deletions.
13 changes: 1 addition & 12 deletions pkg/block/azure/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,18 +533,7 @@ func (a *Adapter) copyPartRange(ctx context.Context, sourceObj, destinationObj b
return nil, err
}

destinationContainer, err := a.clientCache.NewContainerClient(qualifiedDestinationKey.StorageAccountName, qualifiedDestinationKey.ContainerName)
if err != nil {
return nil, err
}
sourceContainer, err := a.clientCache.NewContainerClient(qualifiedSourceKey.StorageAccountName, qualifiedSourceKey.ContainerName)
if err != nil {
return nil, err
}

sourceBlobURL := sourceContainer.NewBlockBlobClient(qualifiedSourceKey.BlobURL)

return copyPartRange(ctx, *destinationContainer, qualifiedDestinationKey.BlobURL, *sourceBlobURL, startPosition, count)
return copyPartRange(ctx, a.clientCache, qualifiedDestinationKey, qualifiedSourceKey, startPosition, count)
}

func (a *Adapter) AbortMultiPartUpload(_ context.Context, _ block.ObjectPointer, _ string) error {
Expand Down
25 changes: 18 additions & 7 deletions pkg/block/azure/multipart_block_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,20 @@ func getMultipartSize(ctx context.Context, container container.Client, objName s
return int64(size), nil
}

func copyPartRange(ctx context.Context, destinationContainer container.Client, destinationObjName string, sourceBlobURL blockblob.Client, startPosition, count int64) (*block.UploadPartResponse, error) {
func copyPartRange(ctx context.Context, clientCache *ClientCache, destinationKey, sourceKey BlobURLInfo, startPosition, count int64) (*block.UploadPartResponse, error) {
destinationContainer, err := clientCache.NewContainerClient(destinationKey.StorageAccountName, destinationKey.ContainerName)
if err != nil {
return nil, fmt.Errorf("copy part: get destination client: %w", err)
}
sourceContainer, err := clientCache.NewContainerClient(sourceKey.StorageAccountName, sourceKey.ContainerName)
if err != nil {
return nil, fmt.Errorf("copy part: get source client: %w", err)
}
base64BlockID := generateRandomBlockID()
_, err := sourceBlobURL.StageBlockFromURL(ctx, base64BlockID, sourceBlobURL.URL(),
destinationBlob := destinationContainer.NewBlockBlobClient(destinationKey.BlobURL)
sourceBlob := sourceContainer.NewBlockBlobClient(sourceKey.BlobURL)

_, err = destinationBlob.StageBlockFromURL(ctx, base64BlockID, sourceBlob.URL(),
&blockblob.StageBlockFromURLOptions{
Range: blob.HTTPRange{
Offset: startPosition,
Expand All @@ -188,24 +199,24 @@ func copyPartRange(ctx context.Context, destinationContainer container.Client, d
}

// add size and id to etag
response, err := sourceBlobURL.GetProperties(ctx, nil)
response, err := destinationBlob.GetProperties(ctx, nil)
if err != nil {
return nil, err
}
etag := "\"" + hex.EncodeToString(response.ContentMD5) + "\""
size := response.ContentLength
base64Etag := base64.StdEncoding.EncodeToString([]byte(etag))
// stage id data
blobIDsURL := destinationContainer.NewBlockBlobClient(destinationObjName + idSuffix)
_, err = blobIDsURL.StageBlock(ctx, base64Etag, streaming.NopCloser(strings.NewReader(base64BlockID+"\n")), nil)
blobIDsBlob := destinationContainer.NewBlockBlobClient(destinationKey.BlobURL + idSuffix)
_, err = blobIDsBlob.StageBlock(ctx, base64Etag, streaming.NopCloser(strings.NewReader(base64BlockID+"\n")), nil)
if err != nil {
return nil, fmt.Errorf("failed staging part data: %w", err)
}

// stage size data
sizeData := fmt.Sprintf("%d\n", size)
blobSizesURL := destinationContainer.NewBlockBlobClient(destinationObjName + sizeSuffix)
_, err = blobSizesURL.StageBlock(ctx, base64Etag, streaming.NopCloser(strings.NewReader(sizeData)), nil)
blobSizesBlob := destinationContainer.NewBlockBlobClient(destinationKey.BlobURL + sizeSuffix)
_, err = blobSizesBlob.StageBlock(ctx, base64Etag, streaming.NopCloser(strings.NewReader(sizeData)), nil)
if err != nil {
return nil, fmt.Errorf("failed staging part data: %w", err)
}
Expand Down

0 comments on commit 2d1fdf6

Please sign in to comment.