Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BloomShipper: add cache for downloaded blocks #11394

Merged
merged 7 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions docs/sources/configure/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2281,6 +2281,29 @@ bloom_shipper:
# the tasks above this limit will fail an error.
# CLI flag: -bloom.shipper.blocks-downloading-queue.max_tasks_enqueued_per_tenant
[max_tasks_enqueued_per_tenant: <int> | default = 10000]

blocks_cache:
# Whether embedded cache is enabled.
# CLI flag: -blocks-cache.enabled
[enabled: <boolean> | default = false]

# Maximum memory size of the cache in MB.
# CLI flag: -blocks-cache.max-size-mb
[max_size_mb: <int> | default = 100]
vlad-diachenko marked this conversation as resolved.
Show resolved Hide resolved

# Maximum number of entries in the cache.
# CLI flag: -blocks-cache.max-size-items
[max_size_items: <int> | default = 0]

# The time to live for items in the cache before they get purged.
# CLI flag: -blocks-cache.ttl
[ttl: <duration> | default = 0s]

# During this period the process waits until the directory becomes not used
# and only after this it will be deleted. If the timeout reached, the
# directory force deleted.
# CLI flag: -blocks-cache.remove-directory-graceful-period
[remove_directory_graceful_period: <duration> | default = 5m]
```

### chunk_store_config
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ require (
golang.org/x/text v0.13.0
google.golang.org/protobuf v1.31.0
k8s.io/apimachinery v0.28.1
k8s.io/utils v0.0.0-20230711102312-30195339c3c7
)

require (
Expand Down Expand Up @@ -328,7 +329,6 @@ require (
k8s.io/client-go v0.28.1 // indirect
k8s.io/klog/v2 v2.100.1 // indirect
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect
k8s.io/utils v0.0.0-20230711102312-30195339c3c7 // indirect
rsc.io/binaryregexp v0.2.0 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.3.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions pkg/logqlmodel/stats/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ const (
VolumeResultCache = "volume-result"
WriteDedupeCache = "write-dedupe"
BloomFilterCache = "bloom-filter"
BloomBlocksCache = "bloom-blocks"
)

// NewContext creates a new statistics context
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/chunk/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, description string, f
cfg.Memcache.RegisterFlagsWithPrefix(prefix, description, f)
cfg.MemcacheClient.RegisterFlagsWithPrefix(prefix, description, f)
cfg.Redis.RegisterFlagsWithPrefix(prefix, description, f)
cfg.EmbeddedCache.RegisterFlagsWithPrefix(prefix, description, f)
cfg.EmbeddedCache.RegisterFlagsWithPrefix(prefix+"embedded-cache.", description, f)
f.IntVar(&cfg.AsyncCacheWriteBackConcurrency, prefix+"max-async-cache-write-back-concurrency", 16, "The maximum number of concurrent asynchronous writeback cache can occur.")
f.IntVar(&cfg.AsyncCacheWriteBackBufferSize, prefix+"max-async-cache-write-back-buffer-size", 500, "The maximum number of enqueued asynchronous writeback cache allowed.")
f.DurationVar(&cfg.DefaultValidity, prefix+"default-validity", time.Hour, description+"The default validity of entries for caches unless overridden.")
Expand Down
48 changes: 26 additions & 22 deletions pkg/storage/chunk/cache/embeddedcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ type EmbeddedCache[K comparable, V any] struct {
memoryBytes prometheus.Gauge
}

type cacheEntry[K comparable, V any] struct {
type Entry[K comparable, V any] struct {
updated time.Time
key K
value V
Key K
Value V
}

// EmbeddedCacheConfig represents in-process embedded cache config.
Expand All @@ -77,17 +77,21 @@ type EmbeddedCacheConfig struct {
}

func (cfg *EmbeddedCacheConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) {
f.BoolVar(&cfg.Enabled, prefix+"embedded-cache.enabled", false, description+"Whether embedded cache is enabled.")
f.Int64Var(&cfg.MaxSizeMB, prefix+"embedded-cache.max-size-mb", 100, description+"Maximum memory size of the cache in MB.")
f.IntVar(&cfg.MaxSizeItems, prefix+"embedded-cache.max-size-items", 0, description+"Maximum number of entries in the cache.")
f.DurationVar(&cfg.TTL, prefix+"embedded-cache.ttl", time.Hour, description+"The time to live for items in the cache before they get purged.")
cfg.RegisterFlagsWithPrefixAndDefaults(prefix, description, f, time.Hour)
}

func (cfg *EmbeddedCacheConfig) RegisterFlagsWithPrefixAndDefaults(prefix, description string, f *flag.FlagSet, defaultTTL time.Duration) {
f.BoolVar(&cfg.Enabled, prefix+"enabled", false, description+"Whether embedded cache is enabled.")
f.Int64Var(&cfg.MaxSizeMB, prefix+"max-size-mb", 100, description+"Maximum memory size of the cache in MB.")
f.IntVar(&cfg.MaxSizeItems, prefix+"max-size-items", 0, description+"Maximum number of entries in the cache.")
f.DurationVar(&cfg.TTL, prefix+"ttl", defaultTTL, description+"The time to live for items in the cache before they get purged.")
}

func (cfg *EmbeddedCacheConfig) IsEnabled() bool {
return cfg.Enabled
}

type cacheEntrySizeCalculator[K comparable, V any] func(entry *cacheEntry[K, V]) uint64
type cacheEntrySizeCalculator[K comparable, V any] func(entry *Entry[K, V]) uint64

// NewEmbeddedCache returns a new initialised EmbeddedCache where the key is a string and the value is a slice of bytes.
func NewEmbeddedCache(name string, cfg EmbeddedCacheConfig, reg prometheus.Registerer, logger log.Logger, cacheType stats.CacheType) *EmbeddedCache[string, []byte] {
Expand Down Expand Up @@ -191,7 +195,7 @@ func (c *EmbeddedCache[K, V]) pruneExpiredItems(ttl time.Duration) {
defer c.lock.Unlock()

for k, v := range c.entries {
entry := v.Value.(*cacheEntry[K, V])
entry := v.Value.(*Entry[K, V])
if time.Since(entry.updated) > ttl {
c.remove(k, v, expiredReason)
}
Expand Down Expand Up @@ -244,10 +248,10 @@ func (c *EmbeddedCache[K, V]) GetCacheType() stats.CacheType {
}

func (c *EmbeddedCache[K, V]) remove(key K, element *list.Element, reason string) {
entry := c.lru.Remove(element).(*cacheEntry[K, V])
entry := c.lru.Remove(element).(*Entry[K, V])
delete(c.entries, key)
if c.onEntryRemoved != nil {
c.onEntryRemoved(entry.key, entry.value)
c.onEntryRemoved(entry.Key, entry.Value)
}
c.currSizeBytes -= c.cacheEntrySizeCalculator(entry)
c.entriesCurrent.Dec()
Expand All @@ -262,10 +266,10 @@ func (c *EmbeddedCache[K, V]) put(key K, value V) {
c.remove(key, element, replacedReason)
}

entry := &cacheEntry[K, V]{
entry := &Entry[K, V]{
updated: time.Now(),
key: key,
value: value,
Key: key,
Value: value,
}
entrySz := c.cacheEntrySizeCalculator(entry)

Expand All @@ -285,8 +289,8 @@ func (c *EmbeddedCache[K, V]) put(key K, value V) {
if lastElement == nil {
break
}
entryToRemove := lastElement.Value.(*cacheEntry[K, V])
c.remove(entryToRemove.key, lastElement, fullReason)
entryToRemove := lastElement.Value.(*Entry[K, V])
c.remove(entryToRemove.Key, lastElement, fullReason)
}

// Finally, we have space to add the item.
Expand All @@ -306,17 +310,17 @@ func (c *EmbeddedCache[K, V]) Get(_ context.Context, key K) (V, bool) {

element, ok := c.entries[key]
if ok {
entry := element.Value.(*cacheEntry[K, V])
return entry.value, true
entry := element.Value.(*Entry[K, V])
return entry.Value, true
}
var empty V
return empty, false
}

func sizeOf(item *cacheEntry[string, []byte]) uint64 {
return uint64(int(unsafe.Sizeof(*item)) + // size of cacheEntry
len(item.key) + // size of key
cap(item.value) + // size of value
func sizeOf(item *Entry[string, []byte]) uint64 {
return uint64(int(unsafe.Sizeof(*item)) + // size of Entry
len(item.Key) + // size of Key
cap(item.Value) + // size of Value
elementSize + // size of the element in linked list
elementPrtSize) // size of the pointer to an element in the map
}
16 changes: 8 additions & 8 deletions pkg/storage/chunk/cache/embeddedcache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ func TestEmbeddedCacheEviction(t *testing.T) {
// compute value size such that 10 entries account to exactly 1MB.
// adding one more entry to the cache would result in eviction when MaxSizeMB is configured to a value of 1.
// value cap = target size of each entry (0.1MB) - size of cache entry with empty value.
valueCap := (1e6 / cnt) - sizeOf(&cacheEntry[string, []byte]{
key: "00",
valueCap := (1e6 / cnt) - sizeOf(&Entry[string, []byte]{
Key: "00",
})

itemTemplate := &cacheEntry[string, []byte]{
key: "00",
value: make([]byte, 0, valueCap),
itemTemplate := &Entry[string, []byte]{
Key: "00",
Value: make([]byte, 0, valueCap),
}

tests := []struct {
Expand Down Expand Up @@ -176,9 +176,9 @@ func TestEmbeddedCacheExpiry(t *testing.T) {
key1, key2, key3, key4 := "01", "02", "03", "04"
data1, data2, data3, data4 := genBytes(32), genBytes(64), genBytes(128), genBytes(32)

memorySz := sizeOf(&cacheEntry[string, []byte]{key: key1, value: data1}) +
sizeOf(&cacheEntry[string, []byte]{key: key2, value: data2}) +
sizeOf(&cacheEntry[string, []byte]{key: key3, value: data3})
memorySz := sizeOf(&Entry[string, []byte]{Key: key1, Value: data1}) +
sizeOf(&Entry[string, []byte]{Key: key2, Value: data2}) +
sizeOf(&Entry[string, []byte]{Key: key3, Value: data3})

cfg := EmbeddedCacheConfig{
MaxSizeItems: 3,
Expand Down
Loading
Loading