diff --git a/pkg/storage/chunk/cache/embeddedcache.go b/pkg/storage/chunk/cache/embeddedcache.go index 8e412af4a81e1..b27d6a903f23e 100644 --- a/pkg/storage/chunk/cache/embeddedcache.go +++ b/pkg/storage/chunk/cache/embeddedcache.go @@ -23,19 +23,20 @@ const ( defaultPurgeInterval = 1 * time.Minute - expiredReason string = "expired" //nolint:staticcheck - fullReason = "full" - tooBigReason = "object too big" + expiredReason string = "expired" //nolint:staticcheck + fullReason = "full" + tooBigReason = "object too big" + replacedReason = "replaced" ) -// EmbeddedCache is a simple string -> interface{} cache which uses a fifo slide to +// EmbeddedCache is a simple (comparable -> any) cache which uses a fifo slide to // manage evictions. O(1) inserts and updates, O(1) gets. // // This embedded cache implementation supports two eviction methods - based on number of items in the cache, and based on memory usage. // For the memory-based eviction, set EmbeddedCacheConfig.MaxSizeMB to a positive integer, indicating upper limit of memory allocated by items in the cache. // Alternatively, set EmbeddedCacheConfig.MaxSizeItems to a positive integer, indicating maximum number of items in the cache. // If both parameters are set, both methods are enforced, whichever hits first. -type EmbeddedCache struct { +type EmbeddedCache[K comparable, V any] struct { cacheType stats.CacheType lock sync.RWMutex @@ -43,8 +44,11 @@ type EmbeddedCache struct { maxSizeBytes uint64 currSizeBytes uint64 - entries map[string]*list.Element - lru *list.List + entries map[K]*list.Element + cacheEntrySizeCalculator[K, V] + lru *list.List + + onEntryRemoved func(key K, value V) done chan struct{} @@ -54,10 +58,10 @@ type EmbeddedCache struct { memoryBytes prometheus.Gauge } -type cacheEntry struct { +type cacheEntry[K comparable, V any] struct { updated time.Time - key string - value []byte + key K + value V } // EmbeddedCacheConfig represents in-process embedded cache config. @@ -83,8 +87,26 @@ func (cfg *EmbeddedCacheConfig) IsEnabled() bool { return cfg.Enabled } -// NewEmbeddedCache returns a new initialised EmbeddedCache. -func NewEmbeddedCache(name string, cfg EmbeddedCacheConfig, reg prometheus.Registerer, logger log.Logger, cacheType stats.CacheType) *EmbeddedCache { +type cacheEntrySizeCalculator[K comparable, V any] func(entry *cacheEntry[K, V]) uint64 + +// NewEmbeddedCache returns a new initialised EmbeddedCache where the key is a string and the value is a slice of bytes. +func NewEmbeddedCache(name string, cfg EmbeddedCacheConfig, reg prometheus.Registerer, logger log.Logger, cacheType stats.CacheType) *EmbeddedCache[string, []byte] { + return NewTypedEmbeddedCache[string, []byte](name, cfg, reg, logger, cacheType, sizeOf, nil) +} + +// NewTypedEmbeddedCache returns a new initialised EmbeddedCache with the key and value of requested types. +// To limit the memory allocated by items in the cache, it's necessary to pass cacheEntrySizeCalculator +// that calculates the size of an entry in bytes. +// Also, this constructor allows passing the callback that will be called for the entry whenever it is removed from the cache. +func NewTypedEmbeddedCache[K comparable, V any]( + name string, + cfg EmbeddedCacheConfig, + reg prometheus.Registerer, + logger log.Logger, + cacheType stats.CacheType, + entrySizeCalculator cacheEntrySizeCalculator[K, V], + onEntryRemoved func(key K, value V), +) *EmbeddedCache[K, V] { if cfg.MaxSizeMB == 0 && cfg.MaxSizeItems == 0 { // zero cache capacity - no need to create cache level.Warn(logger).Log("msg", "neither embedded-cache.max-size-mb nor embedded-cache.max-size-items is set", "cache", name) @@ -97,13 +119,15 @@ func NewEmbeddedCache(name string, cfg EmbeddedCacheConfig, reg prometheus.Regis cfg.PurgeInterval = defaultPurgeInterval } - cache := &EmbeddedCache{ + cache := &EmbeddedCache[K, V]{ cacheType: cacheType, - maxSizeItems: cfg.MaxSizeItems, - maxSizeBytes: uint64(cfg.MaxSizeMB * 1e6), - entries: make(map[string]*list.Element), - lru: list.New(), + maxSizeItems: cfg.MaxSizeItems, + maxSizeBytes: uint64(cfg.MaxSizeMB * 1e6), + entries: make(map[K]*list.Element), + lru: list.New(), + cacheEntrySizeCalculator: entrySizeCalculator, + onEntryRemoved: onEntryRemoved, done: make(chan struct{}), @@ -147,7 +171,7 @@ func NewEmbeddedCache(name string, cfg EmbeddedCacheConfig, reg prometheus.Regis return cache } -func (c *EmbeddedCache) runPruneJob(interval, ttl time.Duration) { +func (c *EmbeddedCache[K, V]) runPruneJob(interval, ttl time.Duration) { ticker := time.NewTicker(interval) defer ticker.Stop() @@ -162,40 +186,36 @@ func (c *EmbeddedCache) runPruneJob(interval, ttl time.Duration) { } // pruneExpiredItems prunes items in the cache that exceeded their ttl -func (c *EmbeddedCache) pruneExpiredItems(ttl time.Duration) { +func (c *EmbeddedCache[K, V]) pruneExpiredItems(ttl time.Duration) { c.lock.Lock() defer c.lock.Unlock() for k, v := range c.entries { - entry := v.Value.(*cacheEntry) + entry := v.Value.(*cacheEntry[K, V]) if time.Since(entry.updated) > ttl { - _ = c.lru.Remove(v).(*cacheEntry) - delete(c.entries, k) - c.currSizeBytes -= sizeOf(entry) - c.entriesCurrent.Dec() - c.entriesEvicted.WithLabelValues(expiredReason).Inc() + c.remove(k, v, expiredReason) } } } // Fetch implements Cache. -func (c *EmbeddedCache) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error) { - found, missing, bufs = make([]string, 0, len(keys)), make([]string, 0, len(keys)), make([][]byte, 0, len(keys)) +func (c *EmbeddedCache[K, V]) Fetch(ctx context.Context, keys []K) (foundKeys []K, foundValues []V, missingKeys []K, err error) { + foundKeys, missingKeys, foundValues = make([]K, 0, len(keys)), make([]K, 0, len(keys)), make([]V, 0, len(keys)) for _, key := range keys { val, ok := c.Get(ctx, key) if !ok { - missing = append(missing, key) + missingKeys = append(missingKeys, key) continue } - found = append(found, key) - bufs = append(bufs, val) + foundKeys = append(foundKeys, key) + foundValues = append(foundValues, val) } return } // Store implements Cache. -func (c *EmbeddedCache) Store(_ context.Context, keys []string, values [][]byte) error { +func (c *EmbeddedCache[K, V]) Store(_ context.Context, keys []K, values []V) error { c.lock.Lock() defer c.lock.Unlock() @@ -206,13 +226,12 @@ func (c *EmbeddedCache) Store(_ context.Context, keys []string, values [][]byte) } // Stop implements Cache. -func (c *EmbeddedCache) Stop() { +func (c *EmbeddedCache[K, V]) Stop() { c.lock.Lock() defer c.lock.Unlock() close(c.done) - - c.entries = make(map[string]*list.Element) + c.entries = make(map[K]*list.Element) c.lru.Init() c.currSizeBytes = 0 @@ -220,27 +239,35 @@ func (c *EmbeddedCache) Stop() { c.memoryBytes.Set(float64(0)) } -func (c *EmbeddedCache) GetCacheType() stats.CacheType { +func (c *EmbeddedCache[K, V]) GetCacheType() stats.CacheType { return c.cacheType } -func (c *EmbeddedCache) put(key string, value []byte) { +func (c *EmbeddedCache[K, V]) remove(key K, element *list.Element, reason string) { + entry := c.lru.Remove(element).(*cacheEntry[K, V]) + delete(c.entries, key) + if c.onEntryRemoved != nil { + c.onEntryRemoved(entry.key, entry.value) + } + c.currSizeBytes -= c.cacheEntrySizeCalculator(entry) + c.entriesCurrent.Dec() + c.entriesEvicted.WithLabelValues(reason).Inc() +} + +func (c *EmbeddedCache[K, V]) put(key K, value V) { // See if we already have the item in the cache. element, ok := c.entries[key] if ok { // Remove the item from the cache. - entry := c.lru.Remove(element).(*cacheEntry) - delete(c.entries, key) - c.currSizeBytes -= sizeOf(entry) - c.entriesCurrent.Dec() + c.remove(key, element, replacedReason) } - entry := &cacheEntry{ + entry := &cacheEntry[K, V]{ updated: time.Now(), key: key, value: value, } - entrySz := sizeOf(entry) + entrySz := c.cacheEntrySizeCalculator(entry) if c.maxSizeBytes > 0 && entrySz > c.maxSizeBytes { // Cannot keep this item in the cache. @@ -258,11 +285,8 @@ func (c *EmbeddedCache) put(key string, value []byte) { if lastElement == nil { break } - evicted := c.lru.Remove(lastElement).(*cacheEntry) - delete(c.entries, evicted.key) - c.currSizeBytes -= sizeOf(evicted) - c.entriesCurrent.Dec() - c.entriesEvicted.WithLabelValues(fullReason).Inc() + entryToRemove := lastElement.Value.(*cacheEntry[K, V]) + c.remove(entryToRemove.key, lastElement, fullReason) } // Finally, we have space to add the item. @@ -276,20 +300,20 @@ func (c *EmbeddedCache) put(key string, value []byte) { } // Get returns the stored value against the key and when the key was last updated. -func (c *EmbeddedCache) Get(_ context.Context, key string) ([]byte, bool) { +func (c *EmbeddedCache[K, V]) Get(_ context.Context, key K) (V, bool) { c.lock.RLock() defer c.lock.RUnlock() element, ok := c.entries[key] if ok { - entry := element.Value.(*cacheEntry) + entry := element.Value.(*cacheEntry[K, V]) return entry.value, true } - - return nil, false + var empty V + return empty, false } -func sizeOf(item *cacheEntry) uint64 { +func sizeOf(item *cacheEntry[string, []byte]) uint64 { return uint64(int(unsafe.Sizeof(*item)) + // size of cacheEntry len(item.key) + // size of key cap(item.value) + // size of value diff --git a/pkg/storage/chunk/cache/embeddedcache_test.go b/pkg/storage/chunk/cache/embeddedcache_test.go index 33b933456a332..b318e0f6b5a75 100644 --- a/pkg/storage/chunk/cache/embeddedcache_test.go +++ b/pkg/storage/chunk/cache/embeddedcache_test.go @@ -6,6 +6,8 @@ import ( "testing" "time" + "go.uber.org/atomic" + "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/assert" @@ -21,11 +23,11 @@ func TestEmbeddedCacheEviction(t *testing.T) { // compute value size such that 10 entries account to exactly 1MB. // adding one more entry to the cache would result in eviction when MaxSizeMB is configured to a value of 1. // value cap = target size of each entry (0.1MB) - size of cache entry with empty value. - valueCap := (1e6 / cnt) - sizeOf(&cacheEntry{ + valueCap := (1e6 / cnt) - sizeOf(&cacheEntry[string, []byte]{ key: "00", }) - itemTemplate := &cacheEntry{ + itemTemplate := &cacheEntry[string, []byte]{ key: "00", value: make([]byte, 0, valueCap), } @@ -45,7 +47,11 @@ func TestEmbeddedCacheEviction(t *testing.T) { } for _, test := range tests { - c := NewEmbeddedCache(test.name, test.cfg, nil, log.NewNopLogger(), "test") + removedEntriesCount := atomic.NewInt64(0) + onEntryRemoved := func(key string, value []byte) { + removedEntriesCount.Inc() + } + c := NewTypedEmbeddedCache[string, []byte](test.name, test.cfg, nil, log.NewNopLogger(), "test", sizeOf, onEntryRemoved) ctx := context.Background() // Check put / get works @@ -66,6 +72,7 @@ func TestEmbeddedCacheEviction(t *testing.T) { assert.Equal(t, testutil.ToFloat64(c.entriesAddedNew), float64(cnt)) assert.Equal(t, testutil.ToFloat64(c.entriesEvicted.WithLabelValues(reason)), float64(0)) + assert.Equal(t, testutil.ToFloat64(c.entriesEvicted.WithLabelValues(replacedReason)), float64(0)) assert.Equal(t, testutil.ToFloat64(c.entriesCurrent), float64(cnt)) assert.Equal(t, testutil.ToFloat64(c.entriesCurrent), float64(len(c.entries))) assert.Equal(t, testutil.ToFloat64(c.entriesCurrent), float64(c.lru.Len())) @@ -79,7 +86,9 @@ func TestEmbeddedCacheEviction(t *testing.T) { } assert.Equal(t, testutil.ToFloat64(c.entriesAddedNew), float64(cnt)) - assert.Equal(t, testutil.ToFloat64(c.entriesEvicted), float64(0)) + assert.Equal(t, testutil.ToFloat64(c.entriesEvicted.WithLabelValues(reason)), float64(0)) + assert.Equal(t, testutil.ToFloat64(c.entriesEvicted.WithLabelValues(replacedReason)), float64(0)) + assert.Equal(t, int64(0), removedEntriesCount.Load()) assert.Equal(t, testutil.ToFloat64(c.entriesCurrent), float64(cnt)) assert.Equal(t, testutil.ToFloat64(c.entriesCurrent), float64(len(c.entries))) assert.Equal(t, testutil.ToFloat64(c.entriesCurrent), float64(c.lru.Len())) @@ -101,6 +110,8 @@ func TestEmbeddedCacheEviction(t *testing.T) { assert.Equal(t, testutil.ToFloat64(c.entriesAddedNew), float64(cnt+evicted)) assert.Equal(t, testutil.ToFloat64(c.entriesEvicted.WithLabelValues(reason)), float64(evicted)) + assert.Equal(t, testutil.ToFloat64(c.entriesEvicted.WithLabelValues(replacedReason)), float64(evicted)) + assert.Equalf(t, int64(evicted+evicted), removedEntriesCount.Load(), "%d items were evicted and %d items were replaced", evicted, evicted) assert.Equal(t, testutil.ToFloat64(c.entriesCurrent), float64(cnt)) assert.Equal(t, testutil.ToFloat64(c.entriesCurrent), float64(len(c.entries))) assert.Equal(t, testutil.ToFloat64(c.entriesCurrent), float64(c.lru.Len())) @@ -119,6 +130,8 @@ func TestEmbeddedCacheEviction(t *testing.T) { assert.Equal(t, testutil.ToFloat64(c.entriesAddedNew), float64(cnt+evicted)) assert.Equal(t, testutil.ToFloat64(c.entriesEvicted.WithLabelValues(reason)), float64(evicted)) + assert.Equal(t, testutil.ToFloat64(c.entriesEvicted.WithLabelValues(replacedReason)), float64(evicted)) + assert.Equal(t, int64(evicted+evicted), removedEntriesCount.Load(), "During this step the count of the calls must not be changed because we do read-only operations") assert.Equal(t, testutil.ToFloat64(c.entriesCurrent), float64(cnt)) assert.Equal(t, testutil.ToFloat64(c.entriesCurrent), float64(len(c.entries))) assert.Equal(t, testutil.ToFloat64(c.entriesCurrent), float64(c.lru.Len())) @@ -146,12 +159,16 @@ func TestEmbeddedCacheEviction(t *testing.T) { assert.Equal(t, testutil.ToFloat64(c.entriesAddedNew), float64(cnt+evicted)) assert.Equal(t, testutil.ToFloat64(c.entriesEvicted.WithLabelValues(reason)), float64(evicted)) + assert.Equalf(t, testutil.ToFloat64(c.entriesEvicted.WithLabelValues(replacedReason)), float64(evicted+evicted), + "During this step we replace %d more items", evicted) + assert.Equalf(t, int64(evicted+evicted+evicted), removedEntriesCount.Load(), "During this step we replace %d more items", evicted) assert.Equal(t, testutil.ToFloat64(c.entriesCurrent), float64(cnt)) assert.Equal(t, testutil.ToFloat64(c.entriesCurrent), float64(len(c.entries))) assert.Equal(t, testutil.ToFloat64(c.entriesCurrent), float64(c.lru.Len())) assert.Equal(t, testutil.ToFloat64(c.memoryBytes), float64(cnt*sizeOf(itemTemplate))) c.Stop() + assert.Equal(t, int64(evicted*3), removedEntriesCount.Load(), "onEntryRemoved must not be called for the items during the stop") } } @@ -159,9 +176,9 @@ func TestEmbeddedCacheExpiry(t *testing.T) { key1, key2, key3, key4 := "01", "02", "03", "04" data1, data2, data3, data4 := genBytes(32), genBytes(64), genBytes(128), genBytes(32) - memorySz := sizeOf(&cacheEntry{key: key1, value: data1}) + - sizeOf(&cacheEntry{key: key2, value: data2}) + - sizeOf(&cacheEntry{key: key3, value: data3}) + memorySz := sizeOf(&cacheEntry[string, []byte]{key: key1, value: data1}) + + sizeOf(&cacheEntry[string, []byte]{key: key2, value: data2}) + + sizeOf(&cacheEntry[string, []byte]{key: key3, value: data3}) cfg := EmbeddedCacheConfig{ MaxSizeItems: 3, @@ -169,7 +186,11 @@ func TestEmbeddedCacheExpiry(t *testing.T) { PurgeInterval: 50 * time.Millisecond, } - c := NewEmbeddedCache("cache_exprity_test", cfg, nil, log.NewNopLogger(), "test") + removedEntriesCount := atomic.NewInt64(0) + onEntryRemoved := func(key string, value []byte) { + removedEntriesCount.Inc() + } + c := NewTypedEmbeddedCache[string, []byte]("cache_exprity_test", cfg, nil, log.NewNopLogger(), "test", sizeOf, onEntryRemoved) ctx := context.Background() err := c.Store(ctx, []string{key1, key2, key3, key4}, [][]byte{data1, data2, data3, data4}) @@ -189,6 +210,7 @@ func TestEmbeddedCacheExpiry(t *testing.T) { assert.Equal(t, float64(len(c.entries)), testutil.ToFloat64(c.entriesCurrent)) assert.Equal(t, float64(c.lru.Len()), testutil.ToFloat64(c.entriesCurrent)) assert.Equal(t, float64(memorySz), testutil.ToFloat64(c.memoryBytes)) + assert.Equal(t, int64(1), removedEntriesCount.Load(), "on removal callback had to be called for key1") // Expire the item. time.Sleep(2 * cfg.TTL) @@ -202,6 +224,7 @@ func TestEmbeddedCacheExpiry(t *testing.T) { assert.Equal(t, float64(len(c.entries)), testutil.ToFloat64(c.entriesCurrent)) assert.Equal(t, float64(c.lru.Len()), testutil.ToFloat64(c.entriesCurrent)) assert.Equal(t, float64(memorySz), testutil.ToFloat64(c.memoryBytes)) + assert.Equal(t, int64(4), removedEntriesCount.Load(), "on removal callback had to be called for all 3 expired entries") c.Stop() }