Skip to content

Commit

Permalink
Support SAS (shared access signature) for cross-account Azure blob copy
Browse files Browse the repository at this point in the history
  • Loading branch information
wayblink committed Oct 23, 2023
1 parent debe1cd commit 48a0620
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 44 deletions.
1 change: 0 additions & 1 deletion core/backup_impl_create_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -1123,7 +1123,6 @@ func (b *BackupContext) readSegmentInfo(ctx context.Context, collectionID int64,
var rootPath string

if b.params.MinioCfg.RootPath != "" {
log.Debug("params.MinioCfg.RootPath", zap.String("params.MinioCfg.RootPath", b.params.MinioCfg.RootPath))
rootPath = fmt.Sprintf("%s/", b.params.MinioCfg.RootPath)
} else {
rootPath = ""
Expand Down
2 changes: 1 addition & 1 deletion core/backup_impl_restore_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
}
}

tempDir := "restore-temp-" + parentTaskID + SEPERATOR
tempDir := fmt.Sprintf("restore-temp-%s-%s-%s%s", parentTaskID, task.TargetDbName, task.TargetCollectionName, SEPERATOR)
isSameBucket := b.milvusBucketName == backupBucketName
// clean the temporary file
defer func() {
Expand Down
59 changes: 20 additions & 39 deletions core/storage/azure_chunk_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,19 @@ func (mcm *AzureChunkManager) MultiWrite(ctx context.Context, bucketName string,

// Exist checks whether chunk is saved to minio storage.
// Exist reports whether an object exists in Azure blob storage at filePath.
// It probes by listing with filePath as a prefix: any returned entry means
// the object (or objects beneath the prefix) is present.
func (mcm *AzureChunkManager) Exist(ctx context.Context, bucketName string, filePath string) (bool, error) {
	objs, err := mcm.aos.ListObjects(ctx, bucketName, filePath, true)
	if err != nil {
		// A missing key is "does not exist", not an error.
		if IsErrNoSuchKey(err) {
			return false, nil
		}
		log.Warn("failed to list objects", zap.String("bucket", bucketName), zap.String("path", filePath), zap.Error(err))
		return false, err
	}
	return len(objs) > 0, nil
}

// Read reads the minio storage data if exists.
Expand Down Expand Up @@ -313,7 +317,7 @@ func (mcm *AzureChunkManager) RemoveWithPrefix(ctx context.Context, bucketName s

// ListWithPrefix returns objects with provided prefix.
func (mcm *AzureChunkManager) ListWithPrefix(ctx context.Context, bucketName string, prefix string, recursive bool) ([]string, []int64, error) {
objects, err := mcm.listObjects(ctx, bucketName, prefix, false)
objects, err := mcm.listObjects(ctx, bucketName, prefix, true)
if err != nil {
return nil, nil, err
}
Expand All @@ -327,9 +331,9 @@ func (mcm *AzureChunkManager) ListWithPrefix(ctx context.Context, bucketName str
return objectsKeys, sizes, nil
} else {
var objectsKeys []string
var sizes []int64
sizesDict := make(map[string]int64, 0)
objectsKeysDict := make(map[string]bool, 0)
for object, _ := range objects {
for object, size := range objects {
keyWithoutPrefix := strings.Replace(object, prefix, "", 1)
if strings.Contains(keyWithoutPrefix, "/") {
var key string
Expand All @@ -340,52 +344,29 @@ func (mcm *AzureChunkManager) ListWithPrefix(ctx context.Context, bucketName str
}
if _, exist := objectsKeysDict[key]; !exist {
objectsKeys = append(objectsKeys, key)
sizes = append(sizes, 0)
sizesDict[key] = size
objectsKeysDict[key] = true
} else {
sizesDict[key] = size + sizesDict[key]
}
} else {
key := prefix + keyWithoutPrefix
if _, exist := objectsKeysDict[key]; !exist {
objectsKeys = append(objectsKeys, key)
sizes = append(sizes, 0)
sizesDict[key] = size
objectsKeysDict[key] = true
} else {
sizesDict[key] = size + sizesDict[key]
}
}
}
var sizes []int64
for _, objectKey := range objectsKeys {
sizes = append(sizes, sizesDict[objectKey])
}

return objectsKeys, sizes, nil
}

//var objectsKeys []string
//var sizes []int64
//tasks := list.New()
//tasks.PushBack(prefix)
//for tasks.Len() > 0 {
// e := tasks.Front()
// pre := e.Value.(string)
// tasks.Remove(e)
//
// // TODO add concurrent call if performance matters
// // only return current level per call
// objects, err := mcm.listObjects(ctx, bucketName, pre, false)
// if err != nil {
// return nil, nil, err
// }
//
// for object, contentLength := range objects {
// // with tailing "/", object is a "directory"
// if strings.HasSuffix(object, "/") && recursive {
// // enqueue when recursive is true
// if object != pre {
// tasks.PushBack(object)
// }
// continue
// }
// objectsKeys = append(objectsKeys, object)
// sizes = append(sizes, contentLength)
// }
//}
//
//return objectsKeys, sizes, nil
}

func (mcm *AzureChunkManager) getObject(ctx context.Context, bucketName, objectName string, offset int64, size int64) (FileReader, error) {
Expand Down
48 changes: 45 additions & 3 deletions core/storage/azure_object_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,23 @@ import (
"fmt"
"io"
"os"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"

"github.com/zilliztech/milvus-backup/internal/util/retry"
)

const sasSignMinute = 60

type innerAzureClient struct {
client *service.Client

Expand Down Expand Up @@ -195,7 +200,44 @@ func (aos *AzureObjectStorage) RemoveObject(ctx context.Context, bucketName, obj
}

// CopyObject copies a blob from fromBucketName/fromPath to toBucketName/toPath
// using a server-side copy (StartCopyFromURL).
//
// When both containers belong to the same storage account, the destination
// account can already read the source and a plain copy URL suffices. Across
// accounts, the source URL must carry a SAS token so the destination account
// is authorized to read the source blob.
//
// NOTE(review): accessKeyID is used here as the storage-account name when
// building the blob URL — confirm that matches the configuration semantics.
func (aos *AzureObjectStorage) CopyObject(ctx context.Context, fromBucketName, toBucketName, fromPath, toPath string) error {
	if aos.clients[fromBucketName].accessKeyID == aos.clients[toBucketName].accessKeyID {
		// Same storage account: no extra authorization needed on the source URL.
		fromPathUrl := fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s", aos.clients[fromBucketName].accessKeyID, fromBucketName, fromPath)
		_, err := aos.clients[toBucketName].client.NewContainerClient(toBucketName).NewBlockBlobClient(toPath).StartCopyFromURL(ctx, fromPathUrl, nil)
		return err
	}
	// Different accounts: sign the source with a user-delegation SAS.
	srcSAS, err := aos.getSAS(fromBucketName)
	if err != nil {
		return err
	}
	fromPathUrl := fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s?%s", aos.clients[fromBucketName].accessKeyID, fromBucketName, fromPath, srcSAS.Encode())
	_, err = aos.clients[toBucketName].client.NewContainerClient(toBucketName).NewBlockBlobClient(toPath).StartCopyFromURL(ctx, fromPathUrl, nil)
	return err
}

// getSAS issues a user-delegation SAS (shared access signature) granting
// read+list on the given container, valid for sasSignMinute minutes.
// The user-delegation key itself is requested with a longer (48h) validity
// so it comfortably covers the SAS lifetime.
func (aos *AzureObjectStorage) getSAS(bucket string) (*sas.QueryParameters, error) {
	srcSvcCli := aos.clients[bucket].client
	// Back-date the start by 10s to tolerate clock skew between hosts.
	start := time.Now().UTC().Add(-10 * time.Second)
	keyExpiry := start.Add(48 * time.Hour)
	info := service.KeyInfo{
		Start:  to.Ptr(start.Format(sas.TimeFormat)),
		Expiry: to.Ptr(keyExpiry.Format(sas.TimeFormat)),
	}
	// NOTE(review): uses context.Background() rather than a caller-supplied
	// context — consider threading ctx through if cancellation matters here.
	udc, err := srcSvcCli.GetUserDelegationCredential(context.Background(), info, nil)
	if err != nil {
		return nil, err
	}
	// Build the SAS values and sign them with the user-delegation credential.
	// (sasSignMinute * time.Minute is already a time.Duration; no conversion
	// needed, and reusing `start` keeps the key and SAS windows consistent.)
	sasQueryParams, err := sas.BlobSignatureValues{
		Protocol:      sas.ProtocolHTTPS,
		StartTime:     start,
		ExpiryTime:    start.Add(sasSignMinute * time.Minute),
		Permissions:   to.Ptr(sas.ContainerPermissions{Read: true, List: true}).String(),
		ContainerName: bucket,
	}.SignWithUserDelegation(udc)
	if err != nil {
		return nil, err
	}
	return &sasQueryParams, nil
}

0 comments on commit 48a0620

Please sign in to comment.