Skip to content

Commit

Permalink
Added support for BlobStorage using thanos/objstore
Browse files Browse the repository at this point in the history
  • Loading branch information
JoaoBraveCoding committed Nov 24, 2023
1 parent b93fbcd commit a259abd
Show file tree
Hide file tree
Showing 3 changed files with 252 additions and 0 deletions.
126 changes: 126 additions & 0 deletions pkg/storage/chunk/client/azure/blob_storage_thanos_object_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package azure

import (
"context"
"io"
"strings"

"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/objstore"

"github.com/grafana/loki/pkg/storage/bucket"
"github.com/grafana/loki/pkg/storage/chunk/client"
"github.com/grafana/loki/pkg/storage/chunk/client/hedging"
)

type BlobStorageThanosObjectClient struct {
client objstore.Bucket
hedgedClient objstore.Bucket
}

// NewBlobStorageObjectClient makes a new BlobStorage-backed ObjectClient.
func NewBlobStorageThanosObjectClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedgingCfg hedging.Config, reg prometheus.Registerer) (*BlobStorageThanosObjectClient, error) {
client, err := newBlobStorageThanosObjClient(ctx, cfg, component, logger, false, hedgingCfg, prometheus.WrapRegistererWith(prometheus.Labels{"hedging": "false"}, reg))
if err != nil {
return nil, err
}
hedgedClient, err := newBlobStorageThanosObjClient(ctx, cfg, component, logger, true, hedgingCfg, prometheus.WrapRegistererWith(prometheus.Labels{"hedging": "true"}, reg))
if err != nil {
return nil, err
}
return &BlobStorageThanosObjectClient{
client: client,
hedgedClient: hedgedClient,
}, nil
}

func newBlobStorageThanosObjClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedging bool, hedgingCfg hedging.Config, reg prometheus.Registerer) (objstore.Bucket, error) {
if hedging {
hedgedTrasport, err := hedgingCfg.RoundTripperWithRegisterer(nil, reg)
if err != nil {
return nil, err
}

cfg.Azure.HTTP.Transport = hedgedTrasport
}

return bucket.NewClient(ctx, cfg, component, logger, reg)
}

// Stop fulfills the chunk.ObjectClient interface
func (s *BlobStorageThanosObjectClient) Stop() {}

// ObjectExists checks if a given objectKey exists in the AWS bucket
func (s *BlobStorageThanosObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
return s.hedgedClient.Exists(ctx, objectKey)
}

// PutObject into the store
func (s *BlobStorageThanosObjectClient) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error {
return s.client.Upload(ctx, objectKey, object)
}

// DeleteObject deletes the specified objectKey from the appropriate BlobStorage bucket
func (s *BlobStorageThanosObjectClient) DeleteObject(ctx context.Context, objectKey string) error {
return s.client.Delete(ctx, objectKey)
}

// GetObject returns a reader and the size for the specified object key from the configured BlobStorage bucket.
func (s *BlobStorageThanosObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) {
reader, err := s.hedgedClient.Get(ctx, objectKey)
if err != nil {
return nil, 0, err
}

attr, err := s.hedgedClient.Attributes(ctx, objectKey)
if err != nil {
return nil, 0, errors.Wrapf(err, "failed to get attributes for %s", objectKey)
}

return reader, attr.Size, err
}

// List implements chunk.ObjectClient.
func (s *BlobStorageThanosObjectClient) List(ctx context.Context, prefix, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
var storageObjects []client.StorageObject
var commonPrefixes []client.StorageCommonPrefix
var iterParams []objstore.IterOption

// If delimiter is empty we want to list all files
if delimiter == "" {
iterParams = append(iterParams, objstore.WithRecursiveIter)
}

s.client.Iter(ctx, prefix, func(objectKey string) error {
// CommonPrefixes are keys that have the prefix and have the delimiter
// as a suffix
if delimiter != "" && strings.HasSuffix(objectKey, delimiter) {
commonPrefixes = append(commonPrefixes, client.StorageCommonPrefix(objectKey))
return nil
}
attr, err := s.client.Attributes(ctx, objectKey)
if err != nil {
return errors.Wrapf(err, "failed to get attributes for %s", objectKey)
}

storageObjects = append(storageObjects, client.StorageObject{
Key: objectKey,
ModifiedAt: attr.LastModified,
})

return nil

}, iterParams...)

return storageObjects, commonPrefixes, nil
}

// IsObjectNotFoundErr returns true if error means that object is not found. Relevant to GetObject and DeleteObject operations.
func (s *BlobStorageThanosObjectClient) IsObjectNotFoundErr(err error) bool {
return s.client.IsObjNotFoundErr(err)
}

