From 0f48ce8f1817903fe5de385cbaf004d44005bbd7 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Wed, 20 Mar 2024 16:40:56 +0100 Subject: [PATCH] feat(bloomstore): Introduce specialised cache for blocks (#12257) The blocks cache is an in-memory cache that represents the single source of truth of what blocks are available on the file system and manages their end-of-life behaviour. There is a follow-up branch to wire up the new cache, based on this branch. Signed-off-by: Christian Haudum --- .../shipper/bloomshipper/blockscache.go | 503 ++++++++++++++++++ .../shipper/bloomshipper/blockscache_test.go | 413 ++++++++++++++ 2 files changed, 916 insertions(+) create mode 100644 pkg/storage/stores/shipper/bloomshipper/blockscache.go create mode 100644 pkg/storage/stores/shipper/bloomshipper/blockscache_test.go diff --git a/pkg/storage/stores/shipper/bloomshipper/blockscache.go b/pkg/storage/stores/shipper/bloomshipper/blockscache.go new file mode 100644 index 0000000000000..29274a624f47c --- /dev/null +++ b/pkg/storage/stores/shipper/bloomshipper/blockscache.go @@ -0,0 +1,503 @@ +package bloomshipper + +import ( + "container/list" + "context" + "flag" + "fmt" + "os" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/flagext" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "go.uber.org/atomic" + + "github.com/grafana/loki/pkg/util" + "github.com/grafana/loki/pkg/util/constants" +) + +const ( + defaultPurgeInterval = 1 * time.Minute + + // eviction reasons + reasonExpired = "expired" + reasonFull = "full" + + // errors when putting entries + errAlreadyExists = "entry already exists: %s" + errTooBig = "entry exceeds hard limit: %s" +) + +type Cache interface { + Put(ctx context.Context, key string, value BlockDirectory) error + PutInc(ctx context.Context, key string, value BlockDirectory) error + PutMany(ctx context.Context, keys []string, values []BlockDirectory) error + Get(ctx context.Context, key string) (BlockDirectory, bool) + Release(ctx context.Context, key string) error + Stop() +} + +type blocksCacheMetrics struct { + entriesAdded prometheus.Counter + entriesEvicted *prometheus.CounterVec + entriesFetched *prometheus.CounterVec + entriesCurrent prometheus.Gauge + usageBytes prometheus.Gauge + + // collecting hits/misses for every Get() is a performance killer + // instead, increment a counter and collect them in an interval + hits *atomic.Int64 + misses *atomic.Int64 +} + +func newBlocksCacheMetrics(reg prometheus.Registerer, namespace, subsystem string) *blocksCacheMetrics { + return &blocksCacheMetrics{ + entriesAdded: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "added_total", + Help: "The total number of entries added to the cache", + }), + entriesEvicted: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "evicted_total", + Help: "The total number of entries evicted from the cache", + }, []string{"reason"}), + entriesFetched: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "fetched_total", + Help: "Total number of entries fetched from cache, grouped by status", + }, []string{"status"}), + entriesCurrent: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "entries", + Help: "Current number of 
entries in the cache",
+		}),
+		usageBytes: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: subsystem,
+			Name:      "usage_bytes",
+			Help:      "The current size of entries managed by the cache in bytes",
+		}),
+
+		hits:   atomic.NewInt64(0),
+		misses: atomic.NewInt64(0),
+	}
+}
+
+func (m *blocksCacheMetrics) Collect() {
+	m.entriesFetched.WithLabelValues("hit").Add(float64(m.hits.Swap(0)))
+	m.entriesFetched.WithLabelValues("miss").Add(float64(m.misses.Swap(0)))
+}
+
+// compiler check to enforce implementation of the Cache interface
+var _ Cache = &BlocksCache{}
+
+// BlocksCache is an in-memory cache that manages block directories on the filesystem.
+type BlocksCache struct {
+	cfg     BlocksCacheConfig
+	metrics *blocksCacheMetrics
+	logger  log.Logger
+
+	lock    sync.RWMutex // lock for cache entries
+	entries map[string]*list.Element
+	lru     *list.List
+
+	done            chan struct{}
+	triggerEviction chan struct{}
+
+	currSizeBytes int64
+}
+
+type Entry struct {
+	Key   string
+	Value BlockDirectory
+
+	created time.Time
+
+	refCount *atomic.Int32
+}
+
+// BlocksCacheConfig represents the configuration of the in-memory blocks cache.
+type BlocksCacheConfig struct {
+	Enabled   bool          `yaml:"enabled,omitempty"`
+	SoftLimit flagext.Bytes `yaml:"soft_limit"`
+	HardLimit flagext.Bytes `yaml:"hard_limit"`
+	TTL       time.Duration `yaml:"ttl"`
+
+	// PurgeInterval tells how often expired keys are removed from the cache.
+	// By default it is set to `defaultPurgeInterval`.
+	PurgeInterval time.Duration `yaml:"-"`
+}
+
+func (cfg *BlocksCacheConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) {
+	cfg.RegisterFlagsWithPrefixAndDefaults(prefix, description, f, time.Hour)
+}
+
+func (cfg *BlocksCacheConfig) RegisterFlagsWithPrefixAndDefaults(prefix, description string, f *flag.FlagSet, defaultTTL time.Duration) {
+	f.BoolVar(&cfg.Enabled, prefix+"enabled", false, description+"Whether blocks cache is enabled.")
+	f.Var(&cfg.SoftLimit, prefix+"soft-limit", description+"Soft limit of the cache in bytes. Exceeding this limit will trigger evictions of least recently used items in the background.")
+	_ = cfg.SoftLimit.Set("32GiB")
+	f.Var(&cfg.HardLimit, prefix+"hard-limit", description+"Hard limit of the cache in bytes. Exceeding this limit will block execution until the cache size drops below the soft limit again.")
+	_ = cfg.HardLimit.Set("64GiB")
+	f.DurationVar(&cfg.TTL, prefix+"ttl", defaultTTL, description+"The time to live for items in the cache before they get purged.")
+}
+
+func (cfg *BlocksCacheConfig) IsEnabled() bool {
+	return cfg.Enabled
+}
+
+func (cfg *BlocksCacheConfig) Validate() error {
+	if !cfg.Enabled {
+		return nil
+	}
+	if cfg.TTL == 0 {
+		return errors.New("blocks cache ttl must not be 0")
+	}
+	if cfg.SoftLimit == 0 {
+		return errors.New("blocks cache soft_limit must not be 0")
+	}
+	if cfg.SoftLimit > cfg.HardLimit {
+		return errors.New("blocks cache soft_limit must not be greater than hard_limit")
+	}
+	return nil
+}
+
+// NewFsBlocksCache returns a new file-system backed cache for bloom blocks,
+// where each entry maps to a block directory on disk.
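+//
+// A minimal usage sketch (illustrative only; the key, the BlockDirectory value
+// and the surrounding wiring are assumptions, not part of this change):
+//
+//	cache := NewFsBlocksCache(cfg, prometheus.DefaultRegisterer, logger)
+//	defer cache.Stop()
+//
+//	// Pin the directory while it is in use, then release it again.
+//	if err := cache.PutInc(ctx, key, blockDirectory); err != nil {
+//		return err
+//	}
+//	defer func() { _ = cache.Release(ctx, key) }()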
+func NewFsBlocksCache(cfg BlocksCacheConfig, reg prometheus.Registerer, logger log.Logger) *BlocksCache { + cache := &BlocksCache{ + cfg: cfg, + logger: logger, + metrics: newBlocksCacheMetrics(reg, constants.Loki, "bloom_blocks_cache"), + entries: make(map[string]*list.Element), + lru: list.New(), + + done: make(chan struct{}), + triggerEviction: make(chan struct{}, 1), + } + + // Set a default interval for the ticker + // This can be overwritten to a smaller value in tests + if cfg.PurgeInterval == 0 { + cfg.PurgeInterval = defaultPurgeInterval + } + + go cache.runTTLEvictJob(cfg.PurgeInterval, cfg.TTL) + go cache.runLRUevictJob() + go cache.runMetricsCollectJob(5 * time.Second) + + return cache +} + +// Put implements Cache. +// It stores a value with given key. +func (c *BlocksCache) Put(ctx context.Context, key string, value BlockDirectory) error { + if ctx.Err() != nil { + return ctx.Err() + } + + c.lock.Lock() + defer c.lock.Unlock() + _, err := c.put(key, value) + return err +} + +// PutInc implements Cache. +// It stores a value with given key and increments the ref counter on that item. +func (c *BlocksCache) PutInc(ctx context.Context, key string, value BlockDirectory) error { + if ctx.Err() != nil { + return ctx.Err() + } + + c.lock.Lock() + defer c.lock.Unlock() + + entry, err := c.put(key, value) + if err != nil { + return err + } + + entry.refCount.Inc() + return nil +} + +// PutMany implements Cache. +func (c *BlocksCache) PutMany(ctx context.Context, keys []string, values []BlockDirectory) error { + if ctx.Err() != nil { + return ctx.Err() + } + + c.lock.Lock() + defer c.lock.Unlock() + + var err util.MultiError + for i := range keys { + if _, e := c.put(keys[i], values[i]); e != nil { + err.Add(e) + } + } + return err.Err() +} + +func (c *BlocksCache) put(key string, value BlockDirectory) (*Entry, error) { + // Blocks cache does not allow updating, so it rejects values with the same key + _, exists := c.entries[key] + if exists { + return nil, fmt.Errorf(errAlreadyExists, key) + } + + entry := &Entry{ + Key: key, + Value: value, + created: time.Now(), + refCount: atomic.NewInt32(0), + } + size := entry.Value.Size() + + // Reject items that are larger than the hard limit + if size > int64(c.cfg.HardLimit) { + // It's safe to clean up the disk, since it does not have any references + // yet. Ideally, we avoid downloading blocks that do not fit into the cache + // upfront. + _ = c.remove(entry) + return nil, fmt.Errorf(errTooBig, key) + } + + // Allow adding the new item even if the cache exceeds its soft limit. + // However, broadcast the condition that the cache should be cleaned up. + if c.currSizeBytes+size > int64(c.cfg.SoftLimit) { + level.Debug(c.logger).Log( + "msg", "adding item exceeds soft limit", + "action", "trigger soft eviction", + "curr_size_bytes", c.currSizeBytes, + "entry_size_bytes", size, + "soft_limit_bytes", c.cfg.SoftLimit, + "hard_limit_bytes", c.cfg.HardLimit, + ) + + select { + case c.triggerEviction <- struct{}{}: + // nothing + default: + level.Debug(c.logger).Log("msg", "eviction already in progress") + } + } + + // Adding an item blocks if the cache would exceed its hard limit. 
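+	// Blocking is not implemented yet (see the TODO below). One possible
+	// sketch, assuming a sync.Cond (here called c.spaceFreed, a hypothetical
+	// field tied to c.lock) that the eviction job signals after freeing space:
+	//
+	//	for c.currSizeBytes+size > int64(c.cfg.HardLimit) {
+	//		c.spaceFreed.Wait()
+	//	}
+	//
+	// Until then, such puts are rejected with an error.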
+	if c.currSizeBytes+size > int64(c.cfg.HardLimit) {
+		level.Debug(c.logger).Log(
+			"msg", "adding item exceeds hard limit",
+			"action", "evict items until space is freed",
+			"curr_size_bytes", c.currSizeBytes,
+			"entry_size_bytes", size,
+			"soft_limit_bytes", c.cfg.SoftLimit,
+			"hard_limit_bytes", c.cfg.HardLimit,
+		)
+		// TODO(chaudum): implement case
+		return nil, errors.New("todo: implement waiting for evictions to free up space")
+	}
+
+	// Cache has space to add the item.
+	c.entries[key] = c.lru.PushFront(entry)
+	c.currSizeBytes += size
+
+	c.metrics.entriesAdded.Inc()
+	c.metrics.entriesCurrent.Inc()
+	c.metrics.usageBytes.Set(float64(c.currSizeBytes))
+	return entry, nil
+}
+
+func (c *BlocksCache) evict(key string, element *list.Element, reason string) {
+	entry := element.Value.(*Entry)
+	err := c.remove(entry)
+	if err != nil {
+		level.Error(c.logger).Log("msg", "failed to remove entry from disk", "err", err)
+		return
+	}
+	c.lru.Remove(element)
+	delete(c.entries, key)
+	c.currSizeBytes -= entry.Value.Size()
+	c.metrics.entriesCurrent.Dec()
+	c.metrics.entriesEvicted.WithLabelValues(reason).Inc()
+}
+
+// Get implements Cache.
+// Get returns the stored value against the given key.
+func (c *BlocksCache) Get(ctx context.Context, key string) (BlockDirectory, bool) {
+	if ctx.Err() != nil {
+		return BlockDirectory{}, false
+	}
+
+	// We need the write lock here, because get() moves the element to the
+	// front of the LRU list and therefore mutates shared state.
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	entry := c.get(key)
+	if entry == nil {
+		return BlockDirectory{}, false
+	}
+	return entry.Value, true
+}
+
+func (c *BlocksCache) get(key string) *Entry {
+	element, exists := c.entries[key]
+	if !exists {
+		c.metrics.misses.Inc()
+		return nil
+	}
+
+	entry := element.Value.(*Entry)
+	entry.refCount.Inc()
+
+	c.lru.MoveToFront(element)
+
+	c.metrics.hits.Inc()
+	return entry
+}
+
+// Release decrements the ref counter on the cached item with given key.
+func (c *BlocksCache) Release(ctx context.Context, key string) error {
+	if ctx.Err() != nil {
+		return ctx.Err()
+	}
+
+	// We can use a read lock because we only update a field on an existing entry
+	// and we do not modify the map of entries or the order in the LRU list.
+	c.lock.RLock()
+	defer c.lock.RUnlock()
+
+	element, exists := c.entries[key]
+	if !exists {
+		return nil
+	}
+
+	entry := element.Value.(*Entry)
+	entry.refCount.Dec()
+	return nil
+}
+
+// Stop implements Cache.
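+// It resets the in-memory state of the cache and terminates the background
+// jobs by closing the done channel. Note that it does not remove the cached
+// block directories from disk.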
+func (c *BlocksCache) Stop() {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	c.entries = make(map[string]*list.Element)
+	c.lru.Init()
+
+	c.metrics.entriesCurrent.Set(float64(0))
+	c.metrics.usageBytes.Set(float64(0))
+
+	close(c.done)
+}
+
+func (c *BlocksCache) remove(entry *Entry) error {
+	level.Info(c.logger).Log("msg", "remove entry from disk", "path", entry.Value.Path)
+	err := os.RemoveAll(entry.Value.Path)
+	if err != nil {
+		return fmt.Errorf("error removing bloom block directory from disk: %w", err)
+	}
+	return nil
+}
+
+func (c *BlocksCache) runMetricsCollectJob(interval time.Duration) {
+	level.Info(c.logger).Log("msg", "run metrics collect job")
+	ticker := time.NewTicker(interval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-c.done:
+			return
+		case <-ticker.C:
+			c.metrics.Collect()
+		}
+	}
+}
+
+func (c *BlocksCache) runLRUevictJob() {
+	level.Info(c.logger).Log("msg", "run lru evict job")
+	for {
+		select {
+		case <-c.done:
+			return
+		case <-c.triggerEviction:
+			c.evictLeastRecentlyUsedItems()
+		}
+	}
+}
+
+func (c *BlocksCache) evictLeastRecentlyUsedItems() {
+	c.lock.Lock()
+	defer c.lock.Unlock()

+	level.Debug(c.logger).Log(
+		"msg", "evict least recently used entries",
+		"curr_size_bytes", c.currSizeBytes,
+		"soft_limit_bytes", c.cfg.SoftLimit,
+		"hard_limit_bytes", c.cfg.HardLimit,
+	)
+	elem := c.lru.Back()
+	for c.currSizeBytes >= int64(c.cfg.SoftLimit) && elem != nil {
+		// Capture the previous element before a possible eviction, because
+		// list.Remove() clears the element's links and Prev() would return nil,
+		// which would end the loop after a single eviction.
+		prev := elem.Prev()
+		entry := elem.Value.(*Entry)
+		if entry.refCount.Load() == 0 {
+			level.Debug(c.logger).Log(
+				"msg", "evict least recently used entry",
+				"entry", entry.Key,
+			)
+			c.evict(entry.Key, elem, reasonFull)
+		}
+		elem = prev
+	}
+}
+
+func (c *BlocksCache) runTTLEvictJob(interval, ttl time.Duration) {
+	if interval == 0 || ttl == 0 {
+		return
+	}
+
+	level.Info(c.logger).Log("msg", "run ttl evict job")
+	ticker := time.NewTicker(interval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-c.done:
+			return
+		case <-ticker.C:
+			c.evictExpiredItems(ttl)
+		}
+	}
+}
+
+// evictExpiredItems prunes items in the cache that exceeded their ttl after last access
+func (c *BlocksCache) evictExpiredItems(ttl time.Duration) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	level.Debug(c.logger).Log(
+		"msg", "evict expired entries",
+		"curr_size_bytes", c.currSizeBytes,
+		"soft_limit_bytes", c.cfg.SoftLimit,
+		"hard_limit_bytes", c.cfg.HardLimit,
+	)
+	for k, v := range c.entries {
+		entry := v.Value.(*Entry)
+		if time.Since(entry.created) > ttl && entry.refCount.Load() == 0 {
+			level.Debug(c.logger).Log(
+				"msg", "evict expired entry",
+				"entry", entry.Key,
+				"age", time.Since(entry.created),
+				"ttl", ttl,
+			)
+			c.evict(k, v, reasonExpired)
+		}
+	}
+}
diff --git a/pkg/storage/stores/shipper/bloomshipper/blockscache_test.go b/pkg/storage/stores/shipper/bloomshipper/blockscache_test.go
new file mode 100644
index 0000000000000..5a036d7c3457d
--- /dev/null
+++ b/pkg/storage/stores/shipper/bloomshipper/blockscache_test.go
@@ -0,0 +1,413 @@
+package bloomshipper
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/go-kit/log"
+	"github.com/grafana/dskit/flagext"
+	"github.com/pkg/errors"
+	"github.com/stretchr/testify/require"
+	"go.uber.org/atomic"
+
+	"github.com/grafana/loki/pkg/storage/chunk/cache"
+)
+
+var (
+	logger = log.NewNopLogger()
+)
+
+func TestBlocksCacheConfig_Validate(t *testing.T) {
+	for _, tc := range []struct {
+		desc string
+		cfg  BlocksCacheConfig
+		err  error
+	}{
+		{
+			desc: "not enabled does not yield error when incorrectly configured",
+			cfg:  BlocksCacheConfig{Enabled: 
false}, + err: nil, + }, + { + desc: "ttl not set", + cfg: BlocksCacheConfig{ + Enabled: true, + SoftLimit: 1, + HardLimit: 2, + }, + err: errors.New("blocks cache ttl must not be 0"), + }, + { + desc: "soft limit not set", + cfg: BlocksCacheConfig{ + Enabled: true, + TTL: 1, + HardLimit: 2, + }, + err: errors.New("blocks cache soft_limit must not be 0"), + }, + { + desc: "hard limit not set", + cfg: BlocksCacheConfig{ + Enabled: true, + TTL: 1, + SoftLimit: 1, + }, + err: errors.New("blocks cache soft_limit must not be greater than hard_limit"), + }, + { + desc: "soft limit greater than hard limit", + cfg: BlocksCacheConfig{ + Enabled: true, + TTL: 1, + SoftLimit: 2, + HardLimit: 1, + }, + err: errors.New("blocks cache soft_limit must not be greater than hard_limit"), + }, + { + desc: "all good", + cfg: BlocksCacheConfig{ + Enabled: true, + TTL: 1, + SoftLimit: 1, + HardLimit: 2, + }, + err: nil, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + err := tc.cfg.Validate() + if tc.err != nil { + require.ErrorContains(t, err, tc.err.Error()) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestBlocksCache_ErrorCases(t *testing.T) { + cfg := BlocksCacheConfig{ + Enabled: true, + TTL: time.Hour, + SoftLimit: flagext.Bytes(100), + HardLimit: flagext.Bytes(200), + } + cache := NewFsBlocksCache(cfg, nil, logger) + t.Cleanup(cache.Stop) + + t.Run("cancelled context", func(t *testing.T) { + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + cancel() + + err := cache.Put(ctx, "key", BlockDirectory{}) + require.ErrorContains(t, err, "context canceled") + + err = cache.PutMany(ctx, []string{"key"}, []BlockDirectory{{}}) + require.ErrorContains(t, err, "context canceled") + + _, ok := cache.Get(ctx, "key") + require.False(t, ok) + }) + + t.Run("duplicate keys", func(t *testing.T) { + ctx := context.Background() + + err := cache.Put(ctx, "key", CacheValue("a", 10)) + require.NoError(t, err) + + err = cache.Put(ctx, "key", CacheValue("b", 10)) + require.ErrorContains(t, err, fmt.Sprintf(errAlreadyExists, "key")) + }) + + t.Run("multierror when putting many fails", func(t *testing.T) { + ctx := context.Background() + + err := cache.PutMany( + ctx, + []string{"x", "y", "x", "z"}, + []BlockDirectory{ + CacheValue("x", 2), + CacheValue("y", 2), + CacheValue("x", 2), + CacheValue("z", 250), + }, + ) + require.ErrorContains(t, err, "2 errors: entry already exists: x; entry exceeds hard limit: z") + }) + + // TODO(chaudum): Implement blocking evictions + t.Run("todo: blocking evictions", func(t *testing.T) { + ctx := context.Background() + + err := cache.Put(ctx, "a", CacheValue("a", 5)) + require.NoError(t, err) + + err = cache.Put(ctx, "b", CacheValue("b", 10)) + require.NoError(t, err) + + err = cache.Put(ctx, "c", CacheValue("c", 190)) + require.Error(t, err, "todo: implement waiting for evictions to free up space") + }) +} + +func CacheValue(path string, size int64) BlockDirectory { + return BlockDirectory{ + Path: path, + size: size, + } +} + +func TestBlocksCache_PutAndGet(t *testing.T) { + cfg := BlocksCacheConfig{ + Enabled: true, + TTL: time.Hour, + SoftLimit: flagext.Bytes(10), + HardLimit: flagext.Bytes(20), + // no need for TTL evictions + PurgeInterval: time.Minute, + } + cache := NewFsBlocksCache(cfg, nil, logger) + t.Cleanup(cache.Stop) + + ctx := context.Background() + err := cache.PutMany( + ctx, + []string{"a", "b", "c"}, + []BlockDirectory{CacheValue("a", 1), CacheValue("b", 2), CacheValue("c", 3)}, + ) + require.NoError(t, err) + + // key does not exist + _, 
found := cache.Get(ctx, "d")
+	require.False(t, found)
+
+	// existing keys
+	_, found = cache.Get(ctx, "b")
+	require.True(t, found)
+	_, found = cache.Get(ctx, "c")
+	require.True(t, found)
+	_, found = cache.Get(ctx, "a")
+	require.True(t, found)
+
+	require.Equal(t, 3, cache.lru.Len())
+
+	// check LRU order
+	elem := cache.lru.Front()
+	require.Equal(t, "a", elem.Value.(*Entry).Key)
+	require.Equal(t, int32(1), elem.Value.(*Entry).refCount.Load())
+
+	elem = elem.Next()
+	require.Equal(t, "c", elem.Value.(*Entry).Key)
+	require.Equal(t, int32(1), elem.Value.(*Entry).refCount.Load())
+
+	elem = elem.Next()
+	require.Equal(t, "b", elem.Value.(*Entry).Key)
+	require.Equal(t, int32(1), elem.Value.(*Entry).refCount.Load())
+
+	// fetch more
+	_, _ = cache.Get(ctx, "a")
+	_, _ = cache.Get(ctx, "a")
+	_, _ = cache.Get(ctx, "b")
+
+	// LRU order changed
+	elem = cache.lru.Front()
+	require.Equal(t, "b", elem.Value.(*Entry).Key)
+	require.Equal(t, int32(2), elem.Value.(*Entry).refCount.Load())
+
+	elem = elem.Next()
+	require.Equal(t, "a", elem.Value.(*Entry).Key)
+	require.Equal(t, int32(3), elem.Value.(*Entry).refCount.Load())
+
+	elem = elem.Next()
+	require.Equal(t, "c", elem.Value.(*Entry).Key)
+	require.Equal(t, int32(1), elem.Value.(*Entry).refCount.Load())
+}
+
+func TestBlocksCache_TTLEviction(t *testing.T) {
+	cfg := BlocksCacheConfig{
+		Enabled:   true,
+		TTL:       100 * time.Millisecond,
+		SoftLimit: flagext.Bytes(10),
+		HardLimit: flagext.Bytes(20),
+
+		PurgeInterval: 100 * time.Millisecond,
+	}
+	cache := NewFsBlocksCache(cfg, nil, logger)
+	t.Cleanup(cache.Stop)
+
+	ctx := context.Background()
+
+	err := cache.Put(ctx, "a", CacheValue("a", 5))
+	require.NoError(t, err)
+	time.Sleep(75 * time.Millisecond)
+
+	err = cache.Put(ctx, "b", CacheValue("b", 5))
+	require.NoError(t, err)
+	time.Sleep(75 * time.Millisecond)
+
+	// "a" got evicted
+	_, found := cache.Get(ctx, "a")
+	require.False(t, found)
+
+	// "b" is still in cache
+	_, found = cache.Get(ctx, "b")
+	require.True(t, found)
+
+	require.Equal(t, 1, cache.lru.Len())
+	require.Equal(t, 1, len(cache.entries))
+}
+
+func TestBlocksCache_LRUEviction(t *testing.T) {
+	cfg := BlocksCacheConfig{
+		Enabled:   true,
+		TTL:       time.Hour,
+		SoftLimit: flagext.Bytes(15),
+		HardLimit: flagext.Bytes(20),
+		// no need for TTL evictions
+		PurgeInterval: time.Minute,
+	}
+	cache := NewFsBlocksCache(cfg, nil, logger)
+	t.Cleanup(cache.Stop)
+
+	ctx := context.Background()
+
+	// oldest with refcount - will not be evicted
+	err := cache.PutInc(ctx, "a", CacheValue("a", 4))
+	require.NoError(t, err)
+	// will become newest with Get() call
+	err = cache.Put(ctx, "b", CacheValue("b", 4))
+	require.NoError(t, err)
+	// oldest without refcount - will be evicted
+	err = cache.Put(ctx, "c", CacheValue("c", 4))
+	require.NoError(t, err)
+
+	// increase ref counter on "b"
+	_, found := cache.Get(ctx, "b")
+	require.True(t, found)
+
+	// exceed soft limit
+	err = cache.Put(ctx, "d", CacheValue("d", 4))
+	require.NoError(t, err)
+
+	time.Sleep(time.Second)
+
+	require.Equal(t, 3, cache.lru.Len())
+	require.Equal(t, 3, len(cache.entries))
+
+	// key "c" was evicted because it was the oldest
+	// entry without any references
+	_, found = cache.Get(ctx, "c")
+	require.False(t, found)
+
+	require.Equal(t, int64(12), cache.currSizeBytes)
+}
+
+func TestBlocksCache_RefCounter(t *testing.T) {
+	cfg := BlocksCacheConfig{
+		Enabled:   true,
+		TTL:       time.Hour,
+		SoftLimit: flagext.Bytes(10),
+		HardLimit: flagext.Bytes(20),
+		// no need for TTL evictions
+		PurgeInterval: 
time.Minute, + } + cache := NewFsBlocksCache(cfg, nil, logger) + t.Cleanup(cache.Stop) + + ctx := context.Background() + + _ = cache.PutInc(ctx, "a", CacheValue("a", 5)) + require.Equal(t, int32(1), cache.entries["a"].Value.(*Entry).refCount.Load()) + + _, _ = cache.Get(ctx, "a") + require.Equal(t, int32(2), cache.entries["a"].Value.(*Entry).refCount.Load()) + + _ = cache.Release(ctx, "a") + require.Equal(t, int32(1), cache.entries["a"].Value.(*Entry).refCount.Load()) + + _ = cache.Release(ctx, "a") + require.Equal(t, int32(0), cache.entries["a"].Value.(*Entry).refCount.Load()) +} + +func prepareBenchmark(b *testing.B) map[string]BlockDirectory { + b.Helper() + + entries := make(map[string]BlockDirectory) + for i := 0; i < 1000; i++ { + key := fmt.Sprintf("block-%04x", i) + entries[key] = BlockDirectory{ + BlockRef: BlockRef{}, + Path: fmt.Sprintf("blooms/%s", key), + removeDirectoryTimeout: time.Minute, + refCount: atomic.NewInt32(0), + logger: logger, + activeQueriersCheckInterval: time.Minute, + size: 4 << 10, + } + } + return entries +} + +func Benchmark_BlocksCacheOld(b *testing.B) { + prepareBenchmark(b) + b.StopTimer() + cfg := cache.EmbeddedCacheConfig{ + Enabled: true, + MaxSizeMB: 100, + MaxSizeItems: 10000, + TTL: time.Hour, + PurgeInterval: time.Hour, + } + cache := NewBlocksCache(cfg, nil, logger) + entries := prepareBenchmark(b) + ctx := context.Background() + b.ReportAllocs() + b.StartTimer() + + // write + for k, v := range entries { + err := cache.Store(ctx, []string{k}, []BlockDirectory{v}) + require.NoError(b, err) + } + for i := 0; i < b.N; i++ { + // read + for k := range entries { + _, _ = cache.Get(ctx, k) + } + } + +} + +func Benchmark_BlocksCacheNew(b *testing.B) { + prepareBenchmark(b) + b.StopTimer() + cfg := BlocksCacheConfig{ + Enabled: true, + SoftLimit: 100 << 20, + HardLimit: 120 << 20, + TTL: time.Hour, + PurgeInterval: time.Hour, + } + cache := NewFsBlocksCache(cfg, nil, logger) + entries := prepareBenchmark(b) + ctx := context.Background() + b.ReportAllocs() + b.StartTimer() + + // write + for k, v := range entries { + _ = cache.PutMany(ctx, []string{k}, []BlockDirectory{v}) + } + // read + for i := 0; i < b.N; i++ { + for k := range entries { + _, _ = cache.Get(ctx, k) + } + } +}
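+
+// The old and the new cache implementation above can be compared with, for
+// example (command shown for illustration; flags and filter are assumptions):
+//
+//	go test -bench='Benchmark_BlocksCache(Old|New)' -benchmem ./pkg/storage/stores/shipper/bloomshipper/...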