Skip to content

Commit

Permalink
feat: Introduce new ObjectExistsWithSize API to (#14268)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:
Introduce a new `ObjectExistsWithSize` API to our object storage interface.
This is the same as `ObjectExists`, but it also returns the object's size. This is useful when a caller needs to compare the size of the stored object against an expected value.
  • Loading branch information
DylanGuedes authored Sep 25, 2024
1 parent ee6e1cf commit ac422b3
Show file tree
Hide file tree
Showing 17 changed files with 178 additions and 43 deletions.
8 changes: 8 additions & 0 deletions pkg/ingester-rf1/objstore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ func (m *Multi) GetStoreFor(ts model.Time) (client.ObjectClient, error) {
return nil, fmt.Errorf("no store found for timestamp %s", ts)
}

// ObjectExistsWithSize looks up the store selected by GetStoreFor for the
// current time and forwards the existence-and-size query for objectKey to it.
// If no store can be resolved, it returns (false, 0, err).
func (m *Multi) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
	store, lookupErr := m.GetStoreFor(model.Now())
	if lookupErr != nil {
		return false, 0, lookupErr
	}

	return store.ObjectExistsWithSize(ctx, objectKey)
}

func (m *Multi) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
s, err := m.GetStoreFor(model.Now())
if err != nil {
Expand Down
19 changes: 15 additions & 4 deletions pkg/storage/chunk/client/alibaba/oss_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,27 @@ func (s *OssObjectClient) Stop() {
}

// ObjectExists reports whether objectKey exists. It delegates to
// ObjectExistsWithSize and drops the size from the result.
func (s *OssObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
	found, _, err := s.ObjectExistsWithSize(ctx, objectKey)
	return found, err
}

// ObjectExistsWithSize fetches the object's metadata from the default bucket
// and returns whether the object exists together with the size parsed from the
// Content-Length metadata header. The request is recorded under the
// "OSS.ObjectExists" operation name. If the metadata request fails, the result
// is (false, 0, err).
func (s *OssObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
var options []oss.Option
var objectSize int64
err := instrument.CollectedRequest(ctx, "OSS.ObjectExists", ossRequestDuration, instrument.ErrorCode, func(_ context.Context) error {
_, requestErr := s.defaultBucket.GetObjectMeta(objectKey, options...)
return requestErr
headers, requestErr := s.defaultBucket.GetObjectMeta(objectKey, options...)
if requestErr != nil {
return requestErr
}

// NOTE(review): the ParseInt error is discarded, so a missing or malformed
// Content-Length header is not reported to the caller — confirm this
// best-effort behavior is intended.
objectSize, _ = strconv.ParseInt(headers.Get(oss.HTTPHeaderContentLength), 10, 64)
return nil
})
if err != nil {
return false, err
return false, 0, err
}

return true, nil
return true, objectSize, nil
}

// GetObject returns a reader and the size for the specified object key from the configured OSS bucket.
Expand Down
26 changes: 19 additions & 7 deletions pkg/storage/chunk/client/aws/s3_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,37 +310,49 @@ func buckets(cfg S3Config) ([]string, error) {
func (a *S3ObjectClient) Stop() {}

// ObjectExists reports whether objectKey exists. It delegates to
// ObjectExistsWithSize and drops the size from the result.
func (a *S3ObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
	found, _, err := a.ObjectExistsWithSize(ctx, objectKey)
	return found, err
}

// ObjectExistsWithSize checks whether objectKey exists in its S3 bucket and,
// when it does, returns the object's size as reported by the ContentLength
// field of the HeadObject response.
//
// The HeadObject request is retried according to a.cfg.BackoffConfig. An error
// that IsObjectNotFoundErr recognises is returned immediately without further
// retries. On every error path the result is (false, 0, err).
func (a *S3ObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
	var lastErr error
	var objectSize int64

	retries := backoff.New(ctx, a.cfg.BackoffConfig)
	for retries.Ongoing() {
		if ctx.Err() != nil {
			return false, 0, errors.Wrap(ctx.Err(), "ctx related error during s3 objectExists")
		}
		lastErr = instrument.CollectedRequest(ctx, "S3.ObjectExists", s3RequestDuration, instrument.ErrorCode, func(_ context.Context) error {
			headObjectInput := &s3.HeadObjectInput{
				Bucket: aws.String(a.bucketFromKey(objectKey)),
				Key:    aws.String(objectKey),
			}
			headOutput, requestErr := a.S3.HeadObject(headObjectInput)
			if requestErr != nil {
				return requestErr
			}
			// ContentLength is a pointer; leave the size at 0 when it is absent.
			if headOutput != nil && headOutput.ContentLength != nil {
				objectSize = *headOutput.ContentLength
			}
			return nil
		})
		if lastErr == nil {
			// Return the size captured by the request closure. Returning a
			// literal 0 here would discard it for every successful lookup.
			return true, objectSize, nil
		}

		if a.IsObjectNotFoundErr(lastErr) {
			return false, 0, lastErr
		}

		retries.Wait()
	}

	if lastErr != nil {
		return false, 0, lastErr
	}

	return true, objectSize, nil
}

// DeleteObject deletes the specified objectKey from the appropriate S3 bucket
Expand Down
18 changes: 18 additions & 0 deletions pkg/storage/chunk/client/aws/s3_storage_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,15 @@ func Test_RetryLogic(t *testing.T) {
return err
},
},
{
"object exists with size with retries",
3,
true,
func(c *S3ObjectClient) error {
_, _, err := c.ObjectExistsWithSize(context.Background(), "foo")
return err
},
},
{
"object doesn't exist with retries",
3,
Expand All @@ -343,6 +352,15 @@ func Test_RetryLogic(t *testing.T) {
return err
},
},
{
"object doesn't exist (with size) with retries",
3,
false,
func(c *S3ObjectClient) error {
_, _, err := c.ObjectExistsWithSize(context.Background(), "foo")
return err
},
},
} {
t.Run(tc.name, func(t *testing.T) {
callCount := atomic.NewInt32(0)
Expand Down
23 changes: 19 additions & 4 deletions pkg/storage/chunk/client/azure/blob_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,20 +220,35 @@ func NewBlobStorage(cfg *BlobStorageConfig, metrics BlobStorageMetrics, hedgingC
func (b *BlobStorage) Stop() {}

// ObjectExists reports whether objectKey exists. It delegates to
// ObjectExistsWithSize and drops the size from the result.
func (b *BlobStorage) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
	found, _, err := b.ObjectExistsWithSize(ctx, objectKey)
	return found, err
}

// ObjectExistsWithSize resolves the blob URL for objectKey, fetches the blob's
// properties, and returns whether the blob exists together with the
// ContentLength of the underlying response. The request is timed under the
// "azure.ObjectExists" operation name. If resolving the URL or fetching the
// properties fails, the result is (false, 0, err).
func (b *BlobStorage) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
var objectSize int64
err := loki_instrument.TimeRequest(ctx, "azure.ObjectExists", instrument.NewHistogramCollector(b.metrics.requestDuration), instrument.ErrorCode, func(ctx context.Context) error {
blockBlobURL, err := b.getBlobURL(objectKey, false)
if err != nil {
return err
}

_, err = blockBlobURL.GetProperties(ctx, azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
return err
response, err := blockBlobURL.GetProperties(ctx, azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
if err != nil {
return err
}
// Guard both the properties response and its raw response against nil;
// the size stays 0 when either is missing.
if response != nil {
rawResponse := response.Response()
if rawResponse != nil {
objectSize = rawResponse.ContentLength
}
}
return nil
})
if err != nil {
return false, err
return false, 0, err
}

return true, nil
return true, objectSize, nil
}

// GetObject returns a reader and the size for the specified object key.
Expand Down
21 changes: 16 additions & 5 deletions pkg/storage/chunk/client/baidubce/bos_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,27 @@ func (b *BOSObjectStorage) PutObject(ctx context.Context, objectKey string, obje
}

// ObjectExists reports whether objectKey exists. It delegates to
// ObjectExistsWithSize and drops the size from the result.
func (b *BOSObjectStorage) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
	found, _, err := b.ObjectExistsWithSize(ctx, objectKey)
	return found, err
}

// ObjectExistsWithSize fetches the metadata of objectKey from the configured
// bucket (b.cfg.BucketName) and returns whether the object exists together
// with the ContentLength from that metadata. The request is recorded under the
// "BOS.ObjectExists" operation name. If the metadata request fails, the result
// is (false, 0, err).
func (b *BOSObjectStorage) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
var objectSize int64
err := instrument.CollectedRequest(ctx, "BOS.ObjectExists", bosRequestDuration, instrument.ErrorCode, func(_ context.Context) error {
var requestErr error
_, requestErr = b.client.GetObjectMeta(b.cfg.BucketName, objectKey)
return requestErr
metaResult, requestErr := b.client.GetObjectMeta(b.cfg.BucketName, objectKey)
if requestErr != nil {
return requestErr
}
// The size stays 0 when the client returns a nil metadata result.
if metaResult != nil {
objectSize = metaResult.ContentLength
}
return nil
})
if err != nil {
return false, err
return false, 0, err
}

return true, nil
return true, objectSize, nil
}

func (b *BOSObjectStorage) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) {
Expand Down
7 changes: 7 additions & 0 deletions pkg/storage/chunk/client/congestion/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ func (a *AIMDController) ObjectExists(ctx context.Context, objectKey string) (bo
return a.inner.ObjectExists(ctx, objectKey)
}

// ObjectExistsWithSize forwards the call unchanged to the wrapped inner client.
func (a *AIMDController) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
return a.inner.ObjectExistsWithSize(ctx, objectKey)
}

// DeleteObject forwards the call unchanged to the wrapped inner client.
func (a *AIMDController) DeleteObject(ctx context.Context, objectKey string) error {
return a.inner.DeleteObject(ctx, objectKey)
}
Expand Down Expand Up @@ -212,6 +216,9 @@ func NewNoopController(Config) *NoopController {
return &NoopController{}
}

// ObjectExistsWithSize ignores its arguments and always returns
// (true, 0, nil).
func (n *NoopController) ObjectExistsWithSize(context.Context, string) (bool, int64, error) {
return true, 0, nil
}
func (n *NoopController) ObjectExists(context.Context, string) (bool, error) { return true, nil }
func (n *NoopController) PutObject(context.Context, string, io.Reader) error { return nil }
func (n *NoopController) GetObject(context.Context, string) (io.ReadCloser, int64, error) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/chunk/client/congestion/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,10 @@ func (m *mockObjectClient) ObjectExists(context.Context, string) (bool, error) {
panic("not implemented")
}

// ObjectExistsWithSize is an unimplemented stub on the mock; calling it panics.
func (m *mockObjectClient) ObjectExistsWithSize(context.Context, string) (bool, int64, error) {
panic("not implemented")
}

func (m *mockObjectClient) List(context.Context, string, string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
panic("not implemented")
}
Expand Down
14 changes: 11 additions & 3 deletions pkg/storage/chunk/client/gcp/gcs_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,20 @@ func (s *GCSObjectClient) Stop() {
}

// ObjectExists reports whether objectKey exists. It delegates to
// ObjectExistsWithSize and drops the size from the result.
func (s *GCSObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
_, err := s.getsBuckets.Object(objectKey).Attrs(ctx)
exists, _, err := s.ObjectExistsWithSize(ctx, objectKey)
return exists, err
}

// ObjectExistsWithSize fetches the attributes of objectKey and returns whether
// the object exists together with the Size from those attributes. If the
// attribute lookup fails, the result is (false, 0, err). If the lookup succeeds
// but returns nil attributes, the object is reported as existing with size 0.
func (s *GCSObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
attrs, err := s.getsBuckets.Object(objectKey).Attrs(ctx)
if err != nil {
return false, err
return false, 0, err
}

return true, nil
if attrs != nil {
return true, attrs.Size, nil
}
return true, 0, nil
}

// GetObject returns a reader and the size for the specified object key from the configured GCS bucket.
Expand Down
21 changes: 16 additions & 5 deletions pkg/storage/chunk/client/ibmcloud/cos_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,20 +320,31 @@ func (c *COSObjectClient) DeleteObject(ctx context.Context, objectKey string) er
}

