diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md
index 40e98748356eb..898ca025ee97d 100644
--- a/docs/sources/configure/_index.md
+++ b/docs/sources/configure/_index.md
@@ -2286,6 +2286,29 @@ bloom_shipper:
     # the tasks above this limit will fail an error.
     # CLI flag: -bloom.shipper.blocks-downloading-queue.max_tasks_enqueued_per_tenant
     [max_tasks_enqueued_per_tenant: <int> | default = 10000]
+
+  blocks_cache:
+    # Whether embedded cache is enabled.
+    # CLI flag: -blocks-cache.enabled
+    [enabled: <boolean> | default = false]
+
+    # Maximum memory size of the cache in MB.
+    # CLI flag: -blocks-cache.max-size-mb
+    [max_size_mb: <int> | default = 100]
+
+    # Maximum number of entries in the cache.
+    # CLI flag: -blocks-cache.max-size-items
+    [max_size_items: <int> | default = 0]
+
+    # The time to live for items in the cache before they get purged.
+    # CLI flag: -blocks-cache.ttl
+    [ttl: <duration> | default = 0s]
+
+    # During this period, the process waits until the directory is no longer in
+    # use; only then is it deleted. If the timeout is reached, the directory is
+    # force deleted.
+    # CLI flag: -blocks-cache.remove-directory-graceful-period
+    [remove_directory_graceful_period: <duration> | default = 5m]
 ```

### chunk_store_config

diff --git a/go.mod b/go.mod
index 78ddccab4b499..aadc1fbbca84f 100644
--- a/go.mod
+++ b/go.mod
@@ -141,6 +141,7 @@ require (
 	golang.org/x/text v0.13.0
 	google.golang.org/protobuf v1.31.0
 	k8s.io/apimachinery v0.28.1
+	k8s.io/utils v0.0.0-20230711102312-30195339c3c7
 )

 require (
@@ -328,7 +329,6 @@ require (
 	k8s.io/client-go v0.28.1 // indirect
 	k8s.io/klog/v2 v2.100.1 // indirect
 	k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect
-	k8s.io/utils v0.0.0-20230711102312-30195339c3c7 // indirect
 	rsc.io/binaryregexp v0.2.0 // indirect
 	sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
 	sigs.k8s.io/structured-merge-diff/v4 v4.3.0 // indirect
diff --git a/pkg/logqlmodel/stats/context.go b/pkg/logqlmodel/stats/context.go
index 597da62805bb6..33517d6cce905 100644
--- a/pkg/logqlmodel/stats/context.go
+++ b/pkg/logqlmodel/stats/context.go
@@ -62,6 +62,7 @@ const (
 	VolumeResultCache = "volume-result"
 	WriteDedupeCache  = "write-dedupe"
 	BloomFilterCache  = "bloom-filter"
+	BloomBlocksCache  = "bloom-blocks"
 )

 // NewContext creates a new statistics context
diff --git a/pkg/storage/chunk/cache/cache.go b/pkg/storage/chunk/cache/cache.go
index f651b252cdaab..870d7c19e5c7c 100644
--- a/pkg/storage/chunk/cache/cache.go
+++ b/pkg/storage/chunk/cache/cache.go
@@ -51,7 +51,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, description string, f
 	cfg.Memcache.RegisterFlagsWithPrefix(prefix, description, f)
 	cfg.MemcacheClient.RegisterFlagsWithPrefix(prefix, description, f)
 	cfg.Redis.RegisterFlagsWithPrefix(prefix, description, f)
-	cfg.EmbeddedCache.RegisterFlagsWithPrefix(prefix, description, f)
+	cfg.EmbeddedCache.RegisterFlagsWithPrefix(prefix+"embedded-cache.", description, f)
 	f.IntVar(&cfg.AsyncCacheWriteBackConcurrency, prefix+"max-async-cache-write-back-concurrency", 16, "The maximum number of concurrent asynchronous writeback cache can occur.")
 	f.IntVar(&cfg.AsyncCacheWriteBackBufferSize, prefix+"max-async-cache-write-back-buffer-size", 500, "The maximum number of enqueued asynchronous writeback cache allowed.")
 	f.DurationVar(&cfg.DefaultValidity, prefix+"default-validity", time.Hour, description+"The default validity of entries for caches unless overridden.")
diff --git a/pkg/storage/chunk/cache/embeddedcache.go b/pkg/storage/chunk/cache/embeddedcache.go
index b27d6a903f23e..871c1ef4e1e78 100644 --- a/pkg/storage/chunk/cache/embeddedcache.go +++ b/pkg/storage/chunk/cache/embeddedcache.go @@ -58,10 +58,10 @@ type EmbeddedCache[K comparable, V any] struct { memoryBytes prometheus.Gauge } -type cacheEntry[K comparable, V any] struct { +type Entry[K comparable, V any] struct { updated time.Time - key K - value V + Key K + Value V } // EmbeddedCacheConfig represents in-process embedded cache config. @@ -77,17 +77,21 @@ type EmbeddedCacheConfig struct { } func (cfg *EmbeddedCacheConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) { - f.BoolVar(&cfg.Enabled, prefix+"embedded-cache.enabled", false, description+"Whether embedded cache is enabled.") - f.Int64Var(&cfg.MaxSizeMB, prefix+"embedded-cache.max-size-mb", 100, description+"Maximum memory size of the cache in MB.") - f.IntVar(&cfg.MaxSizeItems, prefix+"embedded-cache.max-size-items", 0, description+"Maximum number of entries in the cache.") - f.DurationVar(&cfg.TTL, prefix+"embedded-cache.ttl", time.Hour, description+"The time to live for items in the cache before they get purged.") + cfg.RegisterFlagsWithPrefixAndDefaults(prefix, description, f, time.Hour) +} + +func (cfg *EmbeddedCacheConfig) RegisterFlagsWithPrefixAndDefaults(prefix, description string, f *flag.FlagSet, defaultTTL time.Duration) { + f.BoolVar(&cfg.Enabled, prefix+"enabled", false, description+"Whether embedded cache is enabled.") + f.Int64Var(&cfg.MaxSizeMB, prefix+"max-size-mb", 100, description+"Maximum memory size of the cache in MB.") + f.IntVar(&cfg.MaxSizeItems, prefix+"max-size-items", 0, description+"Maximum number of entries in the cache.") + f.DurationVar(&cfg.TTL, prefix+"ttl", defaultTTL, description+"The time to live for items in the cache before they get purged.") } func (cfg *EmbeddedCacheConfig) IsEnabled() bool { return cfg.Enabled } -type cacheEntrySizeCalculator[K comparable, V any] func(entry *cacheEntry[K, V]) uint64 +type cacheEntrySizeCalculator[K comparable, V any] func(entry *Entry[K, V]) uint64 // NewEmbeddedCache returns a new initialised EmbeddedCache where the key is a string and the value is a slice of bytes. 
func NewEmbeddedCache(name string, cfg EmbeddedCacheConfig, reg prometheus.Registerer, logger log.Logger, cacheType stats.CacheType) *EmbeddedCache[string, []byte] { @@ -191,7 +195,7 @@ func (c *EmbeddedCache[K, V]) pruneExpiredItems(ttl time.Duration) { defer c.lock.Unlock() for k, v := range c.entries { - entry := v.Value.(*cacheEntry[K, V]) + entry := v.Value.(*Entry[K, V]) if time.Since(entry.updated) > ttl { c.remove(k, v, expiredReason) } @@ -244,10 +248,10 @@ func (c *EmbeddedCache[K, V]) GetCacheType() stats.CacheType { } func (c *EmbeddedCache[K, V]) remove(key K, element *list.Element, reason string) { - entry := c.lru.Remove(element).(*cacheEntry[K, V]) + entry := c.lru.Remove(element).(*Entry[K, V]) delete(c.entries, key) if c.onEntryRemoved != nil { - c.onEntryRemoved(entry.key, entry.value) + c.onEntryRemoved(entry.Key, entry.Value) } c.currSizeBytes -= c.cacheEntrySizeCalculator(entry) c.entriesCurrent.Dec() @@ -262,10 +266,10 @@ func (c *EmbeddedCache[K, V]) put(key K, value V) { c.remove(key, element, replacedReason) } - entry := &cacheEntry[K, V]{ + entry := &Entry[K, V]{ updated: time.Now(), - key: key, - value: value, + Key: key, + Value: value, } entrySz := c.cacheEntrySizeCalculator(entry) @@ -285,8 +289,8 @@ func (c *EmbeddedCache[K, V]) put(key K, value V) { if lastElement == nil { break } - entryToRemove := lastElement.Value.(*cacheEntry[K, V]) - c.remove(entryToRemove.key, lastElement, fullReason) + entryToRemove := lastElement.Value.(*Entry[K, V]) + c.remove(entryToRemove.Key, lastElement, fullReason) } // Finally, we have space to add the item. @@ -306,17 +310,17 @@ func (c *EmbeddedCache[K, V]) Get(_ context.Context, key K) (V, bool) { element, ok := c.entries[key] if ok { - entry := element.Value.(*cacheEntry[K, V]) - return entry.value, true + entry := element.Value.(*Entry[K, V]) + return entry.Value, true } var empty V return empty, false } -func sizeOf(item *cacheEntry[string, []byte]) uint64 { - return uint64(int(unsafe.Sizeof(*item)) + // size of cacheEntry - len(item.key) + // size of key - cap(item.value) + // size of value +func sizeOf(item *Entry[string, []byte]) uint64 { + return uint64(int(unsafe.Sizeof(*item)) + // size of Entry + len(item.Key) + // size of Key + cap(item.Value) + // size of Value elementSize + // size of the element in linked list elementPrtSize) // size of the pointer to an element in the map } diff --git a/pkg/storage/chunk/cache/embeddedcache_test.go b/pkg/storage/chunk/cache/embeddedcache_test.go index b318e0f6b5a75..473c1b8e83a09 100644 --- a/pkg/storage/chunk/cache/embeddedcache_test.go +++ b/pkg/storage/chunk/cache/embeddedcache_test.go @@ -23,13 +23,13 @@ func TestEmbeddedCacheEviction(t *testing.T) { // compute value size such that 10 entries account to exactly 1MB. // adding one more entry to the cache would result in eviction when MaxSizeMB is configured to a value of 1. // value cap = target size of each entry (0.1MB) - size of cache entry with empty value. 
- valueCap := (1e6 / cnt) - sizeOf(&cacheEntry[string, []byte]{ - key: "00", + valueCap := (1e6 / cnt) - sizeOf(&Entry[string, []byte]{ + Key: "00", }) - itemTemplate := &cacheEntry[string, []byte]{ - key: "00", - value: make([]byte, 0, valueCap), + itemTemplate := &Entry[string, []byte]{ + Key: "00", + Value: make([]byte, 0, valueCap), } tests := []struct { @@ -176,9 +176,9 @@ func TestEmbeddedCacheExpiry(t *testing.T) { key1, key2, key3, key4 := "01", "02", "03", "04" data1, data2, data3, data4 := genBytes(32), genBytes(64), genBytes(128), genBytes(32) - memorySz := sizeOf(&cacheEntry[string, []byte]{key: key1, value: data1}) + - sizeOf(&cacheEntry[string, []byte]{key: key2, value: data2}) + - sizeOf(&cacheEntry[string, []byte]{key: key3, value: data3}) + memorySz := sizeOf(&Entry[string, []byte]{Key: key1, Value: data1}) + + sizeOf(&Entry[string, []byte]{Key: key2, Value: data2}) + + sizeOf(&Entry[string, []byte]{Key: key3, Value: data3}) cfg := EmbeddedCacheConfig{ MaxSizeItems: 3, diff --git a/pkg/storage/stores/shipper/bloomshipper/block_downloader.go b/pkg/storage/stores/shipper/bloomshipper/block_downloader.go index 610e5a5e4283e..3a5a1ef10a847 100644 --- a/pkg/storage/stores/shipper/bloomshipper/block_downloader.go +++ b/pkg/storage/stores/shipper/bloomshipper/block_downloader.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "os" + "path" "path/filepath" "strconv" "strings" @@ -16,9 +17,13 @@ import ( "github.com/go-kit/log/level" "github.com/grafana/dskit/services" "github.com/prometheus/client_golang/prometheus" + "go.uber.org/atomic" + "k8s.io/utils/keymutex" + "github.com/grafana/loki/pkg/logqlmodel/stats" "github.com/grafana/loki/pkg/queue" v1 "github.com/grafana/loki/pkg/storage/bloom/v1" + "github.com/grafana/loki/pkg/storage/chunk/cache" "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config" "github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/util/constants" @@ -27,16 +32,17 @@ import ( type blockDownloader struct { logger log.Logger - workingDirectory string - queueMetrics *queue.Metrics - queue *queue.RequestQueue - blockClient BlockClient + queueMetrics *queue.Metrics + queue *queue.RequestQueue + limits Limits activeUsersService *util.ActiveUsersCleanupService ctx context.Context manager *services.Manager wg sync.WaitGroup + + strategy downloadingStrategy } type queueLimits struct { @@ -63,13 +69,13 @@ func newBlockDownloader(config config.Config, blockClient BlockClient, limits Li return nil, fmt.Errorf("error starting service manager: %w", err) } + strategy := createDownloadingStrategy(config, blockClient, reg, logger) b := &blockDownloader{ ctx: ctx, logger: logger, - workingDirectory: config.WorkingDirectory, queueMetrics: queueMetrics, queue: downloadingQueue, - blockClient: blockClient, + strategy: strategy, activeUsersService: activeUsersService, limits: limits, manager: manager, @@ -130,28 +136,115 @@ func (d *blockDownloader) serveDownloadingTasks(workerID string) { } idx = newIdx - blockPath := task.block.BlockPath - //todo add cache before downloading - level.Debug(logger).Log("msg", "start downloading the block", "block", blockPath) - block, err := d.blockClient.GetBlock(task.ctx, task.block) - if err != nil { - level.Error(logger).Log("msg", "error downloading the block", "block", blockPath, "err", err) - task.ErrCh <- fmt.Errorf("error downloading the block %s : %w", blockPath, err) - continue - } - directory, err := d.extractBlock(&block, time.Now()) + result, err := d.strategy.downloadBlock(task, logger) if err != nil { - 
level.Error(logger).Log("msg", "error extracting the block", "block", blockPath, "err", err) - task.ErrCh <- fmt.Errorf("error extracting the block %s : %w", blockPath, err) + task.ErrCh <- err continue } - level.Debug(d.logger).Log("msg", "block has been downloaded and extracted", "block", task.block.BlockPath, "directory", directory) - blockQuerier := d.createBlockQuerier(directory) - task.ResultsCh <- blockWithQuerier{ - BlockRef: task.block, - BlockQuerier: blockQuerier, + task.ResultsCh <- result + continue + } +} + +func createDownloadingStrategy(cfg config.Config, blockClient BlockClient, reg prometheus.Registerer, logger log.Logger) downloadingStrategy { + if cfg.BlocksCache.EmbeddedCacheConfig.Enabled { + blocksCache := NewBlocksCache(cfg, reg, logger) + return &cacheDownloadingStrategy{ + config: cfg, + workingDirectory: cfg.WorkingDirectory, + blockClient: blockClient, + blocksCache: blocksCache, + keyMutex: keymutex.NewHashed(cfg.BlocksDownloadingQueue.WorkersCount), } } + return &storageDownloadingStrategy{ + workingDirectory: cfg.WorkingDirectory, + blockClient: blockClient, + } +} + +type downloadingStrategy interface { + downloadBlock(task *BlockDownloadingTask, logger log.Logger) (blockWithQuerier, error) + close() +} + +type cacheDownloadingStrategy struct { + config config.Config + workingDirectory string + blockClient BlockClient + blocksCache *cache.EmbeddedCache[string, *cachedBlock] + keyMutex keymutex.KeyMutex +} + +func (s *cacheDownloadingStrategy) downloadBlock(task *BlockDownloadingTask, logger log.Logger) (blockWithQuerier, error) { + blockPath := task.block.BlockPath + s.keyMutex.LockKey(blockPath) + defer func() { + _ = s.keyMutex.UnlockKey(blockPath) + }() + blockFromCache, exists := s.blocksCache.Get(task.ctx, task.block.BlockPath) + if exists { + return blockWithQuerier{ + BlockRef: task.block, + closableBlockQuerier: newBlockQuerierFromCache(blockFromCache), + }, nil + } + + directory, err := downloadBlockToDirectory(logger, task, s.workingDirectory, s.blockClient) + if err != nil { + return blockWithQuerier{}, err + } + blockFromCache = newCachedBlock(directory, s.config.BlocksCache.RemoveDirectoryGracefulPeriod, logger) + err = s.blocksCache.Store(task.ctx, []string{task.block.BlockPath}, []*cachedBlock{blockFromCache}) + if err != nil { + level.Error(logger).Log("msg", "error storing the block in the cache", "block", blockPath, "err", err) + return blockWithQuerier{}, fmt.Errorf("error storing the block %s in the cache : %w", blockPath, err) + } + return blockWithQuerier{ + BlockRef: task.block, + closableBlockQuerier: newBlockQuerierFromCache(blockFromCache), + }, nil +} + +func (s *cacheDownloadingStrategy) close() { + s.blocksCache.Stop() +} + +type storageDownloadingStrategy struct { + workingDirectory string + blockClient BlockClient +} + +func (s *storageDownloadingStrategy) downloadBlock(task *BlockDownloadingTask, logger log.Logger) (blockWithQuerier, error) { + directory, err := downloadBlockToDirectory(logger, task, s.workingDirectory, s.blockClient) + if err != nil { + return blockWithQuerier{}, err + } + return blockWithQuerier{ + BlockRef: task.block, + closableBlockQuerier: newBlockQuerierFromFS(directory), + }, nil +} + +func (s *storageDownloadingStrategy) close() { + // noop implementation +} + +func downloadBlockToDirectory(logger log.Logger, task *BlockDownloadingTask, workingDirectory string, blockClient BlockClient) (string, error) { + blockPath := task.block.BlockPath + level.Debug(logger).Log("msg", "start downloading the block", 
"block", blockPath) + block, err := blockClient.GetBlock(task.ctx, task.block) + if err != nil { + level.Error(logger).Log("msg", "error downloading the block", "block", blockPath, "err", err) + return "", fmt.Errorf("error downloading the block %s : %w", blockPath, err) + } + directory, err := extractBlock(&block, time.Now(), workingDirectory, logger) + if err != nil { + level.Error(logger).Log("msg", "error extracting the block", "block", blockPath, "err", err) + return "", fmt.Errorf("error extracting the block %s : %w", blockPath, err) + } + level.Debug(logger).Log("msg", "block has been downloaded and extracted", "block", task.block.BlockPath, "directory", directory) + return directory, nil } func (d *blockDownloader) downloadBlocks(ctx context.Context, tenantID string, references []BlockRef) (chan blockWithQuerier, chan error) { @@ -177,12 +270,12 @@ func (d *blockDownloader) downloadBlocks(ctx context.Context, tenantID string, r type blockWithQuerier struct { BlockRef - *v1.BlockQuerier + *closableBlockQuerier } // extract the files into directory and returns absolute path to this directory. -func (d *blockDownloader) extractBlock(block *LazyBlock, ts time.Time) (string, error) { - workingDirectoryPath := filepath.Join(d.workingDirectory, block.BlockPath, strconv.FormatInt(ts.UnixMilli(), 10)) +func extractBlock(block *LazyBlock, ts time.Time, workingDirectory string, logger log.Logger) (string, error) { + workingDirectoryPath := filepath.Join(workingDirectory, block.BlockPath, strconv.FormatInt(ts.UnixNano(), 10)) err := os.MkdirAll(workingDirectoryPath, os.ModePerm) if err != nil { return "", fmt.Errorf("can not create directory to extract the block: %w", err) @@ -192,8 +285,10 @@ func (d *blockDownloader) extractBlock(block *LazyBlock, ts time.Time) (string, return "", fmt.Errorf("error writing data to temp file: %w", err) } defer func() { - os.Remove(archivePath) - // todo log err + err = os.Remove(archivePath) + if err != nil { + level.Error(logger).Log("msg", "error removing temp archive file", "err", err) + } }() err = extractArchive(archivePath, workingDirectoryPath) if err != nil { @@ -202,15 +297,10 @@ func (d *blockDownloader) extractBlock(block *LazyBlock, ts time.Time) (string, return workingDirectoryPath, nil } -func (d *blockDownloader) createBlockQuerier(directory string) *v1.BlockQuerier { - reader := v1.NewDirectoryBlockReader(directory) - block := v1.NewBlock(reader) - return v1.NewBlockQuerier(block) -} - func (d *blockDownloader) stop() { _ = services.StopManagerAndAwaitStopped(d.ctx, d.manager) d.wg.Wait() + d.strategy.close() } func writeDataToTempFile(workingDirectoryPath string, block *LazyBlock) (string, error) { @@ -236,3 +326,108 @@ func extractArchive(archivePath string, workingDirectoryPath string) error { } return v1.UnTarGz(workingDirectoryPath, file) } + +type closableBlockQuerier struct { + *v1.BlockQuerier + Close func() error +} + +func newBlockQuerierFromCache(cached *cachedBlock) *closableBlockQuerier { + cached.activeQueriers.Inc() + return &closableBlockQuerier{ + BlockQuerier: createBlockQuerier(cached.blockDirectory), + Close: func() error { + cached.activeQueriers.Dec() + return nil + }, + } +} + +func newBlockQuerierFromFS(blockDirectory string) *closableBlockQuerier { + return &closableBlockQuerier{ + BlockQuerier: createBlockQuerier(blockDirectory), + Close: func() error { + return deleteFolder(blockDirectory) + }, + } +} + +func createBlockQuerier(directory string) *v1.BlockQuerier { + reader := v1.NewDirectoryBlockReader(directory) + 
block := v1.NewBlock(reader) + return v1.NewBlockQuerier(block) +} + +func NewBlocksCache(config config.Config, reg prometheus.Registerer, logger log.Logger) *cache.EmbeddedCache[string, *cachedBlock] { + return cache.NewTypedEmbeddedCache[string, *cachedBlock]( + "bloom-blocks-cache", + config.BlocksCache.EmbeddedCacheConfig, + reg, + logger, + stats.BloomBlocksCache, + calculateBlockDirectorySize, + func(key string, value *cachedBlock) { + value.removeDirectoryAsync() + }) +} + +func calculateBlockDirectorySize(entry *cache.Entry[string, *cachedBlock]) uint64 { + value := entry.Value + bloomFileStats, _ := os.Lstat(path.Join(value.blockDirectory, v1.BloomFileName)) + seriesFileStats, _ := os.Lstat(path.Join(value.blockDirectory, v1.SeriesFileName)) + return uint64(bloomFileStats.Size() + seriesFileStats.Size()) +} + +func newCachedBlock(blockDirectory string, removeDirectoryTimeout time.Duration, logger log.Logger) *cachedBlock { + return &cachedBlock{ + blockDirectory: blockDirectory, + removeDirectoryTimeout: removeDirectoryTimeout, + logger: logger, + activeQueriersCheckInterval: defaultActiveQueriersCheckInterval, + } +} + +type cachedBlock struct { + blockDirectory string + removeDirectoryTimeout time.Duration + activeQueriers atomic.Int32 + logger log.Logger + activeQueriersCheckInterval time.Duration +} + +const defaultActiveQueriersCheckInterval = 100 * time.Millisecond + +func (b *cachedBlock) removeDirectoryAsync() { + go func() { + timeout := time.After(b.removeDirectoryTimeout) + ticker := time.NewTicker(b.activeQueriersCheckInterval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + if b.activeQueriers.Load() == 0 { + err := deleteFolder(b.blockDirectory) + if err == nil { + return + } + level.Error(b.logger).Log("msg", "error deleting block directory", "err", err) + } + case <-timeout: + level.Warn(b.logger).Log("msg", "force deleting block folder after timeout", "timeout", b.removeDirectoryTimeout) + err := deleteFolder(b.blockDirectory) + if err == nil { + return + } + level.Error(b.logger).Log("msg", "error force deleting block directory", "err", err) + } + } + }() +} + +func deleteFolder(folderPath string) error { + err := os.RemoveAll(folderPath) + if err != nil { + return fmt.Errorf("error deleting bloom block directory: %w", err) + } + return nil +} diff --git a/pkg/storage/stores/shipper/bloomshipper/block_downloader_test.go b/pkg/storage/stores/shipper/bloomshipper/block_downloader_test.go index 5bf22f21ccaaa..a28c76c12f785 100644 --- a/pkg/storage/stores/shipper/bloomshipper/block_downloader_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/block_downloader_test.go @@ -8,6 +8,7 @@ import ( "os" "path/filepath" "strconv" + "sync" "testing" "time" @@ -15,9 +16,12 @@ import ( "github.com/google/uuid" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" + "go.uber.org/atomic" v1 "github.com/grafana/loki/pkg/storage/bloom/v1" + "github.com/grafana/loki/pkg/storage/chunk/cache" "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config" + "github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/validation" ) @@ -67,46 +71,287 @@ func Test_blockDownloader_downloadBlocks(t *testing.T) { require.Equal(t, 0, int(downloader.queue.GetConnectedConsumersMetric())) } +func Test_blockDownloader_downloadBlock(t *testing.T) { + tests := map[string]struct { + cacheEnabled bool + expectedTotalGetBlocksCalls int + }{ + "cache disabled": { + cacheEnabled: false, + expectedTotalGetBlocksCalls: 40, + }, + "cache 
enabled": { + cacheEnabled: true, + expectedTotalGetBlocksCalls: 20, + }, + } + for name, testData := range tests { + t.Run(name, func(t *testing.T) { + overrides, err := validation.NewOverrides(validation.Limits{BloomGatewayBlocksDownloadingParallelism: 20}, nil) + require.NoError(t, err) + workingDirectory := t.TempDir() + + blockReferences, blockClient := createFakeBlocks(t, 20) + workersCount := 10 + downloader, err := newBlockDownloader(config.Config{ + WorkingDirectory: workingDirectory, + BlocksDownloadingQueue: config.DownloadingQueueConfig{ + WorkersCount: workersCount, + MaxTasksEnqueuedPerTenant: 20, + }, + BlocksCache: config.BlocksCacheConfig{ + EmbeddedCacheConfig: cache.EmbeddedCacheConfig{ + Enabled: testData.cacheEnabled, + MaxSizeItems: 20, + }, + RemoveDirectoryGracefulPeriod: 1 * time.Second, + }, + }, blockClient, overrides, log.NewNopLogger(), prometheus.NewRegistry()) + t.Cleanup(downloader.stop) + require.NoError(t, err) + + blocksCh, errorsCh := downloader.downloadBlocks(context.Background(), "fake", blockReferences) + downloadedBlocks := make(map[string]any, len(blockReferences)) + done := make(chan bool) + go func() { + for i := 0; i < 20; i++ { + block := <-blocksCh + downloadedBlocks[block.BlockPath] = nil + } + done <- true + }() + + select { + case <-time.After(2 * time.Second): + t.Fatalf("test must complete before the timeout") + case err := <-errorsCh: + require.NoError(t, err) + case <-done: + } + require.Len(t, downloadedBlocks, 20, "all 20 block must be downloaded") + require.Equal(t, 20, blockClient.getBlockCalls) + + blocksCh, errorsCh = downloader.downloadBlocks(context.Background(), "fake", blockReferences) + downloadedBlocks = make(map[string]any, len(blockReferences)) + done = make(chan bool) + go func() { + for i := 0; i < 20; i++ { + block := <-blocksCh + downloadedBlocks[block.BlockPath] = nil + } + done <- true + }() + + select { + case <-time.After(2 * time.Second): + t.Fatalf("test must complete before the timeout") + case err := <-errorsCh: + require.NoError(t, err) + case <-done: + } + require.Len(t, downloadedBlocks, 20, "all 20 block must be downloaded") + require.Equal(t, testData.expectedTotalGetBlocksCalls, blockClient.getBlockCalls) + }) + } +} + +func Test_blockDownloader_downloadBlock_deduplication(t *testing.T) { + tests := map[string]struct { + cacheEnabled bool + expectedTotalGetBlocksCalls int + }{ + "requests to blockClient must be deduplicated by blockPath if cache is enabled": { + cacheEnabled: true, + expectedTotalGetBlocksCalls: 1, + }, + "requests to blockClient must NOT be deduplicated by blockPath if cache is disabled": { + cacheEnabled: false, + expectedTotalGetBlocksCalls: 10, + }, + } + for name, testData := range tests { + t.Run(name, func(t *testing.T) { + + overrides, err := validation.NewOverrides(validation.Limits{BloomGatewayBlocksDownloadingParallelism: 20}, nil) + require.NoError(t, err) + workingDirectory := t.TempDir() + + blockReferences, blockClient := createFakeBlocks(t, 1) + workersCount := 10 + downloader, err := newBlockDownloader(config.Config{ + WorkingDirectory: workingDirectory, + BlocksDownloadingQueue: config.DownloadingQueueConfig{ + WorkersCount: workersCount, + MaxTasksEnqueuedPerTenant: 20, + }, + BlocksCache: config.BlocksCacheConfig{ + EmbeddedCacheConfig: cache.EmbeddedCacheConfig{ + Enabled: testData.cacheEnabled, + MaxSizeItems: 20, + }, + RemoveDirectoryGracefulPeriod: 1 * time.Second, + }, + }, blockClient, overrides, log.NewNopLogger(), prometheus.NewRegistry()) + 
t.Cleanup(downloader.stop) + require.NoError(t, err) + + blocksDownloadedCount := atomic.Uint32{} + mutex := sync.Mutex{} + multiError := util.MultiError{} + waitGroup := sync.WaitGroup{} + for i := 0; i < 10; i++ { + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + blocksCh, errCh := downloader.downloadBlocks(context.Background(), "fake", blockReferences) + var err error + select { + case <-blocksCh: + blocksDownloadedCount.Inc() + case downloaderErr := <-errCh: + err = downloaderErr + case <-time.After(1 * time.Second): + err = fmt.Errorf("timeout in the test waiting for a single block to be downloaded") + } + if err == nil { + return + } + mutex.Lock() + defer mutex.Unlock() + multiError.Add(err) + }() + } + waitGroup.Wait() + + require.NoError(t, multiError.Err()) + require.Equal(t, uint32(10), blocksDownloadedCount.Load()) + require.Equal(t, testData.expectedTotalGetBlocksCalls, blockClient.getBlockCalls) + }) + } +} + +func Test_cachedBlock(t *testing.T) { + tests := map[string]struct { + releaseQuerier bool + expectDirectoryToBeDeletedWithin time.Duration + }{ + "expected block directory to be removed once all queriers are released": { + releaseQuerier: true, + // four times grater than activeQueriersCheckInterval + expectDirectoryToBeDeletedWithin: 200 * time.Millisecond, + }, + "expected block directory to be force removed after timeout": { + releaseQuerier: false, + // four times grater than removeDirectoryTimeout + expectDirectoryToBeDeletedWithin: 2 * time.Second, + }, + } + for name, testData := range tests { + t.Run(name, func(t *testing.T) { + extractedBlockDirectory := t.TempDir() + blockFilePath, _, _ := createBlockArchive(t) + err := extractArchive(blockFilePath, extractedBlockDirectory) + require.NoError(t, err) + require.DirExists(t, extractedBlockDirectory) + + cached := &cachedBlock{ + blockDirectory: extractedBlockDirectory, + removeDirectoryTimeout: 500 * time.Millisecond, + activeQueriersCheckInterval: 50 * time.Millisecond, + logger: log.NewLogfmtLogger(os.Stderr), + } + cached.activeQueriers.Inc() + cached.removeDirectoryAsync() + //ensure directory exists + require.Never(t, func() bool { + return directoryDoesNotExist(extractedBlockDirectory) + }, 200*time.Millisecond, 50*time.Millisecond) + + if testData.releaseQuerier { + cached.activeQueriers.Dec() + } + //ensure directory does not exist + require.Eventually(t, func() bool { + return directoryDoesNotExist(extractedBlockDirectory) + }, testData.expectDirectoryToBeDeletedWithin, 50*time.Millisecond) + }) + } +} + +func Test_closableBlockQuerier(t *testing.T) { + t.Run("cached", func(t *testing.T) { + blockFilePath, _, _ := createBlockArchive(t) + extractedBlockDirectory := t.TempDir() + err := extractArchive(blockFilePath, extractedBlockDirectory) + require.NoError(t, err) + + cached := &cachedBlock{blockDirectory: extractedBlockDirectory, removeDirectoryTimeout: 100 * time.Millisecond} + require.Equal(t, int32(0), cached.activeQueriers.Load()) + querier := newBlockQuerierFromCache(cached) + require.Equal(t, int32(1), cached.activeQueriers.Load()) + require.NoError(t, querier.Close()) + require.Equal(t, int32(0), cached.activeQueriers.Load()) + }) + + t.Run("file system", func(t *testing.T) { + blockFilePath, _, _ := createBlockArchive(t) + extractedBlockDirectory := t.TempDir() + err := extractArchive(blockFilePath, extractedBlockDirectory) + require.NoError(t, err) + + querier := newBlockQuerierFromFS(extractedBlockDirectory) + require.DirExists(t, extractedBlockDirectory) + + require.NoError(t, 
querier.Close()) + + //ensure directory does not exist + require.Eventually(t, func() bool { + return directoryDoesNotExist(extractedBlockDirectory) + }, 1*time.Second, 100*time.Millisecond) + }) +} + // creates fake blocks and returns map[block-path]Block and mockBlockClient func createFakeBlocks(t *testing.T, count int) ([]BlockRef, *mockBlockClient) { - mockData := make(map[string]Block, count) - mockLazyData := make(map[string]LazyBlock, count) + mockData := make(map[string]blockSupplier, count) refs := make([]BlockRef, 0, count) for i := 0; i < count; i++ { - archive, _, _ := createBlockArchive(t) - block := Block{ - BlockRef: BlockRef{ - BlockPath: fmt.Sprintf("block-path-%d", i), - }, - Data: archive, + archivePath, _, _ := createBlockArchive(t) + _, err := os.OpenFile(archivePath, os.O_RDONLY, 0700) + //ensure file can be opened + require.NoError(t, err) + blockRef := BlockRef{ + BlockPath: fmt.Sprintf("block-path-%d", i), } - lazyBlock := LazyBlock{ - BlockRef: BlockRef{ - BlockPath: fmt.Sprintf("block-path-%d", i), - }, - Data: archive, + mockData[blockRef.BlockPath] = func() LazyBlock { + file, _ := os.OpenFile(archivePath, os.O_RDONLY, 0700) + return LazyBlock{ + BlockRef: blockRef, + Data: file, + } } - mockData[block.BlockPath] = block - mockLazyData[block.BlockPath] = lazyBlock - refs = append(refs, block.BlockRef) + refs = append(refs, blockRef) } - return refs, &mockBlockClient{mockData: mockData, mockLazyData: mockLazyData} + return refs, &mockBlockClient{mockData: mockData} } +type blockSupplier func() LazyBlock + type mockBlockClient struct { responseDelay time.Duration - mockData map[string]Block - mockLazyData map[string]LazyBlock + mockData map[string]blockSupplier + getBlockCalls int } func (m *mockBlockClient) GetBlock(_ context.Context, reference BlockRef) (LazyBlock, error) { + m.getBlockCalls++ time.Sleep(m.responseDelay) - block, exists := m.mockLazyData[reference.BlockPath] + supplier, exists := m.mockData[reference.BlockPath] if exists { - return block, nil + return supplier(), nil } - return block, fmt.Errorf("block %s is not found in mockData", reference.BlockPath) + return LazyBlock{}, fmt.Errorf("block %s is not found in mockData", reference.BlockPath) } func (m *mockBlockClient) PutBlocks(_ context.Context, _ []Block) ([]Block, error) { @@ -118,20 +363,21 @@ func (m *mockBlockClient) DeleteBlocks(_ context.Context, _ []BlockRef) error { } func Test_blockDownloader_extractBlock(t *testing.T) { - blockFile, bloomFileContent, seriesFileContent := createBlockArchive(t) + blockFilePath, bloomFileContent, seriesFileContent := createBlockArchive(t) + blockFile, err := os.OpenFile(blockFilePath, os.O_RDONLY, 0700) + require.NoError(t, err) workingDir := t.TempDir() - downloader := &blockDownloader{workingDirectory: workingDir} ts := time.Now().UTC() block := LazyBlock{ BlockRef: BlockRef{BlockPath: "first-period-19621/tenantA/metas/ff-fff-1695272400-1695276000-aaa"}, Data: blockFile, } - actualPath, err := downloader.extractBlock(&block, ts) + actualPath, err := extractBlock(&block, ts, workingDir, nil) require.NoError(t, err) - expectedPath := filepath.Join(workingDir, block.BlockPath, strconv.FormatInt(ts.UnixMilli(), 10)) + expectedPath := filepath.Join(workingDir, block.BlockPath, strconv.FormatInt(ts.UnixNano(), 10)) require.Equal(t, expectedPath, actualPath, "expected archive to be extracted to working directory under the same path as blockPath and with timestamp suffix") require.FileExists(t, filepath.Join(expectedPath, v1.BloomFileName)) @@ -146,7 
+392,14 @@ func Test_blockDownloader_extractBlock(t *testing.T) {
 	require.Equal(t, seriesFileContent, string(actualSeriesFileContent))
 }

-func createBlockArchive(t *testing.T) (*os.File, string, string) {
+func directoryDoesNotExist(path string) bool {
+	_, err := os.Lstat(path)
+	return err != nil
+}
+
+const testArchiveFileName = "test-block-archive"
+
+func createBlockArchive(t *testing.T) (string, string, string) {
 	dir := t.TempDir()
 	mockBlockDir := filepath.Join(dir, "mock-block-dir")
 	err := os.MkdirAll(mockBlockDir, 0777)
@@ -163,13 +416,11 @@ func createBlockArchive(t *testing.T) (*os.File, string, string) {
 	_, err = io.Copy(seriesFile, bytes.NewReader([]byte(seriesFileContent)))
 	require.NoError(t, err)

-	blockFilePath := filepath.Join(dir, "test-block-archive")
+	blockFilePath := filepath.Join(dir, testArchiveFileName)
 	file, err := os.OpenFile(blockFilePath, os.O_CREATE|os.O_RDWR, 0700)
 	require.NoError(t, err)
 	err = v1.TarGz(file, v1.NewDirectoryBlockReader(mockBlockDir))
 	require.NoError(t, err)
-	blockFile, err := os.OpenFile(blockFilePath, os.O_RDONLY, 0700)
-	require.NoError(t, err)
-	return blockFile, bloomFileContent, seriesFileContent
+	return blockFilePath, bloomFileContent, seriesFileContent
 }
diff --git a/pkg/storage/stores/shipper/bloomshipper/config/config.go b/pkg/storage/stores/shipper/bloomshipper/config/config.go
index 748e037ca57c1..fbfe5f7803516 100644
--- a/pkg/storage/stores/shipper/bloomshipper/config/config.go
+++ b/pkg/storage/stores/shipper/bloomshipper/config/config.go
@@ -5,11 +5,26 @@ import (
 	"errors"
 	"flag"
 	"strings"
+	"time"
+
+	"github.com/grafana/loki/pkg/storage/chunk/cache"
 )

 type Config struct {
 	WorkingDirectory       string                 `yaml:"working_directory"`
 	BlocksDownloadingQueue DownloadingQueueConfig `yaml:"blocks_downloading_queue"`
+	BlocksCache            BlocksCacheConfig      `yaml:"blocks_cache"`
+}
+
+type BlocksCacheConfig struct {
+	EmbeddedCacheConfig           cache.EmbeddedCacheConfig `yaml:",inline"`
+	RemoveDirectoryGracefulPeriod time.Duration             `yaml:"remove_directory_graceful_period"`
+}
+
+func (c *BlocksCacheConfig) RegisterFlagsWithPrefixAndDefaults(prefix string, f *flag.FlagSet) {
+	c.EmbeddedCacheConfig.RegisterFlagsWithPrefixAndDefaults(prefix, "", f, 0)
+	f.DurationVar(&c.RemoveDirectoryGracefulPeriod, prefix+"remove-directory-graceful-period", 5*time.Minute,
+		"During this period, the process waits until the directory is no longer in use; only then is it deleted. 
If the timeout is reached, the directory is force deleted.") } type DownloadingQueueConfig struct { @@ -25,6 +40,7 @@ func (cfg *DownloadingQueueConfig) RegisterFlagsWithPrefix(prefix string, f *fla func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.StringVar(&c.WorkingDirectory, prefix+"shipper.working-directory", "bloom-shipper", "Working directory to store downloaded Bloom Blocks.") c.BlocksDownloadingQueue.RegisterFlagsWithPrefix(prefix+"shipper.blocks-downloading-queue.", f) + c.BlocksCache.RegisterFlagsWithPrefixAndDefaults("blocks-cache.", f) } func (c *Config) Validate() error { diff --git a/pkg/storage/stores/shipper/bloomshipper/shipper.go b/pkg/storage/stores/shipper/bloomshipper/shipper.go index c04cad433308a..ee0665c4f6c30 100644 --- a/pkg/storage/stores/shipper/bloomshipper/shipper.go +++ b/pkg/storage/stores/shipper/bloomshipper/shipper.go @@ -62,9 +62,9 @@ func (s *Shipper) Fetch(ctx context.Context, tenantID string, blocks []BlockRef, if !ok { return nil } - err := callback(result.BlockQuerier, result.MinFingerprint, result.MaxFingerprint) + err := runCallback(callback, result) if err != nil { - return fmt.Errorf("error running callback function for block %s err: %w", result.BlockPath, err) + return err } case err := <-errorsChannel: if err != nil { @@ -74,6 +74,17 @@ func (s *Shipper) Fetch(ctx context.Context, tenantID string, blocks []BlockRef, } } +func runCallback(callback ForEachBlockCallback, block blockWithQuerier) error { + defer func(result blockWithQuerier) { + _ = result.Close() + }(block) + err := callback(block.closableBlockQuerier.BlockQuerier, block.MinFingerprint, block.MaxFingerprint) + if err != nil { + return fmt.Errorf("error running callback function for block %s err: %w", block.BlockPath, err) + } + return nil +} + func (s *Shipper) ForEachBlock(ctx context.Context, tenantID string, from, through time.Time, fingerprints []uint64, callback ForEachBlockCallback) error { level.Debug(s.logger).Log("msg", "ForEachBlock", "tenant", tenantID, "from", from, "through", through, "fingerprints", len(fingerprints)) diff --git a/vendor/k8s.io/utils/keymutex/hashed.go b/vendor/k8s.io/utils/keymutex/hashed.go new file mode 100644 index 0000000000000..4ddb00867ff7d --- /dev/null +++ b/vendor/k8s.io/utils/keymutex/hashed.go @@ -0,0 +1,58 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package keymutex + +import ( + "hash/fnv" + "runtime" + "sync" +) + +// NewHashed returns a new instance of KeyMutex which hashes arbitrary keys to +// a fixed set of locks. `n` specifies number of locks, if n <= 0, we use +// number of cpus. +// Note that because it uses fixed set of locks, different keys may share same +// lock, so it's possible to wait on same lock. +func NewHashed(n int) KeyMutex { + if n <= 0 { + n = runtime.NumCPU() + } + return &hashedKeyMutex{ + mutexes: make([]sync.Mutex, n), + } +} + +type hashedKeyMutex struct { + mutexes []sync.Mutex +} + +// Acquires a lock associated with the specified ID. 
+func (km *hashedKeyMutex) LockKey(id string) { + km.mutexes[km.hash(id)%uint32(len(km.mutexes))].Lock() +} + +// Releases the lock associated with the specified ID. +func (km *hashedKeyMutex) UnlockKey(id string) error { + km.mutexes[km.hash(id)%uint32(len(km.mutexes))].Unlock() + return nil +} + +func (km *hashedKeyMutex) hash(id string) uint32 { + h := fnv.New32a() + h.Write([]byte(id)) + return h.Sum32() +} diff --git a/vendor/k8s.io/utils/keymutex/keymutex.go b/vendor/k8s.io/utils/keymutex/keymutex.go new file mode 100644 index 0000000000000..89dc022397c72 --- /dev/null +++ b/vendor/k8s.io/utils/keymutex/keymutex.go @@ -0,0 +1,27 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package keymutex + +// KeyMutex is a thread-safe interface for acquiring locks on arbitrary strings. +type KeyMutex interface { + // Acquires a lock associated with the specified ID, creates the lock if one doesn't already exist. + LockKey(id string) + + // Releases the lock associated with the specified ID. + // Returns an error if the specified ID doesn't exist. + UnlockKey(id string) error +} diff --git a/vendor/modules.txt b/vendor/modules.txt index bd4bf7d795a33..23e4a1802fb27 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -2188,6 +2188,7 @@ k8s.io/utils/clock k8s.io/utils/clock/testing k8s.io/utils/integer k8s.io/utils/internal/third_party/forked/golang/net +k8s.io/utils/keymutex k8s.io/utils/net k8s.io/utils/pointer k8s.io/utils/strings/slices
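For reference, the configuration surface added by this change sits under the `bloom_shipper` block shown in the docs hunk at the top of this diff. Below is a minimal sketch of what enabling the new blocks cache could look like; the values and the working directory path are illustrative assumptions, not recommendations (the defaults are the ones listed in the docs above).

```yaml
bloom_shipper:
  # Hypothetical path; the default is "bloom-shipper".
  working_directory: /data/bloom-shipper
  blocks_cache:
    # CLI flag: -blocks-cache.enabled
    enabled: true
    # CLI flag: -blocks-cache.max-size-mb
    max_size_mb: 500
    # CLI flag: -blocks-cache.max-size-items
    max_size_items: 0
    # CLI flag: -blocks-cache.ttl
    ttl: 24h
    # CLI flag: -blocks-cache.remove-directory-graceful-period
    remove_directory_graceful_period: 5m
```

With the cache disabled (the default), `storageDownloadingStrategy` deletes each extracted block directory as soon as its `closableBlockQuerier` is closed. With the cache enabled, directories persist until eviction, at which point `removeDirectoryAsync` waits up to `remove_directory_graceful_period` for active queriers to finish before force-deleting the directory.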