Skip to content

Commit

Permalink
fix: Transform ObjectExistsWithSize into GetAttributes (#14329)
Browse files Browse the repository at this point in the history
- Transform our `ObjectExistsWithSize` API into `GetAttributes` to conform with the existing Thanos API.
- Fix we not returning the size when we should on our s3 implementation of `ObjectSize`
- Fix not returning `false, nil` to `ObjectExists` when the only error seen is a `NotFound`. If someone calls `ObjectExists` it should return `false, nil` to a `NotExist` error, instead of `false, err`
  • Loading branch information
DylanGuedes authored Oct 2, 2024
1 parent c843288 commit 2f56f50
Show file tree
Hide file tree
Showing 17 changed files with 194 additions and 81 deletions.
6 changes: 3 additions & 3 deletions pkg/ingester-rf1/objstore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,12 @@ func (m *Multi) GetStoreFor(ts model.Time) (client.ObjectClient, error) {
return nil, fmt.Errorf("no store found for timestamp %s", ts)
}

func (m *Multi) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
func (m *Multi) GetAttributes(ctx context.Context, objectKey string) (client.ObjectAttributes, error) {
s, err := m.GetStoreFor(model.Now())
if err != nil {
return false, 0, err
return client.ObjectAttributes{}, err
}
return s.ObjectExistsWithSize(ctx, objectKey)
return s.GetAttributes(ctx, objectKey)
}

func (m *Multi) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
Expand Down
22 changes: 16 additions & 6 deletions pkg/storage/chunk/client/alibaba/oss_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,24 @@ func (s *OssObjectClient) Stop() {
}

func (s *OssObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
exists, _, err := s.ObjectExistsWithSize(ctx, objectKey)
return exists, err
if _, err := s.objectAttributes(ctx, objectKey, "OSS.ObjectExists"); err != nil {
if s.IsObjectNotFoundErr(err) {
return false, nil
}
return false, err
}

return true, nil
}

func (s *OssObjectClient) GetAttributes(ctx context.Context, objectKey string) (client.ObjectAttributes, error) {
return s.objectAttributes(ctx, objectKey, "OSS.GetAttributes")
}

func (s *OssObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
func (s *OssObjectClient) objectAttributes(ctx context.Context, objectKey, operation string) (client.ObjectAttributes, error) {
var options []oss.Option
var objectSize int64
err := instrument.CollectedRequest(ctx, "OSS.ObjectExists", ossRequestDuration, instrument.ErrorCode, func(_ context.Context) error {
err := instrument.CollectedRequest(ctx, operation, ossRequestDuration, instrument.ErrorCode, func(_ context.Context) error {
headers, requestErr := s.defaultBucket.GetObjectMeta(objectKey, options...)
if requestErr != nil {
return requestErr
Expand All @@ -90,10 +100,10 @@ func (s *OssObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey st
return nil
})
if err != nil {
return false, 0, err
return client.ObjectAttributes{}, err
}

return true, objectSize, nil
return client.ObjectAttributes{Size: objectSize}, nil
}

// GetObject returns a reader and the size for the specified object key from the configured OSS bucket.
Expand Down
29 changes: 17 additions & 12 deletions pkg/storage/chunk/client/aws/s3_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,20 +310,29 @@ func buckets(cfg S3Config) ([]string, error) {
func (a *S3ObjectClient) Stop() {}

func (a *S3ObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
exists, _, err := a.ObjectExistsWithSize(ctx, objectKey)
return exists, err
if _, err := a.objectAttributes(ctx, objectKey, "S3.ObjectExists"); err != nil {
if a.IsObjectNotFoundErr(err) {
return false, nil
}
return false, err
}
return true, nil
}

func (a *S3ObjectClient) GetAttributes(ctx context.Context, objectKey string) (client.ObjectAttributes, error) {
return a.objectAttributes(ctx, objectKey, "S3.GetAttributes")
}

func (a *S3ObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
func (a *S3ObjectClient) objectAttributes(ctx context.Context, objectKey, method string) (client.ObjectAttributes, error) {
var lastErr error
var objectSize int64

retries := backoff.New(ctx, a.cfg.BackoffConfig)
for retries.Ongoing() {
if ctx.Err() != nil {
return false, 0, errors.Wrap(ctx.Err(), "ctx related error during s3 objectExists")
return client.ObjectAttributes{}, errors.Wrap(ctx.Err(), "ctx related error during s3 objectExists")
}
lastErr = instrument.CollectedRequest(ctx, "S3.ObjectExists", s3RequestDuration, instrument.ErrorCode, func(_ context.Context) error {
lastErr = instrument.CollectedRequest(ctx, method, s3RequestDuration, instrument.ErrorCode, func(_ context.Context) error {
headObjectInput := &s3.HeadObjectInput{
Bucket: aws.String(a.bucketFromKey(objectKey)),
Key: aws.String(objectKey),
Expand All @@ -338,21 +347,17 @@ func (a *S3ObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey str
return nil
})
if lastErr == nil {
return true, 0, nil
return client.ObjectAttributes{Size: objectSize}, nil
}

if a.IsObjectNotFoundErr(lastErr) {
return false, 0, lastErr
return client.ObjectAttributes{}, lastErr
}

retries.Wait()
}

if lastErr != nil {
return false, 0, lastErr
}

return true, objectSize, nil
return client.ObjectAttributes{}, lastErr
}

// DeleteObject deletes the specified objectKey from the appropriate S3 bucket
Expand Down
41 changes: 38 additions & 3 deletions pkg/storage/chunk/client/aws/s3_storage_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,36 @@ func (m *MockS3Client) HeadObject(input *s3.HeadObjectInput) (*s3.HeadObjectOutp
return m.HeadObjectFunc(input)
}

func Test_GetAttributes(t *testing.T) {
mockS3 := &MockS3Client{
HeadObjectFunc: func(_ *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) {
var size int64 = 128
return &s3.HeadObjectOutput{ContentLength: &size}, nil
},
}

c, err := NewS3ObjectClient(S3Config{
AccessKeyID: "foo",
SecretAccessKey: flagext.SecretWithValue("bar"),
BackoffConfig: backoff.Config{MaxRetries: 3},
BucketNames: "foo",
Inject: func(_ http.RoundTripper) http.RoundTripper {
return RoundTripperFunc(func(_ *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewReader([]byte("object content"))),
}, nil
})
},
}, hedging.Config{})
require.NoError(t, err)
c.S3 = mockS3

attrs, err := c.GetAttributes(context.Background(), "abc")
require.NoError(t, err)
require.EqualValues(t, 128, attrs.Size)
}

func Test_RetryLogic(t *testing.T) {
for _, tc := range []struct {
name string
Expand Down Expand Up @@ -339,7 +369,7 @@ func Test_RetryLogic(t *testing.T) {
3,
true,
func(c *S3ObjectClient) error {
_, _, err := c.ObjectExistsWithSize(context.Background(), "foo")
_, err := c.ObjectExists(context.Background(), "foo")
return err
},
},
Expand All @@ -348,7 +378,12 @@ func Test_RetryLogic(t *testing.T) {
3,
false,
func(c *S3ObjectClient) error {
_, err := c.ObjectExists(context.Background(), "foo")
exists, err := c.ObjectExists(context.Background(), "foo")
if err == nil && !exists {
return awserr.NewRequestFailure(
awserr.New("NotFound", "Not Found", nil), 404, "abc",
)
}
return err
},
},
Expand All @@ -357,7 +392,7 @@ func Test_RetryLogic(t *testing.T) {
3,
false,
func(c *S3ObjectClient) error {
_, _, err := c.ObjectExistsWithSize(context.Background(), "foo")
_, err := c.GetAttributes(context.Background(), "foo")
return err
},
},
Expand Down
21 changes: 15 additions & 6 deletions pkg/storage/chunk/client/azure/blob_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,13 +220,22 @@ func NewBlobStorage(cfg *BlobStorageConfig, metrics BlobStorageMetrics, hedgingC
func (b *BlobStorage) Stop() {}

func (b *BlobStorage) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
exists, _, err := b.ObjectExistsWithSize(ctx, objectKey)
return exists, err
if _, err := b.objectAttributes(ctx, objectKey, "azure.ObjectExists"); err != nil {
if b.IsObjectNotFoundErr(err) {
return false, nil
}
return false, err
}
return true, nil
}

func (b *BlobStorage) GetAttributes(ctx context.Context, objectKey string) (client.ObjectAttributes, error) {
return b.objectAttributes(ctx, objectKey, "azure.GetAttributes")
}

func (b *BlobStorage) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
func (b *BlobStorage) objectAttributes(ctx context.Context, objectKey, source string) (client.ObjectAttributes, error) {
var objectSize int64
err := loki_instrument.TimeRequest(ctx, "azure.ObjectExists", instrument.NewHistogramCollector(b.metrics.requestDuration), instrument.ErrorCode, func(ctx context.Context) error {
err := loki_instrument.TimeRequest(ctx, source, instrument.NewHistogramCollector(b.metrics.requestDuration), instrument.ErrorCode, func(ctx context.Context) error {
blockBlobURL, err := b.getBlobURL(objectKey, false)
if err != nil {
return err
Expand All @@ -245,10 +254,10 @@ func (b *BlobStorage) ObjectExistsWithSize(ctx context.Context, objectKey string
return nil
})
if err != nil {
return false, 0, err
return client.ObjectAttributes{}, err
}

return true, objectSize, nil
return client.ObjectAttributes{Size: objectSize}, nil
}

// GetObject returns a reader and the size for the specified object key.
Expand Down
21 changes: 15 additions & 6 deletions pkg/storage/chunk/client/baidubce/bos_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,22 @@ func (b *BOSObjectStorage) PutObject(ctx context.Context, objectKey string, obje
}

func (b *BOSObjectStorage) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
exists, _, err := b.ObjectExistsWithSize(ctx, objectKey)
return exists, err
if _, err := b.objectAttributes(ctx, objectKey, "BOS.ObjectExists"); err != nil {
if b.IsObjectNotFoundErr(err) {
return false, nil
}
return false, err
}
return true, nil
}

func (b *BOSObjectStorage) GetAttributes(ctx context.Context, objectKey string) (client.ObjectAttributes, error) {
return b.objectAttributes(ctx, objectKey, "BOS.GetAttributes")
}

func (b *BOSObjectStorage) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
func (b *BOSObjectStorage) objectAttributes(ctx context.Context, objectKey, source string) (client.ObjectAttributes, error) {
var objectSize int64
err := instrument.CollectedRequest(ctx, "BOS.ObjectExists", bosRequestDuration, instrument.ErrorCode, func(_ context.Context) error {
err := instrument.CollectedRequest(ctx, source, bosRequestDuration, instrument.ErrorCode, func(_ context.Context) error {
metaResult, requestErr := b.client.GetObjectMeta(b.cfg.BucketName, objectKey)
if requestErr != nil {
return requestErr
Expand All @@ -108,10 +117,10 @@ func (b *BOSObjectStorage) ObjectExistsWithSize(ctx context.Context, objectKey s
return nil
})
if err != nil {
return false, 0, err
return client.ObjectAttributes{}, err
}

return true, objectSize, nil
return client.ObjectAttributes{Size: objectSize}, nil
}

func (b *BOSObjectStorage) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) {
Expand Down
8 changes: 4 additions & 4 deletions pkg/storage/chunk/client/congestion/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ func (a *AIMDController) ObjectExists(ctx context.Context, objectKey string) (bo
return a.inner.ObjectExists(ctx, objectKey)
}

func (a *AIMDController) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
return a.inner.ObjectExistsWithSize(ctx, objectKey)
func (a *AIMDController) GetAttributes(ctx context.Context, objectKey string) (client.ObjectAttributes, error) {
return a.inner.GetAttributes(ctx, objectKey)
}

func (a *AIMDController) DeleteObject(ctx context.Context, objectKey string) error {
Expand Down Expand Up @@ -216,8 +216,8 @@ func NewNoopController(Config) *NoopController {
return &NoopController{}
}

func (n *NoopController) ObjectExistsWithSize(context.Context, string) (bool, int64, error) {
return true, 0, nil
func (n *NoopController) GetAttributes(context.Context, string) (client.ObjectAttributes, error) {
return client.ObjectAttributes{}, nil
}
func (n *NoopController) ObjectExists(context.Context, string) (bool, error) { return true, nil }
func (n *NoopController) PutObject(context.Context, string, io.Reader) error { return nil }
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/chunk/client/congestion/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (m *mockObjectClient) ObjectExists(context.Context, string) (bool, error) {
panic("not implemented")
}

func (m *mockObjectClient) ObjectExistsWithSize(context.Context, string) (bool, int64, error) {
func (m *mockObjectClient) GetAttributes(context.Context, string) (client.ObjectAttributes, error) {
panic("not implemented")
}

Expand Down
17 changes: 11 additions & 6 deletions pkg/storage/chunk/client/gcp/gcs_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,20 +127,25 @@ func (s *GCSObjectClient) Stop() {
}

func (s *GCSObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
exists, _, err := s.ObjectExistsWithSize(ctx, objectKey)
return exists, err
if _, err := s.GetAttributes(ctx, objectKey); err != nil {
if s.IsObjectNotFoundErr(err) {
return false, nil
}
return false, err
}
return true, nil
}

func (s *GCSObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
func (s *GCSObjectClient) GetAttributes(ctx context.Context, objectKey string) (client.ObjectAttributes, error) {
attrs, err := s.getsBuckets.Object(objectKey).Attrs(ctx)
if err != nil {
return false, 0, err
return client.ObjectAttributes{}, err
}

if attrs != nil {
return true, attrs.Size, nil
return client.ObjectAttributes{Size: attrs.Size}, nil
}
return true, 0, nil
return client.ObjectAttributes{}, nil
}

// GetObject returns a reader and the size for the specified object key from the configured GCS bucket.
Expand Down
21 changes: 15 additions & 6 deletions pkg/storage/chunk/client/ibmcloud/cos_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,14 +320,23 @@ func (c *COSObjectClient) DeleteObject(ctx context.Context, objectKey string) er
}

func (c *COSObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
exists, _, err := c.ObjectExistsWithSize(ctx, objectKey)
return exists, err
if _, err := c.objectAttributes(ctx, objectKey, "COS.ObjectExists"); err != nil {
if c.IsObjectNotFoundErr(err) {
return false, nil
}
return false, err
}
return true, nil
}

func (c *COSObjectClient) GetAttributes(ctx context.Context, objectKey string) (client.ObjectAttributes, error) {
return c.objectAttributes(ctx, objectKey, "COS.GetAttributes")
}

func (c *COSObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
func (c *COSObjectClient) objectAttributes(ctx context.Context, objectKey, source string) (client.ObjectAttributes, error) {
bucket := c.bucketFromKey(objectKey)
var objectSize int64
err := instrument.CollectedRequest(ctx, "COS.GetObject", cosRequestDuration, instrument.ErrorCode, func(_ context.Context) error {
err := instrument.CollectedRequest(ctx, source, cosRequestDuration, instrument.ErrorCode, func(_ context.Context) error {
headOutput, requestErr := c.hedgedCOS.HeadObject(&cos.HeadObjectInput{
Bucket: ibm.String(bucket),
Key: ibm.String(objectKey),
Expand All @@ -341,10 +350,10 @@ func (c *COSObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey st
return nil
})
if err != nil {
return false, 0, err
return client.ObjectAttributes{}, err
}

return true, objectSize, nil
return client.ObjectAttributes{Size: objectSize}, nil
}

// GetObject returns a reader and the size for the specified object key from the configured S3 bucket.
Expand Down
15 changes: 10 additions & 5 deletions pkg/storage/chunk/client/local/fs_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,23 @@ func NewFSObjectClient(cfg FSConfig) (*FSObjectClient, error) {
func (FSObjectClient) Stop() {}

func (f *FSObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
exists, _, err := f.ObjectExistsWithSize(ctx, objectKey)
return exists, err
if _, err := f.GetAttributes(ctx, objectKey); err != nil {
if f.IsObjectNotFoundErr(err) {
return false, nil
}
return false, err
}
return true, nil
}

func (f *FSObjectClient) ObjectExistsWithSize(_ context.Context, objectKey string) (bool, int64, error) {
func (f *FSObjectClient) GetAttributes(_ context.Context, objectKey string) (client.ObjectAttributes, error) {
fullPath := filepath.Join(f.cfg.Directory, filepath.FromSlash(objectKey))
fi, err := os.Lstat(fullPath)
if err != nil {
return false, 0, err
return client.ObjectAttributes{}, err
}

return true, fi.Size(), nil
return client.ObjectAttributes{Size: fi.Size()}, nil
}

// GetObject from the store
Expand Down
Loading

0 comments on commit 2f56f50

Please sign in to comment.