Skip to content

Commit

Permalink
Merge branch 'main' into detected-labels-api-ii
Browse files Browse the repository at this point in the history
  • Loading branch information
shantanualsi authored Apr 3, 2024
2 parents b23d630 + bf0dd9d commit c1760b0
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 0 deletions.
87 changes: 87 additions & 0 deletions pkg/storage/stores/shipper/bloomshipper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"hash"
"io"
"strings"
"sync"
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/concurrency"
Expand Down Expand Up @@ -417,3 +419,88 @@ func findPeriod(configs []config.PeriodConfig, ts model.Time) (config.DayTime, e
}
return config.DayTime{}, fmt.Errorf("can not find period for timestamp %d", ts)
}

type listOpResult struct {
ts time.Time
objects []client.StorageObject
prefixes []client.StorageCommonPrefix
}

type listOpCache map[string]listOpResult

type cachedListOpObjectClient struct {
client.ObjectClient
cache listOpCache
mtx sync.RWMutex
ttl, interval time.Duration
done chan struct{}
}

func newCachedListOpObjectClient(oc client.ObjectClient, ttl, interval time.Duration) *cachedListOpObjectClient {
client := &cachedListOpObjectClient{
ObjectClient: oc,
cache: make(listOpCache),
done: make(chan struct{}),
ttl: ttl,
interval: interval,
}

go func(c *cachedListOpObjectClient) {
ticker := time.NewTicker(c.interval)
defer ticker.Stop()

for {
select {
case <-c.done:
return
case <-ticker.C:
c.mtx.Lock()
for k := range c.cache {
if time.Since(c.cache[k].ts) > c.ttl {
delete(c.cache, k)
}
}
c.mtx.Unlock()
}
}
}(client)

return client
}

func (c *cachedListOpObjectClient) List(ctx context.Context, prefix string, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
if delimiter != "" {
return nil, nil, fmt.Errorf("does not support LIST calls with delimiter: %s", delimiter)
}
c.mtx.RLock()
cached, found := c.cache[prefix]
c.mtx.RUnlock()
if found {
return cached.objects, cached.prefixes, nil
}

c.mtx.Lock()
defer c.mtx.Unlock()

objects, prefixes, err := c.ObjectClient.List(ctx, prefix, delimiter)
if err != nil {
return nil, nil, err
}

c.cache[prefix] = listOpResult{
ts: time.Now(),
objects: objects,
prefixes: prefixes,
}

return objects, prefixes, err
}

func (c *cachedListOpObjectClient) Stop() {
c.mtx.Lock()
defer c.mtx.Unlock()

close(c.done)
c.cache = nil
c.ObjectClient.Stop()
}
67 changes: 67 additions & 0 deletions pkg/storage/stores/shipper/bloomshipper/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"os"
"path"
"testing"
"time"

Expand All @@ -14,6 +15,7 @@ import (
"github.com/stretchr/testify/require"

v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/chunk/client"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/testutils"
"github.com/grafana/loki/v3/pkg/storage/config"
)
Expand Down Expand Up @@ -341,3 +343,68 @@ func TestBloomClient_DeleteBlocks(t *testing.T) {
require.False(t, found)
})
}

type mockListClient struct {
client.ObjectClient
counter int
}

func (c *mockListClient) List(_ context.Context, prefix string, _ string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
c.counter++
objects := []client.StorageObject{
{Key: path.Join(path.Base(prefix), "object")},
}
prefixes := []client.StorageCommonPrefix{
client.StorageCommonPrefix(prefix),
}
return objects, prefixes, nil
}

func (c *mockListClient) Stop() {
}

func TestBloomClient_CachedListOpObjectClient(t *testing.T) {

t.Run("list call with delimiter returns error", func(t *testing.T) {
downstreamClient := &mockListClient{}
c := newCachedListOpObjectClient(downstreamClient, 100*time.Millisecond, 10*time.Millisecond)
t.Cleanup(c.Stop)

_, _, err := c.List(context.Background(), "prefix/", "/")
require.Error(t, err)
})

t.Run("list calls are cached by prefix", func(t *testing.T) {
downstreamClient := &mockListClient{}
c := newCachedListOpObjectClient(downstreamClient, 100*time.Millisecond, 10*time.Millisecond)
t.Cleanup(c.Stop)

// cache miss
res, _, err := c.List(context.Background(), "a/", "")
require.NoError(t, err)
require.Equal(t, 1, downstreamClient.counter)
require.Equal(t, []client.StorageObject{{Key: "a/object"}}, res)

// cache miss
res, _, err = c.List(context.Background(), "b/", "")
require.NoError(t, err)
require.Equal(t, 2, downstreamClient.counter)
require.Equal(t, []client.StorageObject{{Key: "b/object"}}, res)

// cache hit
res, _, err = c.List(context.Background(), "a/", "")
require.NoError(t, err)
require.Equal(t, 2, downstreamClient.counter)
require.Equal(t, []client.StorageObject{{Key: "a/object"}}, res)

// wait for >=ttl so items are expired
time.Sleep(150 * time.Millisecond)

// cache miss
res, _, err = c.List(context.Background(), "a/", "")
require.NoError(t, err)
require.Equal(t, 3, downstreamClient.counter)
require.Equal(t, []client.StorageObject{{Key: "a/object"}}, res)
})

}
7 changes: 7 additions & 0 deletions pkg/storage/stores/shipper/bloomshipper/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ type Config struct {
DownloadParallelism int `yaml:"download_parallelism"`
BlocksCache BlocksCacheConfig `yaml:"blocks_cache"`
MetasCache cache.Config `yaml:"metas_cache"`

// This will always be set to true when flags are registered.
// In tests, where config is created as literal, it can be set manually.
CacheListOps bool `yaml:"-"`
}

func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
Expand All @@ -27,6 +31,9 @@ func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&c.DownloadParallelism, prefix+"download-parallelism", 16, "The amount of maximum concurrent bloom blocks downloads.")
c.BlocksCache.RegisterFlagsWithPrefixAndDefaults(prefix+"blocks-cache.", "Cache for bloom blocks. ", f, 24*time.Hour)
c.MetasCache.RegisterFlagsWithPrefix(prefix+"metas-cache.", "Cache for bloom metas. ", f)

// always cache LIST operations
c.CacheListOps = true
}

func (c *Config) Validate() error {
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/stores/shipper/bloomshipper/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"path"
"sort"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -294,6 +295,9 @@ func NewBloomStore(
return nil, errors.Wrapf(err, "creating object client for period %s", periodicConfig.From)
}

if storageConfig.BloomShipperConfig.CacheListOps {
objectClient = newCachedListOpObjectClient(objectClient, 5*time.Minute, 10*time.Second)
}
bloomClient, err := NewBloomClient(cfg, objectClient, logger)
if err != nil {
return nil, errors.Wrapf(err, "creating bloom client for period %s", periodicConfig.From)
Expand Down

0 comments on commit c1760b0

Please sign in to comment.