From f838b1feb4c7d570bd8664eade5453906ba333f8 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Mon, 19 Feb 2024 17:18:48 +0100 Subject: [PATCH] Rename callback functions Signed-off-by: Christian Haudum --- pkg/storage/chunk/cache/embeddedcache.go | 35 ++++++++++--------- pkg/storage/chunk/cache/embeddedcache_test.go | 10 +++--- .../stores/shipper/bloomshipper/cache.go | 8 ++--- 3 files changed, 26 insertions(+), 27 deletions(-) diff --git a/pkg/storage/chunk/cache/embeddedcache.go b/pkg/storage/chunk/cache/embeddedcache.go index 46eb204125b0a..a55cfeca6c920 100644 --- a/pkg/storage/chunk/cache/embeddedcache.go +++ b/pkg/storage/chunk/cache/embeddedcache.go @@ -39,6 +39,9 @@ type TypedCache[K comparable, V any] interface { GetCacheType() stats.CacheType } +type EntrySizeFunc[K comparable, V any] func(entry *Entry[K, V]) uint64 +type Hook[K comparable, V any] func(entry *Entry[K, V]) + // EmbeddedCache is a simple (comparable -> any) cache which uses a fifo slide to // manage evictions. O(1) inserts and updates, O(1) gets. // @@ -55,10 +58,10 @@ type EmbeddedCache[K comparable, V any] struct { currSizeBytes uint64 entries map[K]*list.Element - cacheEntrySizeCalculator[K, V] - lru *list.List + lru *list.List - onEntryRemoved func(key K, value V) + sizeOf EntrySizeFunc[K, V] + entryRemovedHook Hook[K, V] done chan struct{} @@ -101,8 +104,6 @@ func (cfg *EmbeddedCacheConfig) IsEnabled() bool { return cfg.Enabled } -type cacheEntrySizeCalculator[K comparable, V any] func(entry *Entry[K, V]) uint64 - // NewEmbeddedCache returns a new initialised EmbeddedCache where the key is a string and the value is a slice of bytes. func NewEmbeddedCache(name string, cfg EmbeddedCacheConfig, reg prometheus.Registerer, logger log.Logger, cacheType stats.CacheType) *EmbeddedCache[string, []byte] { return NewTypedEmbeddedCache[string, []byte](name, cfg, reg, logger, cacheType, sizeOf, nil) @@ -118,8 +119,8 @@ func NewTypedEmbeddedCache[K comparable, V any]( reg prometheus.Registerer, logger log.Logger, cacheType stats.CacheType, - entrySizeCalculator cacheEntrySizeCalculator[K, V], - onEntryRemoved func(key K, value V), + entrySize EntrySizeFunc[K, V], + onEntryRemoved Hook[K, V], ) *EmbeddedCache[K, V] { if cfg.MaxSizeMB == 0 && cfg.MaxSizeItems == 0 { // zero cache capacity - no need to create cache @@ -136,12 +137,12 @@ func NewTypedEmbeddedCache[K comparable, V any]( cache := &EmbeddedCache[K, V]{ cacheType: cacheType, - maxSizeItems: cfg.MaxSizeItems, - maxSizeBytes: uint64(cfg.MaxSizeMB * 1e6), - entries: make(map[K]*list.Element), - lru: list.New(), - cacheEntrySizeCalculator: entrySizeCalculator, - onEntryRemoved: onEntryRemoved, + maxSizeItems: cfg.MaxSizeItems, + maxSizeBytes: uint64(cfg.MaxSizeMB * 1e6), + entries: make(map[K]*list.Element), + lru: list.New(), + sizeOf: entrySize, + entryRemovedHook: onEntryRemoved, done: make(chan struct{}), @@ -259,10 +260,10 @@ func (c *EmbeddedCache[K, V]) GetCacheType() stats.CacheType { func (c *EmbeddedCache[K, V]) remove(key K, element *list.Element, reason string) { entry := c.lru.Remove(element).(*Entry[K, V]) - sz := c.cacheEntrySizeCalculator(entry) + sz := c.sizeOf(entry) delete(c.entries, key) - if c.onEntryRemoved != nil { - c.onEntryRemoved(entry.Key, entry.Value) + if c.entryRemovedHook != nil { + c.entryRemovedHook(entry) } c.currSizeBytes -= sz c.entriesCurrent.Dec() @@ -282,7 +283,7 @@ func (c *EmbeddedCache[K, V]) put(key K, value V) { Key: key, Value: value, } - entrySz := c.cacheEntrySizeCalculator(entry) + entrySz := c.sizeOf(entry) if c.maxSizeBytes > 0 && entrySz > c.maxSizeBytes { // Cannot keep this item in the cache. diff --git a/pkg/storage/chunk/cache/embeddedcache_test.go b/pkg/storage/chunk/cache/embeddedcache_test.go index 473c1b8e83a09..850669da2a8da 100644 --- a/pkg/storage/chunk/cache/embeddedcache_test.go +++ b/pkg/storage/chunk/cache/embeddedcache_test.go @@ -48,10 +48,9 @@ func TestEmbeddedCacheEviction(t *testing.T) { for _, test := range tests { removedEntriesCount := atomic.NewInt64(0) - onEntryRemoved := func(key string, value []byte) { + c := NewTypedEmbeddedCache[string, []byte](test.name, test.cfg, nil, log.NewNopLogger(), "test", sizeOf, func(_ *Entry[string, []byte]) { removedEntriesCount.Inc() - } - c := NewTypedEmbeddedCache[string, []byte](test.name, test.cfg, nil, log.NewNopLogger(), "test", sizeOf, onEntryRemoved) + }) ctx := context.Background() // Check put / get works @@ -187,10 +186,9 @@ func TestEmbeddedCacheExpiry(t *testing.T) { } removedEntriesCount := atomic.NewInt64(0) - onEntryRemoved := func(key string, value []byte) { + c := NewTypedEmbeddedCache[string, []byte]("cache_exprity_test", cfg, nil, log.NewNopLogger(), "test", sizeOf, func(_ *Entry[string, []byte]) { removedEntriesCount.Inc() - } - c := NewTypedEmbeddedCache[string, []byte]("cache_exprity_test", cfg, nil, log.NewNopLogger(), "test", sizeOf, onEntryRemoved) + }) ctx := context.Background() err := c.Store(ctx, []string{key1, key2, key3, key4}, [][]byte{data1, data2, data3, data4}) diff --git a/pkg/storage/stores/shipper/bloomshipper/cache.go b/pkg/storage/stores/shipper/bloomshipper/cache.go index bfac656cc35d4..c7e5b80fd3c0b 100644 --- a/pkg/storage/stores/shipper/bloomshipper/cache.go +++ b/pkg/storage/stores/shipper/bloomshipper/cache.go @@ -46,9 +46,7 @@ func NewBlocksCache(cfg cache.EmbeddedCacheConfig, reg prometheus.Registerer, lo logger, stats.BloomBlocksCache, directorySize, - func(_ string, value BlockDirectory) { - removeBlockDirectory(value) - }, + removeBlockDirectory, ) } @@ -180,7 +178,9 @@ const defaultActiveQueriersCheckInterval = 100 * time.Millisecond // The function needs to be synchronous, because otherwise we could get a cache // race condition where the item is already evicted from the cache, but the // underlying directory isn't. -func removeBlockDirectory(b BlockDirectory) { +func removeBlockDirectory(entry *cache.Entry[string, BlockDirectory]) { + b := entry.Value + timeout := time.After(b.removeDirectoryTimeout) ticker := time.NewTicker(b.activeQueriersCheckInterval) defer ticker.Stop()