From bf0dd9dda639580a9fcc8e570c3ab654340eae1d Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Wed, 3 Apr 2024 08:49:17 +0200 Subject: [PATCH] perf(bloomstore): Cache metas LIST operation (#12414) Reduce the amount of class A list operations on object storage. Signed-off-by: Christian Haudum --- .../stores/shipper/bloomshipper/client.go | 87 +++++++++++++++++++ .../shipper/bloomshipper/client_test.go | 67 ++++++++++++++ .../shipper/bloomshipper/config/config.go | 7 ++ .../stores/shipper/bloomshipper/store.go | 4 + 4 files changed, 165 insertions(+) diff --git a/pkg/storage/stores/shipper/bloomshipper/client.go b/pkg/storage/stores/shipper/bloomshipper/client.go index 4ac0c24732b69..488815b835142 100644 --- a/pkg/storage/stores/shipper/bloomshipper/client.go +++ b/pkg/storage/stores/shipper/bloomshipper/client.go @@ -8,6 +8,8 @@ import ( "hash" "io" "strings" + "sync" + "time" "github.com/go-kit/log" "github.com/grafana/dskit/concurrency" @@ -417,3 +419,88 @@ func findPeriod(configs []config.PeriodConfig, ts model.Time) (config.DayTime, e } return config.DayTime{}, fmt.Errorf("can not find period for timestamp %d", ts) } + +type listOpResult struct { + ts time.Time + objects []client.StorageObject + prefixes []client.StorageCommonPrefix +} + +type listOpCache map[string]listOpResult + +type cachedListOpObjectClient struct { + client.ObjectClient + cache listOpCache + mtx sync.RWMutex + ttl, interval time.Duration + done chan struct{} +} + +func newCachedListOpObjectClient(oc client.ObjectClient, ttl, interval time.Duration) *cachedListOpObjectClient { + client := &cachedListOpObjectClient{ + ObjectClient: oc, + cache: make(listOpCache), + done: make(chan struct{}), + ttl: ttl, + interval: interval, + } + + go func(c *cachedListOpObjectClient) { + ticker := time.NewTicker(c.interval) + defer ticker.Stop() + + for { + select { + case <-c.done: + return + case <-ticker.C: + c.mtx.Lock() + for k := range c.cache { + if time.Since(c.cache[k].ts) > c.ttl { + delete(c.cache, k) + } + } + c.mtx.Unlock() + } + } + }(client) + + return client +} + +func (c *cachedListOpObjectClient) List(ctx context.Context, prefix string, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) { + if delimiter != "" { + return nil, nil, fmt.Errorf("does not support LIST calls with delimiter: %s", delimiter) + } + c.mtx.RLock() + cached, found := c.cache[prefix] + c.mtx.RUnlock() + if found { + return cached.objects, cached.prefixes, nil + } + + c.mtx.Lock() + defer c.mtx.Unlock() + + objects, prefixes, err := c.ObjectClient.List(ctx, prefix, delimiter) + if err != nil { + return nil, nil, err + } + + c.cache[prefix] = listOpResult{ + ts: time.Now(), + objects: objects, + prefixes: prefixes, + } + + return objects, prefixes, err +} + +func (c *cachedListOpObjectClient) Stop() { + c.mtx.Lock() + defer c.mtx.Unlock() + + close(c.done) + c.cache = nil + c.ObjectClient.Stop() +} diff --git a/pkg/storage/stores/shipper/bloomshipper/client_test.go b/pkg/storage/stores/shipper/bloomshipper/client_test.go index 9bfd3d1674f66..cd77339c0932c 100644 --- a/pkg/storage/stores/shipper/bloomshipper/client_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/client_test.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "os" + "path" "testing" "time" @@ -14,6 +15,7 @@ import ( "github.com/stretchr/testify/require" v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" + "github.com/grafana/loki/v3/pkg/storage/chunk/client" "github.com/grafana/loki/v3/pkg/storage/chunk/client/testutils" "github.com/grafana/loki/v3/pkg/storage/config" ) @@ -341,3 +343,68 @@ func TestBloomClient_DeleteBlocks(t *testing.T) { require.False(t, found) }) } + +type mockListClient struct { + client.ObjectClient + counter int +} + +func (c *mockListClient) List(_ context.Context, prefix string, _ string) ([]client.StorageObject, []client.StorageCommonPrefix, error) { + c.counter++ + objects := []client.StorageObject{ + {Key: path.Join(path.Base(prefix), "object")}, + } + prefixes := []client.StorageCommonPrefix{ + client.StorageCommonPrefix(prefix), + } + return objects, prefixes, nil +} + +func (c *mockListClient) Stop() { +} + +func TestBloomClient_CachedListOpObjectClient(t *testing.T) { + + t.Run("list call with delimiter returns error", func(t *testing.T) { + downstreamClient := &mockListClient{} + c := newCachedListOpObjectClient(downstreamClient, 100*time.Millisecond, 10*time.Millisecond) + t.Cleanup(c.Stop) + + _, _, err := c.List(context.Background(), "prefix/", "/") + require.Error(t, err) + }) + + t.Run("list calls are cached by prefix", func(t *testing.T) { + downstreamClient := &mockListClient{} + c := newCachedListOpObjectClient(downstreamClient, 100*time.Millisecond, 10*time.Millisecond) + t.Cleanup(c.Stop) + + // cache miss + res, _, err := c.List(context.Background(), "a/", "") + require.NoError(t, err) + require.Equal(t, 1, downstreamClient.counter) + require.Equal(t, []client.StorageObject{{Key: "a/object"}}, res) + + // cache miss + res, _, err = c.List(context.Background(), "b/", "") + require.NoError(t, err) + require.Equal(t, 2, downstreamClient.counter) + require.Equal(t, []client.StorageObject{{Key: "b/object"}}, res) + + // cache hit + res, _, err = c.List(context.Background(), "a/", "") + require.NoError(t, err) + require.Equal(t, 2, downstreamClient.counter) + require.Equal(t, []client.StorageObject{{Key: "a/object"}}, res) + + // wait for >=ttl so items are expired + time.Sleep(150 * time.Millisecond) + + // cache miss + res, _, err = c.List(context.Background(), "a/", "") + require.NoError(t, err) + require.Equal(t, 3, downstreamClient.counter) + require.Equal(t, []client.StorageObject{{Key: "a/object"}}, res) + }) + +} diff --git a/pkg/storage/stores/shipper/bloomshipper/config/config.go b/pkg/storage/stores/shipper/bloomshipper/config/config.go index de1ad3a12034c..be4d96c765e1c 100644 --- a/pkg/storage/stores/shipper/bloomshipper/config/config.go +++ b/pkg/storage/stores/shipper/bloomshipper/config/config.go @@ -17,6 +17,10 @@ type Config struct { DownloadParallelism int `yaml:"download_parallelism"` BlocksCache BlocksCacheConfig `yaml:"blocks_cache"` MetasCache cache.Config `yaml:"metas_cache"` + + // This will always be set to true when flags are registered. + // In tests, where config is created as literal, it can be set manually. + CacheListOps bool `yaml:"-"` } func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { @@ -27,6 +31,9 @@ func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.IntVar(&c.DownloadParallelism, prefix+"download-parallelism", 16, "The amount of maximum concurrent bloom blocks downloads.") c.BlocksCache.RegisterFlagsWithPrefixAndDefaults(prefix+"blocks-cache.", "Cache for bloom blocks. ", f, 24*time.Hour) c.MetasCache.RegisterFlagsWithPrefix(prefix+"metas-cache.", "Cache for bloom metas. ", f) + + // always cache LIST operations + c.CacheListOps = true } func (c *Config) Validate() error { diff --git a/pkg/storage/stores/shipper/bloomshipper/store.go b/pkg/storage/stores/shipper/bloomshipper/store.go index 42e9b66eae6ad..5e1363d0cb731 100644 --- a/pkg/storage/stores/shipper/bloomshipper/store.go +++ b/pkg/storage/stores/shipper/bloomshipper/store.go @@ -5,6 +5,7 @@ import ( "fmt" "path" "sort" + "time" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -294,6 +295,9 @@ func NewBloomStore( return nil, errors.Wrapf(err, "creating object client for period %s", periodicConfig.From) } + if storageConfig.BloomShipperConfig.CacheListOps { + objectClient = newCachedListOpObjectClient(objectClient, 5*time.Minute, 10*time.Second) + } bloomClient, err := NewBloomClient(cfg, objectClient, logger) if err != nil { return nil, errors.Wrapf(err, "creating bloom client for period %s", periodicConfig.From)