Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Transform ObjectExistsWithSize into GetAttributes #14329

Merged
merged 12 commits into from
Oct 2, 2024
6 changes: 3 additions & 3 deletions pkg/ingester-rf1/objstore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,12 @@ func (m *Multi) GetStoreFor(ts model.Time) (client.ObjectClient, error) {
return nil, fmt.Errorf("no store found for timestamp %s", ts)
}

func (m *Multi) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
func (m *Multi) GetAttributes(ctx context.Context, objectKey string) (client.ObjectAttributes, error) {
s, err := m.GetStoreFor(model.Now())
if err != nil {
return false, 0, err
return client.ObjectAttributes{}, err
}
return s.ObjectExistsWithSize(ctx, objectKey)
return s.GetAttributes(ctx, objectKey)
}

func (m *Multi) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
Expand Down
22 changes: 16 additions & 6 deletions pkg/storage/chunk/client/alibaba/oss_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,24 @@ func (s *OssObjectClient) Stop() {
}

func (s *OssObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
exists, _, err := s.ObjectExistsWithSize(ctx, objectKey)
return exists, err
if _, err := s.objectAttributes(ctx, objectKey, "OSS.ObjectExists"); err != nil {
if s.IsObjectNotFoundErr(err) {
return false, nil
}
return false, err
}

return true, nil
}

func (s *OssObjectClient) GetAttributes(ctx context.Context, objectKey string) (client.ObjectAttributes, error) {
return s.objectAttributes(ctx, objectKey, "OSS.GetAttributes")
}

