From 48a0620c2ac8fe776b6f2993a0f4ea09e90a62b8 Mon Sep 17 00:00:00 2001
From: wayblink
Date: Mon, 23 Oct 2023 11:53:48 +0800
Subject: [PATCH] support SAS for Azure blob copy across storage accounts

---
 core/backup_impl_create_backup.go    |  1 -
 core/backup_impl_restore_backup.go   |  2 +-
 core/storage/azure_chunk_manager.go  | 59 ++++++++++------------------
 core/storage/azure_object_storage.go | 48 ++++++++++++++++++++--
 4 files changed, 66 insertions(+), 44 deletions(-)

diff --git a/core/backup_impl_create_backup.go b/core/backup_impl_create_backup.go
index b70aad10..10aa7c66 100644
--- a/core/backup_impl_create_backup.go
+++ b/core/backup_impl_create_backup.go
@@ -1123,7 +1123,6 @@ func (b *BackupContext) readSegmentInfo(ctx context.Context, collectionID int64,
 
     var rootPath string
     if b.params.MinioCfg.RootPath != "" {
-        log.Debug("params.MinioCfg.RootPath", zap.String("params.MinioCfg.RootPath", b.params.MinioCfg.RootPath))
         rootPath = fmt.Sprintf("%s/", b.params.MinioCfg.RootPath)
     } else {
         rootPath = ""
diff --git a/core/backup_impl_restore_backup.go b/core/backup_impl_restore_backup.go
index 40e809db..ff8bb112 100644
--- a/core/backup_impl_restore_backup.go
+++ b/core/backup_impl_restore_backup.go
@@ -449,7 +449,7 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
         }
     }
 
-    tempDir := "restore-temp-" + parentTaskID + SEPERATOR
+    tempDir := fmt.Sprintf("restore-temp-%s-%s-%s%s", parentTaskID, task.TargetDbName, task.TargetCollectionName, SEPERATOR)
     isSameBucket := b.milvusBucketName == backupBucketName
     // clean the temporary file
     defer func() {
diff --git a/core/storage/azure_chunk_manager.go b/core/storage/azure_chunk_manager.go
index 4d2923bc..622d6eb8 100644
--- a/core/storage/azure_chunk_manager.go
+++ b/core/storage/azure_chunk_manager.go
@@ -148,7 +148,7 @@ func (mcm *AzureChunkManager) MultiWrite(ctx context.Context, bucketName string,
 
 // Exist checks whether chunk is saved to minio storage.
 func (mcm *AzureChunkManager) Exist(ctx context.Context, bucketName string, filePath string) (bool, error) {
-    _, err := mcm.getObjectSize(ctx, bucketName, filePath)
+    objs, err := mcm.aos.ListObjects(ctx, bucketName, filePath, true)
     if err != nil {
         if IsErrNoSuchKey(err) {
             return false, nil
@@ -156,7 +156,11 @@ func (mcm *AzureChunkManager) Exist(ctx context.Context, bucketName string, file
         log.Warn("failed to stat object", zap.String("bucket", bucketName), zap.String("path", filePath), zap.Error(err))
         return false, err
     }
-    return true, nil
+    if len(objs) > 0 {
+        return true, nil
+    } else {
+        return false, nil
+    }
 }
 
 // Read reads the minio storage data if exists.
@@ -313,7 +317,7 @@ func (mcm *AzureChunkManager) RemoveWithPrefix(ctx context.Context, bucketName s
 
 // ListWithPrefix returns objects with provided prefix.
 func (mcm *AzureChunkManager) ListWithPrefix(ctx context.Context, bucketName string, prefix string, recursive bool) ([]string, []int64, error) {
-    objects, err := mcm.listObjects(ctx, bucketName, prefix, false)
+    objects, err := mcm.listObjects(ctx, bucketName, prefix, true)
     if err != nil {
         return nil, nil, err
     }
@@ -327,9 +331,9 @@ func (mcm *AzureChunkManager) ListWithPrefix(ctx context.Context, bucketName str
         return objectsKeys, sizes, nil
     } else {
         var objectsKeys []string
-        var sizes []int64
+        sizesDict := make(map[string]int64, 0)
         objectsKeysDict := make(map[string]bool, 0)
-        for object, _ := range objects {
+        for object, size := range objects {
             keyWithoutPrefix := strings.Replace(object, prefix, "", 1)
             if strings.Contains(keyWithoutPrefix, "/") {
                 var key string
@@ -340,52 +344,29 @@ func (mcm *AzureChunkManager) ListWithPrefix(ctx context.Context, bucketName str
                 }
                 if _, exist := objectsKeysDict[key]; !exist {
                     objectsKeys = append(objectsKeys, key)
-                    sizes = append(sizes, 0)
+                    sizesDict[key] = size
                     objectsKeysDict[key] = true
+                } else {
+                    sizesDict[key] = size + sizesDict[key]
                 }
             } else {
                 key := prefix + keyWithoutPrefix
                 if _, exist := objectsKeysDict[key]; !exist {
                     objectsKeys = append(objectsKeys, key)
-                    sizes = append(sizes, 0)
+                    sizesDict[key] = size
                     objectsKeysDict[key] = true
+                } else {
+                    sizesDict[key] = size + sizesDict[key]
                 }
             }
         }
+        var sizes []int64
+        for _, objectKey := range objectsKeys {
+            sizes = append(sizes, sizesDict[objectKey])
+        }
+
         return objectsKeys, sizes, nil
     }
-
-    //var objectsKeys []string
-    //var sizes []int64
-    //tasks := list.New()
-    //tasks.PushBack(prefix)
-    //for tasks.Len() > 0 {
-    //	e := tasks.Front()
-    //	pre := e.Value.(string)
-    //	tasks.Remove(e)
-    //
-    //	// TODO add concurrent call if performance matters
-    //	// only return current level per call
-    //	objects, err := mcm.listObjects(ctx, bucketName, pre, false)
-    //	if err != nil {
-    //		return nil, nil, err
-    //	}
-    //
-    //	for object, contentLength := range objects {
-    //		// with tailing "/", object is a "directory"
-    //		if strings.HasSuffix(object, "/") && recursive {
-    //			// enqueue when recursive is true
-    //			if object != pre {
-    //				tasks.PushBack(object)
-    //			}
-    //			continue
-    //		}
-    //		objectsKeys = append(objectsKeys, object)
-    //		sizes = append(sizes, contentLength)
-    //	}
-    //}
-    //
-    //return objectsKeys, sizes, nil
 }
 
 func (mcm *AzureChunkManager) getObject(ctx context.Context, bucketName, objectName string, offset int64, size int64) (FileReader, error) {
diff --git a/core/storage/azure_object_storage.go b/core/storage/azure_object_storage.go
index 580f0366..b71c18ad 100644
--- a/core/storage/azure_object_storage.go
+++ b/core/storage/azure_object_storage.go
@@ -21,18 +21,23 @@ import (
     "fmt"
     "io"
     "os"
+    "time"
 
     "github.com/Azure/azure-sdk-for-go/sdk/azcore"
+    "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
     "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
     "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
     "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
     "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
     "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
+    "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
    "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
 
     "github.com/zilliztech/milvus-backup/internal/util/retry"
 )
 
+const sasSignMinute = 60
+
 type innerAzureClient struct {
     client *service.Client
 
@@ -195,7 +200,44 @@ func (aos *AzureObjectStorage) RemoveObject(ctx context.Context, bucketName, obj
 }
 
 func (aos *AzureObjectStorage) CopyObject(ctx context.Context, fromBucketName, toBucketName, fromPath, toPath string) error {
-    fromPathUrl := fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s", aos.clients[fromBucketName].accessKeyID, fromBucketName, fromPath)
-    _, err := aos.clients[toBucketName].client.NewContainerClient(toBucketName).NewBlockBlobClient(toPath).StartCopyFromURL(ctx, fromPathUrl, nil)
-    return err
+    if aos.clients[fromBucketName].accessKeyID == aos.clients[toBucketName].accessKeyID {
+        fromPathUrl := fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s", aos.clients[fromBucketName].accessKeyID, fromBucketName, fromPath)
+        _, err := aos.clients[toBucketName].client.NewContainerClient(toBucketName).NewBlockBlobClient(toPath).StartCopyFromURL(ctx, fromPathUrl, nil)
+        return err
+    } else {
+        srcSAS, err := aos.getSAS(fromBucketName)
+        if err != nil {
+            return err
+        }
+        fromPathUrl := fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s?%s", aos.clients[fromBucketName].accessKeyID, fromBucketName, fromPath, srcSAS.Encode())
+        _, err = aos.clients[toBucketName].client.NewContainerClient(toBucketName).NewBlockBlobClient(toPath).StartCopyFromURL(ctx, fromPathUrl, nil)
+        return err
+    }
+}
+
+func (aos *AzureObjectStorage) getSAS(bucket string) (*sas.QueryParameters, error) {
+    srcSvcCli := aos.clients[bucket].client
+    // Set current and past time and create key
+    now := time.Now().UTC().Add(-10 * time.Second)
+    expiry := now.Add(48 * time.Hour)
+    info := service.KeyInfo{
+        Start:  to.Ptr(now.UTC().Format(sas.TimeFormat)),
+        Expiry: to.Ptr(expiry.UTC().Format(sas.TimeFormat)),
+    }
+    udc, err := srcSvcCli.GetUserDelegationCredential(context.Background(), info, nil)
+    if err != nil {
+        return nil, err
+    }
+    // Create Blob Signature Values with desired permissions and sign with user delegation credential
+    sasQueryParams, err := sas.BlobSignatureValues{
+        Protocol:      sas.ProtocolHTTPS,
+        StartTime:     time.Now().UTC().Add(time.Second * -10),
+        ExpiryTime:    time.Now().UTC().Add(time.Duration(sasSignMinute * time.Minute)),
+        Permissions:   to.Ptr(sas.ContainerPermissions{Read: true, List: true}).String(),
+        ContainerName: bucket,
+    }.SignWithUserDelegation(udc)
+    if err != nil {
+        return nil, err
+    }
+    return &sasQueryParams, nil
 }
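
For reviewers who have not used Azure user-delegation SAS before, here is a minimal standalone sketch of the cross-account path that CopyObject and getSAS implement above: the source account issues a user delegation key, the source container URL is signed with it, and the destination account pulls the blob server-side via StartCopyFromURL. The package name, the function name copyAcrossAccounts, and the srcSvc, dstSvc, and srcAccount parameters are illustrative placeholders, not identifiers from this repository; the SDK calls mirror the ones the patch uses and assume srcSvc is authenticated through Azure AD, which user delegation keys require.

package azurecopy

import (
	"context"
	"fmt"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
)

// copyAcrossAccounts copies srcContainer/srcBlob from the account behind srcSvc
// into dstContainer/dstBlob in the account behind dstSvc. The destination account
// cannot read a private blob in another account, so the source URL is signed with
// a user-delegation SAS first.
func copyAcrossAccounts(ctx context.Context, srcSvc, dstSvc *service.Client,
	srcAccount, srcContainer, srcBlob, dstContainer, dstBlob string) error {

	// 1. Request a user delegation key from the source account, backdating the
	//    start time slightly to tolerate clock skew.
	start := time.Now().UTC().Add(-10 * time.Second)
	expiry := start.Add(time.Hour)
	udc, err := srcSvc.GetUserDelegationCredential(ctx, service.KeyInfo{
		Start:  to.Ptr(start.Format(sas.TimeFormat)),
		Expiry: to.Ptr(expiry.Format(sas.TimeFormat)),
	}, nil)
	if err != nil {
		return err
	}

	// 2. Sign read+list permissions on the source container with that key.
	sasParams, err := sas.BlobSignatureValues{
		Protocol:      sas.ProtocolHTTPS,
		StartTime:     start,
		ExpiryTime:    expiry,
		Permissions:   to.Ptr(sas.ContainerPermissions{Read: true, List: true}).String(),
		ContainerName: srcContainer,
	}.SignWithUserDelegation(udc)
	if err != nil {
		return err
	}

	// 3. Hand the signed source URL to the destination account; the copy runs
	//    server-side, no blob data flows through this process.
	srcURL := fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s?%s",
		srcAccount, srcContainer, srcBlob, sasParams.Encode())
	_, err = dstSvc.NewContainerClient(dstContainer).
		NewBlockBlobClient(dstBlob).
		StartCopyFromURL(ctx, srcURL, nil)
	return err
}

Within a single storage account StartCopyFromURL can read a private source blob without extra authorization, which is presumably why the patch keeps the plain URL for the same-account case and only generates a SAS when the two buckets belong to different accounts.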