Skip to content

Commit

Permalink
Rename callback functions
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum committed Mar 14, 2024
1 parent b7b2e58 commit f838b1f
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 27 deletions.
35 changes: 18 additions & 17 deletions pkg/storage/chunk/cache/embeddedcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type TypedCache[K comparable, V any] interface {
GetCacheType() stats.CacheType
}

type EntrySizeFunc[K comparable, V any] func(entry *Entry[K, V]) uint64
type Hook[K comparable, V any] func(entry *Entry[K, V])

// EmbeddedCache is a simple (comparable -> any) cache which uses a fifo slide to
// manage evictions. O(1) inserts and updates, O(1) gets.
//
Expand All @@ -55,10 +58,10 @@ type EmbeddedCache[K comparable, V any] struct {
currSizeBytes uint64

entries map[K]*list.Element
cacheEntrySizeCalculator[K, V]
lru *list.List
lru *list.List

onEntryRemoved func(key K, value V)
sizeOf EntrySizeFunc[K, V]
entryRemovedHook Hook[K, V]

done chan struct{}

Expand Down Expand Up @@ -101,8 +104,6 @@ func (cfg *EmbeddedCacheConfig) IsEnabled() bool {
return cfg.Enabled
}

type cacheEntrySizeCalculator[K comparable, V any] func(entry *Entry[K, V]) uint64

// NewEmbeddedCache returns a new initialised EmbeddedCache where the key is a string and the value is a slice of bytes.
func NewEmbeddedCache(name string, cfg EmbeddedCacheConfig, reg prometheus.Registerer, logger log.Logger, cacheType stats.CacheType) *EmbeddedCache[string, []byte] {
return NewTypedEmbeddedCache[string, []byte](name, cfg, reg, logger, cacheType, sizeOf, nil)
Expand All @@ -118,8 +119,8 @@ func NewTypedEmbeddedCache[K comparable, V any](
reg prometheus.Registerer,
logger log.Logger,
cacheType stats.CacheType,
entrySizeCalculator cacheEntrySizeCalculator[K, V],
onEntryRemoved func(key K, value V),
entrySize EntrySizeFunc[K, V],
onEntryRemoved Hook[K, V],
) *EmbeddedCache[K, V] {
if cfg.MaxSizeMB == 0 && cfg.MaxSizeItems == 0 {
// zero cache capacity - no need to create cache
Expand All @@ -136,12 +137,12 @@ func NewTypedEmbeddedCache[K comparable, V any](
cache := &EmbeddedCache[K, V]{
cacheType: cacheType,

maxSizeItems: cfg.MaxSizeItems,
maxSizeBytes: uint64(cfg.MaxSizeMB * 1e6),
entries: make(map[K]*list.Element),
lru: list.New(),
cacheEntrySizeCalculator: entrySizeCalculator,
onEntryRemoved: onEntryRemoved,
maxSizeItems: cfg.MaxSizeItems,
maxSizeBytes: uint64(cfg.MaxSizeMB * 1e6),
entries: make(map[K]*list.Element),
lru: list.New(),
sizeOf: entrySize,
entryRemovedHook: onEntryRemoved,

done: make(chan struct{}),

Expand Down Expand Up @@ -259,10 +260,10 @@ func (c *EmbeddedCache[K, V]) GetCacheType() stats.CacheType {

func (c *EmbeddedCache[K, V]) remove(key K, element *list.Element, reason string) {
entry := c.lru.Remove(element).(*Entry[K, V])
sz := c.cacheEntrySizeCalculator(entry)
sz := c.sizeOf(entry)
delete(c.entries, key)
if c.onEntryRemoved != nil {
c.onEntryRemoved(entry.Key, entry.Value)
if c.entryRemovedHook != nil {
c.entryRemovedHook(entry)
}
c.currSizeBytes -= sz
c.entriesCurrent.Dec()
Expand All @@ -282,7 +283,7 @@ func (c *EmbeddedCache[K, V]) put(key K, value V) {
Key: key,
Value: value,
}
entrySz := c.cacheEntrySizeCalculator(entry)
entrySz := c.sizeOf(entry)

if c.maxSizeBytes > 0 && entrySz > c.maxSizeBytes {
// Cannot keep this item in the cache.
Expand Down
10 changes: 4 additions & 6 deletions pkg/storage/chunk/cache/embeddedcache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,9 @@ func TestEmbeddedCacheEviction(t *testing.T) {

for _, test := range tests {
removedEntriesCount := atomic.NewInt64(0)
onEntryRemoved := func(key string, value []byte) {
c := NewTypedEmbeddedCache[string, []byte](test.name, test.cfg, nil, log.NewNopLogger(), "test", sizeOf, func(_ *Entry[string, []byte]) {
removedEntriesCount.Inc()
}
c := NewTypedEmbeddedCache[string, []byte](test.name, test.cfg, nil, log.NewNopLogger(), "test", sizeOf, onEntryRemoved)
})
ctx := context.Background()

// Check put / get works
Expand Down Expand Up @@ -187,10 +186,9 @@ func TestEmbeddedCacheExpiry(t *testing.T) {
}

removedEntriesCount := atomic.NewInt64(0)
onEntryRemoved := func(key string, value []byte) {
c := NewTypedEmbeddedCache[string, []byte]("cache_exprity_test", cfg, nil, log.NewNopLogger(), "test", sizeOf, func(_ *Entry[string, []byte]) {
removedEntriesCount.Inc()
}
c := NewTypedEmbeddedCache[string, []byte]("cache_exprity_test", cfg, nil, log.NewNopLogger(), "test", sizeOf, onEntryRemoved)
})
ctx := context.Background()

err := c.Store(ctx, []string{key1, key2, key3, key4}, [][]byte{data1, data2, data3, data4})
Expand Down
8 changes: 4 additions & 4 deletions pkg/storage/stores/shipper/bloomshipper/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@ func NewBlocksCache(cfg cache.EmbeddedCacheConfig, reg prometheus.Registerer, lo
logger,
stats.BloomBlocksCache,
directorySize,
func(_ string, value BlockDirectory) {
removeBlockDirectory(value)
},
removeBlockDirectory,
)
}

Expand Down Expand Up @@ -180,7 +178,9 @@ const defaultActiveQueriersCheckInterval = 100 * time.Millisecond
// The function needs to be synchronous, because otherwise we could get a cache
// race condition where the item is already evicted from the cache, but the
// underlying directory isn't.
func removeBlockDirectory(b BlockDirectory) {
func removeBlockDirectory(entry *cache.Entry[string, BlockDirectory]) {
b := entry.Value

timeout := time.After(b.removeDirectoryTimeout)
ticker := time.NewTicker(b.activeQueriersCheckInterval)
defer ticker.Stop()
Expand Down

0 comments on commit f838b1f

Please sign in to comment.