BloomShipper: add cache for downloaded blocks (#11394)
Adapted `embeddedcache.go` to store downloaded bloom blocks on the local
filesystem.

**What this PR does / why we need it**:
This cache will be used by bloom-gateway to avoid downloading the blocks
each time they are needed.

In the cache we store objects (`cachedBlock` struct) that contain:
1. `blockDirectory`: the path to the directory where the block was
extracted.
2. `activeQueriers`: a thread-safe counter of active block users. This
field is important because we do not want the directory to be removed
while it is in use.
3. The remaining fields are used for internal bookkeeping.

When the downloading worker receives this entity, it increments the
`activeQueriers` counter and creates a `blockQuerier` wrapper whose
`Close` function decrements the counter once the `blockQuerier` is no
longer needed.
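The reference-counting scheme described above can be sketched in Go. Only `blockDirectory`, `activeQueriers`, and `Close` come from the PR description; all other names and details are illustrative assumptions, not the actual implementation:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// cachedBlock sketches the cache entry: a directory path plus a
// thread-safe counter of active block users (illustrative field set).
type cachedBlock struct {
	blockDirectory string       // directory the block archive was extracted into
	activeQueriers atomic.Int32 // count of queriers currently using the directory
}

// closableBlockQuerier sketches the wrapper the downloading worker hands
// out: it holds a reference that must be released via Close.
type closableBlockQuerier struct {
	block *cachedBlock
}

func newClosableBlockQuerier(b *cachedBlock) *closableBlockQuerier {
	b.activeQueriers.Add(1) // taken before the querier is handed out
	return &closableBlockQuerier{block: b}
}

// Close decrements the counter so the directory can eventually be removed.
func (q *closableBlockQuerier) Close() {
	q.block.activeQueriers.Add(-1)
}

func main() {
	b := &cachedBlock{blockDirectory: "/tmp/blocks/block-1"}
	q := newClosableBlockQuerier(b)
	fmt.Println("active queriers:", b.activeQueriers.Load()) // 1 while in use
	q.Close()
	fmt.Println("active queriers:", b.activeQueriers.Load()) // back to 0
}
```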

When a cache entry is removed from the cache, the cache calls
`removeDirectoryAsync`, which asynchronously removes the block's folder.
This function checks every `Xms` whether there are still active block
queriers, and once the `activeQueriers` count reaches `0`, the folder is
removed.
The function also has a timeout; once the timeout is reached, the folder
is force-removed.

**Special notes for your reviewer**:
* If the cache is disabled, the blocks will be downloaded each time a
block is requested.
* If the cache is used, the block's folder will be deleted by the
embeddedCache when the cache reaches its memory size limit or item count
limit. Otherwise, the block's folder will be deleted when the `Close`
function is called.


**Checklist**
- [x] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [x] Documentation added
- [x] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e)
- [ ] If the change is deprecating or removing a configuration option,
update the `deprecated-config.yaml` and `deleted-config.yaml` files
respectively in the `tools/deprecated-config-checker` directory.
[Example
PR](0d4416a)

---------

Signed-off-by: Vladyslav Diachenko <[email protected]>
vlad-diachenko authored Dec 13, 2023
1 parent b180788 commit 9b69190
Showing 13 changed files with 686 additions and 99 deletions.
23 changes: 23 additions & 0 deletions docs/sources/configure/_index.md
@@ -2286,6 +2286,29 @@ bloom_shipper:
# the tasks above this limit will fail an error.
# CLI flag: -bloom.shipper.blocks-downloading-queue.max_tasks_enqueued_per_tenant
[max_tasks_enqueued_per_tenant: <int> | default = 10000]

blocks_cache:
# Whether embedded cache is enabled.
# CLI flag: -blocks-cache.enabled
[enabled: <boolean> | default = false]

# Maximum memory size of the cache in MB.
# CLI flag: -blocks-cache.max-size-mb
[max_size_mb: <int> | default = 100]

# Maximum number of entries in the cache.
# CLI flag: -blocks-cache.max-size-items
[max_size_items: <int> | default = 0]

# The time to live for items in the cache before they get purged.
# CLI flag: -blocks-cache.ttl
[ttl: <duration> | default = 0s]

# During this period the process waits until the directory becomes not used
# and only after this it will be deleted. If the timeout is reached, the
# directory is force deleted.
# CLI flag: -blocks-cache.remove-directory-graceful-period
[remove_directory_graceful_period: <duration> | default = 5m]
```
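Putting the new options together, enabling the blocks cache in a Loki config might look like the following; the values are illustrative, not recommendations:

```yaml
bloom_shipper:
  blocks_cache:
    enabled: true
    max_size_mb: 1024
    ttl: 24h
    remove_directory_graceful_period: 5m
```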
### chunk_store_config
2 changes: 1 addition & 1 deletion go.mod
@@ -141,6 +141,7 @@ require (
golang.org/x/text v0.13.0
google.golang.org/protobuf v1.31.0
k8s.io/apimachinery v0.28.1
k8s.io/utils v0.0.0-20230711102312-30195339c3c7
)

require (
@@ -328,7 +329,6 @@ require (
k8s.io/client-go v0.28.1 // indirect
k8s.io/klog/v2 v2.100.1 // indirect
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect
k8s.io/utils v0.0.0-20230711102312-30195339c3c7 // indirect
rsc.io/binaryregexp v0.2.0 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.3.0 // indirect
1 change: 1 addition & 0 deletions pkg/logqlmodel/stats/context.go
@@ -62,6 +62,7 @@ const (
VolumeResultCache = "volume-result"
WriteDedupeCache = "write-dedupe"
BloomFilterCache = "bloom-filter"
BloomBlocksCache = "bloom-blocks"
)

// NewContext creates a new statistics context
2 changes: 1 addition & 1 deletion pkg/storage/chunk/cache/cache.go
@@ -51,7 +51,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, description string, f
cfg.Memcache.RegisterFlagsWithPrefix(prefix, description, f)
cfg.MemcacheClient.RegisterFlagsWithPrefix(prefix, description, f)
cfg.Redis.RegisterFlagsWithPrefix(prefix, description, f)
cfg.EmbeddedCache.RegisterFlagsWithPrefix(prefix, description, f)
cfg.EmbeddedCache.RegisterFlagsWithPrefix(prefix+"embedded-cache.", description, f)
f.IntVar(&cfg.AsyncCacheWriteBackConcurrency, prefix+"max-async-cache-write-back-concurrency", 16, "The maximum number of concurrent asynchronous writeback cache can occur.")
f.IntVar(&cfg.AsyncCacheWriteBackBufferSize, prefix+"max-async-cache-write-back-buffer-size", 500, "The maximum number of enqueued asynchronous writeback cache allowed.")
f.DurationVar(&cfg.DefaultValidity, prefix+"default-validity", time.Hour, description+"The default validity of entries for caches unless overridden.")
48 changes: 26 additions & 22 deletions pkg/storage/chunk/cache/embeddedcache.go
@@ -58,10 +58,10 @@ type EmbeddedCache[K comparable, V any] struct {
memoryBytes prometheus.Gauge
}

type cacheEntry[K comparable, V any] struct {
type Entry[K comparable, V any] struct {
updated time.Time
key K
value V
Key K
Value V
}

// EmbeddedCacheConfig represents in-process embedded cache config.
@@ -77,17 +77,21 @@ type EmbeddedCacheConfig struct {
}

func (cfg *EmbeddedCacheConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) {
f.BoolVar(&cfg.Enabled, prefix+"embedded-cache.enabled", false, description+"Whether embedded cache is enabled.")
f.Int64Var(&cfg.MaxSizeMB, prefix+"embedded-cache.max-size-mb", 100, description+"Maximum memory size of the cache in MB.")
f.IntVar(&cfg.MaxSizeItems, prefix+"embedded-cache.max-size-items", 0, description+"Maximum number of entries in the cache.")
f.DurationVar(&cfg.TTL, prefix+"embedded-cache.ttl", time.Hour, description+"The time to live for items in the cache before they get purged.")
cfg.RegisterFlagsWithPrefixAndDefaults(prefix, description, f, time.Hour)
}

func (cfg *EmbeddedCacheConfig) RegisterFlagsWithPrefixAndDefaults(prefix, description string, f *flag.FlagSet, defaultTTL time.Duration) {
f.BoolVar(&cfg.Enabled, prefix+"enabled", false, description+"Whether embedded cache is enabled.")
f.Int64Var(&cfg.MaxSizeMB, prefix+"max-size-mb", 100, description+"Maximum memory size of the cache in MB.")
f.IntVar(&cfg.MaxSizeItems, prefix+"max-size-items", 0, description+"Maximum number of entries in the cache.")
f.DurationVar(&cfg.TTL, prefix+"ttl", defaultTTL, description+"The time to live for items in the cache before they get purged.")
}

func (cfg *EmbeddedCacheConfig) IsEnabled() bool {
return cfg.Enabled
}

type cacheEntrySizeCalculator[K comparable, V any] func(entry *cacheEntry[K, V]) uint64
type cacheEntrySizeCalculator[K comparable, V any] func(entry *Entry[K, V]) uint64

// NewEmbeddedCache returns a new initialised EmbeddedCache where the key is a string and the value is a slice of bytes.
func NewEmbeddedCache(name string, cfg EmbeddedCacheConfig, reg prometheus.Registerer, logger log.Logger, cacheType stats.CacheType) *EmbeddedCache[string, []byte] {
@@ -191,7 +195,7 @@ func (c *EmbeddedCache[K, V]) pruneExpiredItems(ttl time.Duration) {
defer c.lock.Unlock()

for k, v := range c.entries {
entry := v.Value.(*cacheEntry[K, V])
entry := v.Value.(*Entry[K, V])
if time.Since(entry.updated) > ttl {
c.remove(k, v, expiredReason)
}
@@ -244,10 +248,10 @@ func (c *EmbeddedCache[K, V]) GetCacheType() stats.CacheType {
}

func (c *EmbeddedCache[K, V]) remove(key K, element *list.Element, reason string) {
entry := c.lru.Remove(element).(*cacheEntry[K, V])
entry := c.lru.Remove(element).(*Entry[K, V])
delete(c.entries, key)
if c.onEntryRemoved != nil {
c.onEntryRemoved(entry.key, entry.value)
c.onEntryRemoved(entry.Key, entry.Value)
}
c.currSizeBytes -= c.cacheEntrySizeCalculator(entry)
c.entriesCurrent.Dec()
@@ -262,10 +266,10 @@ func (c *EmbeddedCache[K, V]) put(key K, value V) {
c.remove(key, element, replacedReason)
}

entry := &cacheEntry[K, V]{
entry := &Entry[K, V]{
updated: time.Now(),
key: key,
value: value,
Key: key,
Value: value,
}
entrySz := c.cacheEntrySizeCalculator(entry)

@@ -285,8 +289,8 @@
if lastElement == nil {
break
}
entryToRemove := lastElement.Value.(*cacheEntry[K, V])
c.remove(entryToRemove.key, lastElement, fullReason)
entryToRemove := lastElement.Value.(*Entry[K, V])
c.remove(entryToRemove.Key, lastElement, fullReason)
}

// Finally, we have space to add the item.
@@ -306,17 +310,17 @@ func (c *EmbeddedCache[K, V]) Get(_ context.Context, key K) (V, bool) {

element, ok := c.entries[key]
if ok {
entry := element.Value.(*cacheEntry[K, V])
return entry.value, true
entry := element.Value.(*Entry[K, V])
return entry.Value, true
}
var empty V
return empty, false
}

func sizeOf(item *cacheEntry[string, []byte]) uint64 {
return uint64(int(unsafe.Sizeof(*item)) + // size of cacheEntry
len(item.key) + // size of key
cap(item.value) + // size of value
func sizeOf(item *Entry[string, []byte]) uint64 {
return uint64(int(unsafe.Sizeof(*item)) + // size of Entry
len(item.Key) + // size of Key
cap(item.Value) + // size of Value
elementSize + // size of the element in linked list
elementPrtSize) // size of the pointer to an element in the map
}
16 changes: 8 additions & 8 deletions pkg/storage/chunk/cache/embeddedcache_test.go
@@ -23,13 +23,13 @@ func TestEmbeddedCacheEviction(t *testing.T) {
// compute value size such that 10 entries account to exactly 1MB.
// adding one more entry to the cache would result in eviction when MaxSizeMB is configured to a value of 1.
// value cap = target size of each entry (0.1MB) - size of cache entry with empty value.
valueCap := (1e6 / cnt) - sizeOf(&cacheEntry[string, []byte]{
key: "00",
valueCap := (1e6 / cnt) - sizeOf(&Entry[string, []byte]{
Key: "00",
})

itemTemplate := &cacheEntry[string, []byte]{
key: "00",
value: make([]byte, 0, valueCap),
itemTemplate := &Entry[string, []byte]{
Key: "00",
Value: make([]byte, 0, valueCap),
}

tests := []struct {
@@ -176,9 +176,9 @@ func TestEmbeddedCacheExpiry(t *testing.T) {
key1, key2, key3, key4 := "01", "02", "03", "04"
data1, data2, data3, data4 := genBytes(32), genBytes(64), genBytes(128), genBytes(32)

memorySz := sizeOf(&cacheEntry[string, []byte]{key: key1, value: data1}) +
sizeOf(&cacheEntry[string, []byte]{key: key2, value: data2}) +
sizeOf(&cacheEntry[string, []byte]{key: key3, value: data3})
memorySz := sizeOf(&Entry[string, []byte]{Key: key1, Value: data1}) +
sizeOf(&Entry[string, []byte]{Key: key2, Value: data2}) +
sizeOf(&Entry[string, []byte]{Key: key3, Value: data3})

cfg := EmbeddedCacheConfig{
MaxSizeItems: 3,