func (s *OssObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
func (s *OssObjectClient) objectAttributes(ctx context.Context, objectKey, operation string) (client.ObjectAttributes, error) {
var options []oss.Option
var objectSize int64
err := instrument.CollectedRequest(ctx, "OSS.ObjectExists", ossRequestDuration, instrument.ErrorCode, func(_ context.Context) error {
err := instrument.CollectedRequest(ctx, operation, ossRequestDuration, instrument.ErrorCode, func(_ context.Context) error {
headers, requestErr := s.defaultBucket.GetObjectMeta(objectKey, options...)
if requestErr != nil {
return requestErr
Expand All @@ -90,10 +100,10 @@ func (s *OssObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey st
return nil
})
if err != nil {
return false, 0, err
return client.ObjectAttributes{}, err
}

return true, objectSize, nil
return client.ObjectAttributes{Size: objectSize}, nil
}

// GetObject returns a reader and the size for the specified object key from the configured OSS bucket.
Expand Down
29 changes: 17 additions & 12 deletions pkg/storage/chunk/client/aws/s3_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,20 +310,29 @@ func buckets(cfg S3Config) ([]string, error) {
func (a *S3ObjectClient) Stop() {}

func (a *S3ObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
exists, _, err := a.ObjectExistsWithSize(ctx, objectKey)
return exists, err
if _, err := a.objectAttributes(ctx, objectKey, "S3.ObjectExists"); err != nil {
if a.IsObjectNotFoundErr(err) {
return false, nil
}
return false, err
}
return true, nil
}

func (a *S3ObjectClient) GetAttributes(ctx context.Context, objectKey string) (client.ObjectAttributes, error) {
return a.objectAttributes(ctx, objectKey, "S3.GetAttributes")
}

func (a *S3ObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
func (a *S3ObjectClient) objectAttributes(ctx context.Context, objectKey, method string) (client.ObjectAttributes, error) {
var lastErr error
var objectSize int64

retries := backoff.New(ctx, a.cfg.BackoffConfig)
for retries.Ongoing() {
if ctx.Err() != nil {
return false, 0, errors.Wrap(ctx.Err(), "ctx related error during s3 objectExists")
return client.ObjectAttributes{}, errors.Wrap(ctx.Err(), "ctx related error during s3 objectExists")
}
lastErr = instrument.CollectedRequest(ctx, "S3.ObjectExists", s3RequestDuration, instrument.ErrorCode, func(_ context.Context) error {
lastErr = instrument.CollectedRequest(ctx, method, s3RequestDuration, instrument.ErrorCode, func(_ context.Context) error {
headObjectInput := &s3.HeadObjectInput{
Bucket: aws.String(a.bucketFromKey(objectKey)),
Key: aws.String(objectKey),
Expand All @@ -338,21 +347,17 @@ func (a *S3ObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey str
return nil
})
if lastErr == nil {
return true, 0, nil
return client.ObjectAttributes{Size: objectSize}, nil
}

if a.IsObjectNotFoundErr(lastErr) {
return false, 0, lastErr
return client.ObjectAttributes{}, lastErr
}

retries.Wait()
}

if lastErr != nil {
return false, 0, lastErr
}

return true, objectSize, nil
return client.ObjectAttributes{}, lastErr
}

// DeleteObject deletes the specified objectKey from the appropriate S3 bucket
Expand Down
41 changes: 38 additions & 3 deletions pkg/storage/chunk/client/aws/s3_storage_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,36 @@ func (m *MockS3Client) HeadObject(input *s3.HeadObjectInput) (*s3.HeadObjectOutp
return m.HeadObjectFunc(input)
}

func Test_GetAttributes(t *testing.T) {
mockS3 := &MockS3Client{
HeadObjectFunc: func(_ *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) {
var size int64 = 128
return &s3.HeadObjectOutput{ContentLength: &size}, nil
},
}

c, err := NewS3ObjectClient(S3Config{
AccessKeyID: "foo",
SecretAccessKey: flagext.SecretWithValue("bar"),
BackoffConfig: backoff.Config{MaxRetries: 3},
BucketNames: "foo",
Inject: func(_ http.RoundTripper) http.RoundTripper {
return RoundTripperFunc(func(_ *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewReader([]byte("object content"))),
}, nil
})
},
}, hedging.Config{})
require.NoError(t, err)
c.S3 = mockS3

attrs, err := c.GetAttributes(context.Background(), "abc")
require.NoError(t, err)
require.EqualValues(t, 128, attrs.Size)
}

func Test_RetryLogic(t *testing.T) {
for _, tc := range []struct {
name string
Expand Down Expand Up @@ -339,7 +369,7 @@ func Test_RetryLogic(t *testing.T) {
3,
true,
func(c *S3ObjectClient) error {
_, _, err := c.ObjectExistsWithSize(context.Background(), "foo")
_, err := c.ObjectExists(context.Background(), "foo")
return err
},
},
Expand All @@ -348,7 +378,12 @@ func Test_RetryLogic(t *testing.T) {
3,
false,
func(c *S3ObjectClient) error {
_, err := c.ObjectExists(context.Background(), "foo")
exists, err := c.ObjectExists(context.Background(), "foo")
if err == nil && !exists {
return awserr.NewRequestFailure(
awserr.New("NotFound", "Not Found", nil), 404, "abc",
)
}
return err
},
},
Expand All @@ -357,7 +392,7 @@ func Test_RetryLogic(t *testing.T) {
3,
false,
func(c *S3ObjectClient) error {
_, _, err := c.ObjectExistsWithSize(context.Background(), "foo")
_, err := c.GetAttributes(context.Background(), "foo")
return err
},
},
Expand Down
21 changes: 15 additions & 6 deletions pkg/storage/chunk/client/azure/blob_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,13 +220,22 @@ func NewBlobStorage(cfg *BlobStorageConfig, metrics BlobStorageMetrics, hedgingC
func (b *BlobStorage) Stop() {}

func (b *BlobStorage) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
exists, _, err := b.ObjectExistsWithSize(ctx, objectKey)
return exists, err
if _, err := b.objectAttributes(ctx, objectKey, "azure.ObjectExists"); err != nil {
if b.IsObjectNotFoundErr(err) {
return false, nil
}
return false, err
}
return true, nil
}

func (b *BlobStorage) GetAttributes(ctx context.Context, objectKey string) (client.ObjectAttributes, error) {
return b.objectAttributes(ctx, objectKey, "azure.GetAttributes")
}

func (b *BlobStorage) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
func (b *BlobStorage) objectAttributes(ctx context.Context, objectKey, source string) (client.ObjectAttributes, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: IIUC using a different source is just going to create a different metric? Does it make sense to separate calls to this function into separate metrics?
My suggestion would be to just drop the source parameter, if I'm understanding the code correctly

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the idea is to allow disambiguation between GetAttributes and ObjectExists where it is possible to do it. Loki isn't using GetAttributes so this shouldn't cause any problems but if you're using Loki's API for doing something, having this separation will help your application separating what is leading the object storage requests.

var objectSize int64
err := loki_instrument.TimeRequest(ctx, "azure.ObjectExists", instrument.NewHistogramCollector(b.metrics.requestDuration), instrument.ErrorCode, func(ctx context.Context) error {
err := loki_instrument.TimeRequest(ctx, source, instrument.NewHistogramCollector(b.metrics.requestDuration), instrument.ErrorCode, func(ctx context.Context) error {
blockBlobURL, err := b.getBlobURL(objectKey, false)
if err != nil {
return err
Expand All @@ -245,10 +254,10 @@ func (b *BlobStorage) ObjectExistsWithSize(ctx context.Context, objectKey string
return nil
})
if err != nil {
return false, 0, err
return client.ObjectAttributes{}, err
}

return true, objectSize, nil
return client.ObjectAttributes{Size: objectSize}, nil
}

// GetObject returns a reader and the size for the specified object key.
Expand Down
21 changes: 15 additions & 6 deletions pkg/storage/chunk/client/baidubce/bos_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,22 @@ func (b *BOSObjectStorage) PutObject(ctx context.Context, objectKey string, obje
}

func (b *BOSObjectStorage) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
exists, _, err := b.ObjectExistsWithSize(ctx, objectKey)
return exists, err
if _, err := b.objectAttributes(ctx, objectKey, "BOS.ObjectExists"); err != nil {
if b.IsObjectNotFoundErr(err) {
return false, nil
}
return false, err
}
return true, nil
}

func (b *BOSObjectStorage) GetAttributes(ctx context.Context, objectKey string) (client.ObjectAttributes, error) {
return b.objectAttributes(ctx, objectKey, "BOS.GetAttributes")
}

func (b *BOSObjectStorage) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
func (b *BOSObjectStorage) objectAttributes(ctx context.Context, objectKey, source string) (client.ObjectAttributes, error) {
var objectSize int64
err := instrument.CollectedRequest(ctx, "BOS.ObjectExists", bosRequestDuration, instrument.ErrorCode, func(_ context.Context) error {
err := instrument.CollectedRequest(ctx, source, bosRequestDuration, instrument.ErrorCode, func(_ context.Context) error {
metaResult, requestErr := b.client.GetObjectMeta(b.cfg.BucketName, objectKey)
if requestErr != nil {
return requestErr
Expand All @@ -108,10 +117,10 @@ func (b *BOSObjectStorage) ObjectExistsWithSize(ctx context.Context, objectKey s
return nil
})
if err != nil {
return false, 0, err
return client.ObjectAttributes{}, err
}

return true, objectSize, nil
return client.ObjectAttributes{Size: objectSize}, nil
}

func (b *BOSObjectStorage) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) {
Expand Down
8 changes: 4 additions & 4 deletions pkg/storage/chunk/client/congestion/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ func (a *AIMDController) ObjectExists(ctx context.Context, objectKey string) (bo
return a.inner.ObjectExists(ctx, objectKey)
}

func (a *AIMDController) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
return a.inner.ObjectExistsWithSize(ctx, objectKey)
func (a *AIMDController) GetAttributes(ctx context.Context, objectKey string) (client.ObjectAttributes, error) {
return a.inner.GetAttributes(ctx, objectKey)
}

func (a *AIMDController) DeleteObject(ctx context.Context, objectKey string) error {
Expand Down Expand Up @@ -216,8 +216,8 @@ func NewNoopController(Config) *NoopController {
return &NoopController{}
}

func (n *NoopController) ObjectExistsWithSize(context.Context, string) (bool, int64, error) {
return true, 0, nil
func (n *NoopController) GetAttributes(context.Context, string) (client.ObjectAttributes, error) {
return client.ObjectAttributes{}, nil
}
func (n *NoopController) ObjectExists(context.Context, string) (bool, error) { return true, nil }
func (n *NoopController) PutObject(context.Context, string, io.Reader) error { return nil }
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/chunk/client/congestion/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (m *mockObjectClient) ObjectExists(context.Context, string) (bool, error) {
panic("not implemented")
}

func (m *mockObjectClient) ObjectExistsWithSize(context.Context, string) (bool, int64, error) {
func (m *mockObjectClient) GetAttributes(context.Context, string) (client.ObjectAttributes, error) {
panic("not implemented")
}

Expand Down
17 changes: 11 additions & 6 deletions pkg/storage/chunk/client/gcp/gcs_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,20 +127,25 @@ func (s *GCSObjectClient) Stop() {
}

func (s *GCSObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
exists, _, err := s.ObjectExistsWithSize(ctx, objectKey)
return exists, err
if _, err := s.GetAttributes(ctx, objectKey); err != nil {
if s.IsObjectNotFoundErr(err) {
return false, nil
}
return false, err
}
return true, nil
}

func (s *GCSObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
func (s *GCSObjectClient) GetAttributes(ctx context.Context, objectKey string) (client.ObjectAttributes, error) {
attrs, err := s.getsBuckets.Object(objectKey).Attrs(ctx)
if err != nil {
return false, 0, err
return client.ObjectAttributes{}, err
}

if attrs != nil {
return true, attrs.Size, nil
return client.ObjectAttributes{Size: attrs.Size}, nil
}
return true, 0, nil
return client.ObjectAttributes{}, nil
}

// GetObject returns a reader and the size for the specified object key from the configured GCS bucket.
Expand Down
21 changes: 15 additions & 6 deletions pkg/storage/chunk/client/ibmcloud/cos_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,14 +320,23 @@ func (c *COSObjectClient) DeleteObject(ctx context.Context, objectKey string) er
}

func (c *COSObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
exists, _, err := c.ObjectExistsWithSize(ctx, objectKey)
return exists, err
if _, err := c.objectAttributes(ctx, objectKey, "COS.ObjectExists"); err != nil {
if c.IsObjectNotFoundErr(err) {
return false, nil
}
return false, err
}
return true, nil
}

func (c *COSObjectClient) GetAttributes(ctx context.Context, objectKey string) (client.ObjectAttributes, error) {
return c.objectAttributes(ctx, objectKey, "COS.GetAttributes")
}

func (c *COSObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
func (c *COSObjectClient) objectAttributes(ctx context.Context, objectKey, source string) (client.ObjectAttributes, error) {
bucket := c.bucketFromKey(objectKey)
var objectSize int64
err := instrument.CollectedRequest(ctx, "COS.GetObject", cosRequestDuration, instrument.ErrorCode, func(_ context.Context) error {
err := instrument.CollectedRequest(ctx, source, cosRequestDuration, instrument.ErrorCode, func(_ context.Context) error {
headOutput, requestErr := c.hedgedCOS.HeadObject(&cos.HeadObjectInput{
Bucket: ibm.String(bucket),
Key: ibm.String(objectKey),
Expand All @@ -341,10 +350,10 @@ func (c *COSObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey st
return nil
})
if err != nil {
return false, 0, err
return client.ObjectAttributes{}, err
}

return true, objectSize, nil
return client.ObjectAttributes{Size: objectSize}, nil
}

// GetObject returns a reader and the size for the specified object key from the configured S3 bucket.
Expand Down
15 changes: 10 additions & 5 deletions pkg/storage/chunk/client/local/fs_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,23 @@ func NewFSObjectClient(cfg FSConfig) (*FSObjectClient, error) {
func (FSObjectClient) Stop() {}

func (f *FSObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
exists, _, err := f.ObjectExistsWithSize(ctx, objectKey)
return exists, err
if _, err := f.GetAttributes(ctx, objectKey); err != nil {
if f.IsObjectNotFoundErr(err) {
return false, nil
}
return false, err
}
return true, nil
}

func (f *FSObjectClient) ObjectExistsWithSize(_ context.Context, objectKey string) (bool, int64, error) {
func (f *FSObjectClient) GetAttributes(_ context.Context, objectKey string) (client.ObjectAttributes, error) {
fullPath := filepath.Join(f.cfg.Directory, filepath.FromSlash(objectKey))
fi, err := os.Lstat(fullPath)
if err != nil {
return false, 0, err
return client.ObjectAttributes{}, err
}

return true, fi.Size(), nil
return client.ObjectAttributes{Size: fi.Size()}, nil
}

// GetObject from the store
Expand Down
Loading
Loading