From ac422b3bc3e822b4525401496a8b73e91d566128 Mon Sep 17 00:00:00 2001 From: Dylan Guedes Date: Wed, 25 Sep 2024 14:29:12 -0300 Subject: [PATCH] feat: Introduce new `ObjectExistsWithSize` API to (#14268) **What this PR does / why we need it**: Introduce a new `ObjectExistsWithSize` API to our object storage interface. This is the same as `ObjectExists` but with the object size as part of the return value. This is useful to compare the object size present. --- pkg/ingester-rf1/objstore/storage.go | 8 ++++++ .../chunk/client/alibaba/oss_object_client.go | 19 +++++++++++--- .../chunk/client/aws/s3_storage_client.go | 26 ++++++++++++++----- .../client/aws/s3_storage_client_test.go | 18 +++++++++++++ .../chunk/client/azure/blob_storage_client.go | 23 +++++++++++++--- .../client/baidubce/bos_storage_client.go | 21 +++++++++++---- .../chunk/client/congestion/controller.go | 7 +++++ .../client/congestion/controller_test.go | 4 +++ .../chunk/client/gcp/gcs_object_client.go | 14 +++++++--- .../client/ibmcloud/cos_object_client.go | 21 +++++++++++---- .../chunk/client/local/fs_object_client.go | 13 +++++++--- .../client/local/fs_object_client_test.go | 5 ++++ pkg/storage/chunk/client/object_client.go | 1 + .../client/openstack/swift_object_client.go | 11 +++++--- .../chunk/client/prefixed_object_client.go | 4 +++ .../testutils/inmemory_storage_client.go | 15 +++++++---- pkg/tool/audit/audit_test.go | 11 +++++--- 17 files changed, 178 insertions(+), 43 deletions(-) diff --git a/pkg/ingester-rf1/objstore/storage.go b/pkg/ingester-rf1/objstore/storage.go index ec0d734b316b7..5a8c61fd5117b 100644 --- a/pkg/ingester-rf1/objstore/storage.go +++ b/pkg/ingester-rf1/objstore/storage.go @@ -70,6 +70,14 @@ func (m *Multi) GetStoreFor(ts model.Time) (client.ObjectClient, error) { return nil, fmt.Errorf("no store found for timestamp %s", ts) } +func (m *Multi) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) { + s, err := m.GetStoreFor(model.Now()) + if err != nil { + return false, 0, err + } + return s.ObjectExistsWithSize(ctx, objectKey) +} + func (m *Multi) ObjectExists(ctx context.Context, objectKey string) (bool, error) { s, err := m.GetStoreFor(model.Now()) if err != nil { diff --git a/pkg/storage/chunk/client/alibaba/oss_object_client.go b/pkg/storage/chunk/client/alibaba/oss_object_client.go index 423a7348086e4..9d7f4cc48ce1c 100644 --- a/pkg/storage/chunk/client/alibaba/oss_object_client.go +++ b/pkg/storage/chunk/client/alibaba/oss_object_client.go @@ -73,16 +73,27 @@ func (s *OssObjectClient) Stop() { } func (s *OssObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) { + exists, _, err := s.ObjectExistsWithSize(ctx, objectKey) + return exists, err +} + +func (s *OssObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) { var options []oss.Option + var objectSize int64 err := instrument.CollectedRequest(ctx, "OSS.ObjectExists", ossRequestDuration, instrument.ErrorCode, func(_ context.Context) error { - _, requestErr := s.defaultBucket.GetObjectMeta(objectKey, options...) - return requestErr + headers, requestErr := s.defaultBucket.GetObjectMeta(objectKey, options...) + if requestErr != nil { + return requestErr + } + + objectSize, _ = strconv.ParseInt(headers.Get(oss.HTTPHeaderContentLength), 10, 64) + return nil }) if err != nil { - return false, err + return false, 0, err } - return true, nil + return true, objectSize, nil } // GetObject returns a reader and the size for the specified object key from the configured OSS bucket. diff --git a/pkg/storage/chunk/client/aws/s3_storage_client.go b/pkg/storage/chunk/client/aws/s3_storage_client.go index a7489bf847706..26c2807e120e7 100644 --- a/pkg/storage/chunk/client/aws/s3_storage_client.go +++ b/pkg/storage/chunk/client/aws/s3_storage_client.go @@ -310,37 +310,49 @@ func buckets(cfg S3Config) ([]string, error) { func (a *S3ObjectClient) Stop() {} func (a *S3ObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) { + exists, _, err := a.ObjectExistsWithSize(ctx, objectKey) + return exists, err +} + +func (a *S3ObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) { var lastErr error + var objectSize int64 retries := backoff.New(ctx, a.cfg.BackoffConfig) for retries.Ongoing() { if ctx.Err() != nil { - return false, errors.Wrap(ctx.Err(), "ctx related error during s3 objectExists") + return false, 0, errors.Wrap(ctx.Err(), "ctx related error during s3 objectExists") } lastErr = instrument.CollectedRequest(ctx, "S3.ObjectExists", s3RequestDuration, instrument.ErrorCode, func(_ context.Context) error { headObjectInput := &s3.HeadObjectInput{ Bucket: aws.String(a.bucketFromKey(objectKey)), Key: aws.String(objectKey), } - _, requestErr := a.S3.HeadObject(headObjectInput) - return requestErr + headOutput, requestErr := a.S3.HeadObject(headObjectInput) + if requestErr != nil { + return requestErr + } + if headOutput != nil && headOutput.ContentLength != nil { + objectSize = *headOutput.ContentLength + } + return nil }) if lastErr == nil { - return true, nil + return true, 0, nil } if a.IsObjectNotFoundErr(lastErr) { - return false, lastErr + return false, 0, lastErr } retries.Wait() } if lastErr != nil { - return false, lastErr + return false, 0, lastErr } - return true, nil + return true, objectSize, nil } // DeleteObject deletes the specified objectKey from the appropriate S3 bucket diff --git a/pkg/storage/chunk/client/aws/s3_storage_client_test.go b/pkg/storage/chunk/client/aws/s3_storage_client_test.go index ba2939ff46884..e18160a3fa003 100644 --- a/pkg/storage/chunk/client/aws/s3_storage_client_test.go +++ b/pkg/storage/chunk/client/aws/s3_storage_client_test.go @@ -334,6 +334,15 @@ func Test_RetryLogic(t *testing.T) { return err }, }, + { + "object exists with size with retries", + 3, + true, + func(c *S3ObjectClient) error { + _, _, err := c.ObjectExistsWithSize(context.Background(), "foo") + return err + }, + }, { "object doesn't exist with retries", 3, @@ -343,6 +352,15 @@ func Test_RetryLogic(t *testing.T) { return err }, }, + { + "object doesn't exist (with size) with retries", + 3, + false, + func(c *S3ObjectClient) error { + _, _, err := c.ObjectExistsWithSize(context.Background(), "foo") + return err + }, + }, } { t.Run(tc.name, func(t *testing.T) { callCount := atomic.NewInt32(0) diff --git a/pkg/storage/chunk/client/azure/blob_storage_client.go b/pkg/storage/chunk/client/azure/blob_storage_client.go index 0a9d6300b1634..2a7014c29898e 100644 --- a/pkg/storage/chunk/client/azure/blob_storage_client.go +++ b/pkg/storage/chunk/client/azure/blob_storage_client.go @@ -220,20 +220,35 @@ func NewBlobStorage(cfg *BlobStorageConfig, metrics BlobStorageMetrics, hedgingC func (b *BlobStorage) Stop() {} func (b *BlobStorage) ObjectExists(ctx context.Context, objectKey string) (bool, error) { + exists, _, err := b.ObjectExistsWithSize(ctx, objectKey) + return exists, err +} + +func (b *BlobStorage) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) { + var objectSize int64 err := loki_instrument.TimeRequest(ctx, "azure.ObjectExists", instrument.NewHistogramCollector(b.metrics.requestDuration), instrument.ErrorCode, func(ctx context.Context) error { blockBlobURL, err := b.getBlobURL(objectKey, false) if err != nil { return err } - _, err = blockBlobURL.GetProperties(ctx, azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{}) - return err + response, err := blockBlobURL.GetProperties(ctx, azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{}) + if err != nil { + return err + } + if response != nil { + rawResponse := response.Response() + if rawResponse != nil { + objectSize = rawResponse.ContentLength + } + } + return nil }) if err != nil { - return false, err + return false, 0, err } - return true, nil + return true, objectSize, nil } // GetObject returns a reader and the size for the specified object key. diff --git a/pkg/storage/chunk/client/baidubce/bos_storage_client.go b/pkg/storage/chunk/client/baidubce/bos_storage_client.go index b76db38e47c60..cc76b21624294 100644 --- a/pkg/storage/chunk/client/baidubce/bos_storage_client.go +++ b/pkg/storage/chunk/client/baidubce/bos_storage_client.go @@ -91,16 +91,27 @@ func (b *BOSObjectStorage) PutObject(ctx context.Context, objectKey string, obje } func (b *BOSObjectStorage) ObjectExists(ctx context.Context, objectKey string) (bool, error) { + exists, _, err := b.ObjectExistsWithSize(ctx, objectKey) + return exists, err +} + +func (b *BOSObjectStorage) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) { + var objectSize int64 err := instrument.CollectedRequest(ctx, "BOS.ObjectExists", bosRequestDuration, instrument.ErrorCode, func(_ context.Context) error { - var requestErr error - _, requestErr = b.client.GetObjectMeta(b.cfg.BucketName, objectKey) - return requestErr + metaResult, requestErr := b.client.GetObjectMeta(b.cfg.BucketName, objectKey) + if requestErr != nil { + return requestErr + } + if metaResult != nil { + objectSize = metaResult.ContentLength + } + return nil }) if err != nil { - return false, err + return false, 0, err } - return true, nil + return true, objectSize, nil } func (b *BOSObjectStorage) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) { diff --git a/pkg/storage/chunk/client/congestion/controller.go b/pkg/storage/chunk/client/congestion/controller.go index 1e3e2ee0dcb3b..1c69ef16139f7 100644 --- a/pkg/storage/chunk/client/congestion/controller.go +++ b/pkg/storage/chunk/client/congestion/controller.go @@ -145,6 +145,10 @@ func (a *AIMDController) ObjectExists(ctx context.Context, objectKey string) (bo return a.inner.ObjectExists(ctx, objectKey) } +func (a *AIMDController) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) { + return a.inner.ObjectExistsWithSize(ctx, objectKey) +} + func (a *AIMDController) DeleteObject(ctx context.Context, objectKey string) error { return a.inner.DeleteObject(ctx, objectKey) } @@ -212,6 +216,9 @@ func NewNoopController(Config) *NoopController { return &NoopController{} } +func (n *NoopController) ObjectExistsWithSize(context.Context, string) (bool, int64, error) { + return true, 0, nil +} func (n *NoopController) ObjectExists(context.Context, string) (bool, error) { return true, nil } func (n *NoopController) PutObject(context.Context, string, io.Reader) error { return nil } func (n *NoopController) GetObject(context.Context, string) (io.ReadCloser, int64, error) { diff --git a/pkg/storage/chunk/client/congestion/controller_test.go b/pkg/storage/chunk/client/congestion/controller_test.go index a46466ebfc546..6d17573248f50 100644 --- a/pkg/storage/chunk/client/congestion/controller_test.go +++ b/pkg/storage/chunk/client/congestion/controller_test.go @@ -267,6 +267,10 @@ func (m *mockObjectClient) ObjectExists(context.Context, string) (bool, error) { panic("not implemented") } +func (m *mockObjectClient) ObjectExistsWithSize(context.Context, string) (bool, int64, error) { + panic("not implemented") +} + func (m *mockObjectClient) List(context.Context, string, string) ([]client.StorageObject, []client.StorageCommonPrefix, error) { panic("not implemented") } diff --git a/pkg/storage/chunk/client/gcp/gcs_object_client.go b/pkg/storage/chunk/client/gcp/gcs_object_client.go index e8f5b4a64d850..c161705ecf7f5 100644 --- a/pkg/storage/chunk/client/gcp/gcs_object_client.go +++ b/pkg/storage/chunk/client/gcp/gcs_object_client.go @@ -127,12 +127,20 @@ func (s *GCSObjectClient) Stop() { } func (s *GCSObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) { - _, err := s.getsBuckets.Object(objectKey).Attrs(ctx) + exists, _, err := s.ObjectExistsWithSize(ctx, objectKey) + return exists, err +} + +func (s *GCSObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) { + attrs, err := s.getsBuckets.Object(objectKey).Attrs(ctx) if err != nil { - return false, err + return false, 0, err } - return true, nil + if attrs != nil { + return true, attrs.Size, nil + } + return true, 0, nil } // GetObject returns a reader and the size for the specified object key from the configured GCS bucket. diff --git a/pkg/storage/chunk/client/ibmcloud/cos_object_client.go b/pkg/storage/chunk/client/ibmcloud/cos_object_client.go index d432071293054..51a9c9fd93086 100644 --- a/pkg/storage/chunk/client/ibmcloud/cos_object_client.go +++ b/pkg/storage/chunk/client/ibmcloud/cos_object_client.go @@ -320,20 +320,31 @@ func (c *COSObjectClient) DeleteObject(ctx context.Context, objectKey string) er } func (c *COSObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) { + exists, _, err := c.ObjectExistsWithSize(ctx, objectKey) + return exists, err +} + +func (c *COSObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) { bucket := c.bucketFromKey(objectKey) + var objectSize int64 err := instrument.CollectedRequest(ctx, "COS.GetObject", cosRequestDuration, instrument.ErrorCode, func(_ context.Context) error { - var requestErr error - _, requestErr = c.hedgedCOS.HeadObject(&cos.HeadObjectInput{ + headOutput, requestErr := c.hedgedCOS.HeadObject(&cos.HeadObjectInput{ Bucket: ibm.String(bucket), Key: ibm.String(objectKey), }) - return requestErr + if requestErr != nil { + return requestErr + } + if headOutput != nil && headOutput.ContentLength != nil { + objectSize = *headOutput.ContentLength + } + return nil }) if err != nil { - return false, err + return false, 0, err } - return true, nil + return true, objectSize, nil } // GetObject returns a reader and the size for the specified object key from the configured S3 bucket. diff --git a/pkg/storage/chunk/client/local/fs_object_client.go b/pkg/storage/chunk/client/local/fs_object_client.go index 0eb027e9fd3cf..671b5df285870 100644 --- a/pkg/storage/chunk/client/local/fs_object_client.go +++ b/pkg/storage/chunk/client/local/fs_object_client.go @@ -67,14 +67,19 @@ func NewFSObjectClient(cfg FSConfig) (*FSObjectClient, error) { // Stop implements ObjectClient func (FSObjectClient) Stop() {} -func (f *FSObjectClient) ObjectExists(_ context.Context, objectKey string) (bool, error) { +func (f *FSObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) { + exists, _, err := f.ObjectExistsWithSize(ctx, objectKey) + return exists, err +} + +func (f *FSObjectClient) ObjectExistsWithSize(_ context.Context, objectKey string) (bool, int64, error) { fullPath := filepath.Join(f.cfg.Directory, filepath.FromSlash(objectKey)) - _, err := os.Lstat(fullPath) + fi, err := os.Lstat(fullPath) if err != nil { - return false, err + return false, 0, err } - return true, nil + return true, fi.Size(), nil } // GetObject from the store diff --git a/pkg/storage/chunk/client/local/fs_object_client_test.go b/pkg/storage/chunk/client/local/fs_object_client_test.go index 2dc059b3f5f1a..15ad96425dc9c 100644 --- a/pkg/storage/chunk/client/local/fs_object_client_test.go +++ b/pkg/storage/chunk/client/local/fs_object_client_test.go @@ -156,6 +156,11 @@ func TestFSObjectClient_List_and_ObjectExists(t *testing.T) { ok, err := bucketClient.ObjectExists(context.Background(), "outer-file2") require.NoError(t, err) require.True(t, ok) + + ok, objectSize, err := bucketClient.ObjectExistsWithSize(context.Background(), "outer-file2") + require.NoError(t, err) + require.True(t, ok) + require.EqualValues(t, len("outer-file2"), objectSize) } func TestFSObjectClient_DeleteObject(t *testing.T) { diff --git a/pkg/storage/chunk/client/object_client.go b/pkg/storage/chunk/client/object_client.go index 225f5025b1d51..95672c286ad09 100644 --- a/pkg/storage/chunk/client/object_client.go +++ b/pkg/storage/chunk/client/object_client.go @@ -19,6 +19,7 @@ import ( // ObjectClient is used to store arbitrary data in Object Store (S3/GCS/Azure/...) type ObjectClient interface { ObjectExists(ctx context.Context, objectKey string) (bool, error) + ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) PutObject(ctx context.Context, objectKey string, object io.Reader) error // NOTE: The consumer of GetObject should always call the Close method when it is done reading which otherwise could cause a resource leak. diff --git a/pkg/storage/chunk/client/openstack/swift_object_client.go b/pkg/storage/chunk/client/openstack/swift_object_client.go index 951a4d652a5a1..d3d978cd5ef72 100644 --- a/pkg/storage/chunk/client/openstack/swift_object_client.go +++ b/pkg/storage/chunk/client/openstack/swift_object_client.go @@ -125,12 +125,17 @@ func (s *SwiftObjectClient) Stop() { } func (s *SwiftObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) { - _, _, err := s.hedgingConn.Object(ctx, s.cfg.Config.ContainerName, objectKey) + exists, _, err := s.ObjectExistsWithSize(ctx, objectKey) + return exists, err +} + +func (s *SwiftObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) { + info, _, err := s.hedgingConn.Object(ctx, s.cfg.Config.ContainerName, objectKey) if err != nil { - return false, err + return false, 0, err } - return true, nil + return true, info.Bytes, nil } // GetObject returns a reader and the size for the specified object key from the configured swift container. diff --git a/pkg/storage/chunk/client/prefixed_object_client.go b/pkg/storage/chunk/client/prefixed_object_client.go index 899dcd2b21123..5a5bda7627708 100644 --- a/pkg/storage/chunk/client/prefixed_object_client.go +++ b/pkg/storage/chunk/client/prefixed_object_client.go @@ -23,6 +23,10 @@ func (p PrefixedObjectClient) ObjectExists(ctx context.Context, objectKey string return p.downstreamClient.ObjectExists(ctx, p.prefix+objectKey) } +func (p PrefixedObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) { + return p.downstreamClient.ObjectExistsWithSize(ctx, p.prefix+objectKey) +} + func (p PrefixedObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) { return p.downstreamClient.GetObject(ctx, p.prefix+objectKey) } diff --git a/pkg/storage/chunk/client/testutils/inmemory_storage_client.go b/pkg/storage/chunk/client/testutils/inmemory_storage_client.go index da65a35d53173..d937cd5fdfd4d 100644 --- a/pkg/storage/chunk/client/testutils/inmemory_storage_client.go +++ b/pkg/storage/chunk/client/testutils/inmemory_storage_client.go @@ -392,20 +392,25 @@ func (m *MockStorage) query(ctx context.Context, query index.Query, callback fun } // ObjectExists implments client.ObjectClient -func (m *InMemoryObjectClient) ObjectExists(_ context.Context, objectKey string) (bool, error) { +func (m *InMemoryObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) { + exists, _, err := m.ObjectExistsWithSize(ctx, objectKey) + return exists, err +} + +func (m *InMemoryObjectClient) ObjectExistsWithSize(_ context.Context, objectKey string) (bool, int64, error) { m.mtx.RLock() defer m.mtx.RUnlock() if m.mode == MockStorageModeWriteOnly { - return false, errPermissionDenied + return false, 0, errPermissionDenied } _, ok := m.objects[objectKey] if !ok { - return false, nil + return false, 0, nil } - - return true, nil + objectSize := len(m.objects[objectKey]) + return true, int64(objectSize), nil } // GetObject implements client.ObjectClient. diff --git a/pkg/tool/audit/audit_test.go b/pkg/tool/audit/audit_test.go index c9afa7b34f357..d591fca7e1ddb 100644 --- a/pkg/tool/audit/audit_test.go +++ b/pkg/tool/audit/audit_test.go @@ -17,11 +17,16 @@ type testObjClient struct { client.ObjectClient } -func (t testObjClient) ObjectExists(_ context.Context, object string) (bool, error) { +func (t testObjClient) ObjectExistsWithSize(_ context.Context, object string) (bool, int64, error) { if strings.Contains(object, "missing") { - return false, nil + return false, 0, nil } - return true, nil + return true, 0, nil +} + +func (t testObjClient) ObjectExists(ctx context.Context, object string) (bool, error) { + exists, _, err := t.ObjectExistsWithSize(ctx, object) + return exists, err } type testCompactedIdx struct {