// ObjectExists reports whether objectKey exists. It delegates to
// ObjectExistsWithSize and drops the size from the result.
func (c *COSObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
	found, _, err := c.ObjectExistsWithSize(ctx, objectKey)
	return found, err
}

// ObjectExistsWithSize issues a HeadObject request for objectKey against the
// bucket chosen by bucketFromKey and returns whether the object exists together
// with the ContentLength from the response. If the request fails, the result
// is (false, 0, err).
func (c *COSObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
bucket := c.bucketFromKey(objectKey)
var objectSize int64
// NOTE(review): this HeadObject call is recorded under the "COS.GetObject"
// operation name — confirm whether a dedicated label was intended.
err := instrument.CollectedRequest(ctx, "COS.GetObject", cosRequestDuration, instrument.ErrorCode, func(_ context.Context) error {
var requestErr error
_, requestErr = c.hedgedCOS.HeadObject(&cos.HeadObjectInput{
headOutput, requestErr := c.hedgedCOS.HeadObject(&cos.HeadObjectInput{
Bucket: ibm.String(bucket),
Key: ibm.String(objectKey),
})
return requestErr
if requestErr != nil {
return requestErr
}
// ContentLength is a pointer; the size stays 0 when it is absent.
if headOutput != nil && headOutput.ContentLength != nil {
objectSize = *headOutput.ContentLength
}
return nil
})
if err != nil {
return false, err
return false, 0, err
}

return true, nil
return true, objectSize, nil
}

// GetObject returns a reader and the size for the specified object key from the configured S3 bucket.
Expand Down
13 changes: 9 additions & 4 deletions pkg/storage/chunk/client/local/fs_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,19 @@ func NewFSObjectClient(cfg FSConfig) (*FSObjectClient, error) {
// Stop implements ObjectClient
func (FSObjectClient) Stop() {}

// ObjectExists reports whether objectKey exists. It delegates to
// ObjectExistsWithSize and drops the size from the result.
func (f *FSObjectClient) ObjectExists(_ context.Context, objectKey string) (bool, error) {
func (f *FSObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
exists, _, err := f.ObjectExistsWithSize(ctx, objectKey)
return exists, err
}

// ObjectExistsWithSize maps objectKey to a path under f.cfg.Directory
// (converting slashes to the OS separator) and stats it with os.Lstat. It
// returns whether the path exists together with the size reported by the
// resulting FileInfo. Any Lstat error, including "not exist", is returned as
// (false, 0, err). The context argument is unused.
func (f *FSObjectClient) ObjectExistsWithSize(_ context.Context, objectKey string) (bool, int64, error) {
fullPath := filepath.Join(f.cfg.Directory, filepath.FromSlash(objectKey))
_, err := os.Lstat(fullPath)
// Lstat does not follow symbolic links, so for a symlink the FileInfo
// describes the link itself.
fi, err := os.Lstat(fullPath)
if err != nil {
return false, err
return false, 0, err
}

return true, nil
return true, fi.Size(), nil
}

// GetObject from the store
Expand Down
5 changes: 5 additions & 0 deletions pkg/storage/chunk/client/local/fs_object_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ func TestFSObjectClient_List_and_ObjectExists(t *testing.T) {
ok, err := bucketClient.ObjectExists(context.Background(), "outer-file2")
require.NoError(t, err)
require.True(t, ok)

ok, objectSize, err := bucketClient.ObjectExistsWithSize(context.Background(), "outer-file2")
require.NoError(t, err)
require.True(t, ok)
require.EqualValues(t, len("outer-file2"), objectSize)
}

func TestFSObjectClient_DeleteObject(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/chunk/client/object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
// ObjectClient is used to store arbitrary data in Object Store (S3/GCS/Azure/...)
type ObjectClient interface {
ObjectExists(ctx context.Context, objectKey string) (bool, error)
ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error)

PutObject(ctx context.Context, objectKey string, object io.Reader) error
// NOTE: The consumer of GetObject should always call the Close method when it is done reading which otherwise could cause a resource leak.
Expand Down
11 changes: 8 additions & 3 deletions pkg/storage/chunk/client/openstack/swift_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,17 @@ func (s *SwiftObjectClient) Stop() {
}

// ObjectExists reports whether objectKey exists. It delegates to
// ObjectExistsWithSize and drops the size from the result.
func (s *SwiftObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
_, _, err := s.hedgingConn.Object(ctx, s.cfg.Config.ContainerName, objectKey)
exists, _, err := s.ObjectExistsWithSize(ctx, objectKey)
return exists, err
}

// ObjectExistsWithSize looks up objectKey in the configured container through
// the hedging connection and returns whether the object exists together with
// the Bytes field of the returned object info. If the lookup fails, the result
// is (false, 0, err).
func (s *SwiftObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
info, _, err := s.hedgingConn.Object(ctx, s.cfg.Config.ContainerName, objectKey)
if err != nil {
return false, err
return false, 0, err
}

return true, nil
return true, info.Bytes, nil
}

// GetObject returns a reader and the size for the specified object key from the configured swift container.
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/chunk/client/prefixed_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ func (p PrefixedObjectClient) ObjectExists(ctx context.Context, objectKey string
return p.downstreamClient.ObjectExists(ctx, p.prefix+objectKey)
}

// ObjectExistsWithSize prepends the configured prefix to objectKey and
// forwards the call to the downstream client.
func (p PrefixedObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
return p.downstreamClient.ObjectExistsWithSize(ctx, p.prefix+objectKey)
}

// GetObject prepends the configured prefix to objectKey and forwards the call
// to the downstream client.
func (p PrefixedObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) {
return p.downstreamClient.GetObject(ctx, p.prefix+objectKey)
}
Expand Down
Loading

0 comments on commit ac422b3

Please sign in to comment.