// TODO(dannyk): implement for client
func (s *BlobStorageThanosObjectClient) IsRetryableErr(error) bool { return false }
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package azure

import (
"bytes"
"context"
"sort"
"testing"

"github.com/grafana/loki/pkg/storage/bucket/filesystem"
"github.com/grafana/loki/pkg/storage/chunk/client"
"github.com/stretchr/testify/require"
)

func TestAzureThanosObjStore_List(t *testing.T) {
tests := []struct {
name string
prefix string
delimiter string
storageObjKeys []string
storageCommonPref []client.StorageCommonPrefix
wantErr error
}{
{
"list_top_level_only",
"",
"/",
[]string{"top-level-file-1", "top-level-file-2"},
[]client.StorageCommonPrefix{"dir-1/", "dir-2/", "depply/"},
nil,
},
{
"list_all_dir_1",
"dir-1",
"",
[]string{"dir-1/file-1", "dir-1/file-2"},
nil,
nil,
},
{
"list_recursive",
"",
"",
[]string{
"top-level-file-1",
"top-level-file-2",
"dir-1/file-1",
"dir-1/file-2",
"dir-2/file-3",
"dir-2/file-4",
"dir-2/file-5",
"depply/nested/folder/a",
"depply/nested/folder/b",
"depply/nested/folder/c",
},
nil,
nil,
},
{
"unknown_prefix",
"test",
"",
[]string{},
nil,
nil,
},
{
"only_storage_common_prefix",
"depply/",
"/",
[]string{},
[]client.StorageCommonPrefix{
"depply/nested/",
},
nil,
},
}

for _, tt := range tests {
config := filesystem.Config{
Directory: t.TempDir(),
}
newBucket, err := filesystem.NewBucketClient(config)
require.NoError(t, err)

buff := bytes.NewBufferString("foo")
require.NoError(t, newBucket.Upload(context.Background(), "top-level-file-1", buff))
require.NoError(t, newBucket.Upload(context.Background(), "top-level-file-2", buff))
require.NoError(t, newBucket.Upload(context.Background(), "dir-1/file-1", buff))
require.NoError(t, newBucket.Upload(context.Background(), "dir-1/file-2", buff))
require.NoError(t, newBucket.Upload(context.Background(), "dir-2/file-3", buff))
require.NoError(t, newBucket.Upload(context.Background(), "dir-2/file-4", buff))
require.NoError(t, newBucket.Upload(context.Background(), "dir-2/file-5", buff))
require.NoError(t, newBucket.Upload(context.Background(), "depply/nested/folder/a", buff))
require.NoError(t, newBucket.Upload(context.Background(), "depply/nested/folder/b", buff))
require.NoError(t, newBucket.Upload(context.Background(), "depply/nested/folder/c", buff))

gcpClient := &BlobStorageThanosObjectClient{}
gcpClient.client = newBucket

storageObj, storageCommonPref, err := gcpClient.List(context.Background(), tt.prefix, tt.delimiter)
if tt.wantErr != nil {
require.Equal(t, tt.wantErr.Error(), err.Error())
continue
}

keys := []string{}
for _, key := range storageObj {
keys = append(keys, key.Key)
}

sort.Slice(tt.storageObjKeys, func(i, j int) bool {
return tt.storageObjKeys[i] < tt.storageObjKeys[j]
})
sort.Slice(tt.storageCommonPref, func(i, j int) bool {
return tt.storageCommonPref[i] < tt.storageCommonPref[j]
})

require.NoError(t, err)
require.Equal(t, tt.storageObjKeys, keys)
require.Equal(t, tt.storageCommonPref, storageCommonPref)
}
}
4 changes: 4 additions & 0 deletions pkg/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/indexgateway"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/constants"
util_log "github.com/grafana/loki/pkg/util/log"
)

var (
Expand Down Expand Up @@ -732,6 +733,9 @@ func internalNewObjectClient(component, name string, cfg Config, clientMetrics C
}
azureCfg = (azure.BlobStorageConfig)(nsCfg)
}
if cfg.ThanosObjStore {
azure.NewBlobStorageThanosObjectClient(context.Background(), cfg.ObjStoreConf, component, util_log.Logger, cfg.Hedging, reg)
}
return azure.NewBlobStorage(&azureCfg, clientMetrics.AzureMetrics, cfg.Hedging)

case config.StorageTypeSwift:
Expand Down

0 comments on commit a259abd

Please sign in to comment.