From be54b1efc8727eb3a12df8cacd8300201c76527a Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Thu, 14 Mar 2024 13:09:22 +0100 Subject: [PATCH] Rename directory upon eviction Signed-off-by: Christian Haudum --- .../stores/shipper/bloomshipper/cache.go | 84 ++++++++++++------- .../stores/shipper/bloomshipper/cache_test.go | 21 +++-- .../bloomshipper/compress_utils_test.go | 16 +++- 3 files changed, 80 insertions(+), 41 deletions(-) diff --git a/pkg/storage/stores/shipper/bloomshipper/cache.go b/pkg/storage/stores/shipper/bloomshipper/cache.go index c7e5b80fd3c0b..63d7bd3e2010f 100644 --- a/pkg/storage/stores/shipper/bloomshipper/cache.go +++ b/pkg/storage/stores/shipper/bloomshipper/cache.go @@ -94,12 +94,12 @@ func calculateBlockDirectorySize(entry *cache.Entry[string, BlockDirectory]) uin // NewBlockDirectory creates a new BlockDirectory. Must exist on disk. func NewBlockDirectory(ref BlockRef, path string, logger log.Logger) BlockDirectory { bd := BlockDirectory{ - BlockRef: ref, - Path: path, - refCount: atomic.NewInt32(0), - removeDirectoryTimeout: 5 * time.Second, - activeQueriersCheckInterval: 100 * time.Millisecond, - logger: logger, + BlockRef: ref, + Path: path, + refCount: atomic.NewInt32(0), + deleteTimeout: 5 * time.Second, + checkInterval: 50 * time.Millisecond, + logger: logger, } if err := bd.resolveSize(); err != nil { panic(err) @@ -111,12 +111,12 @@ func NewBlockDirectory(ref BlockRef, path string, logger log.Logger) BlockDirect // It maintains a counter for currently active readers. type BlockDirectory struct { BlockRef - Path string - refCount *atomic.Int32 - removeDirectoryTimeout time.Duration - activeQueriersCheckInterval time.Duration - size int64 - logger log.Logger + Path string + refCount *atomic.Int32 + deleteTimeout time.Duration + checkInterval time.Duration + size int64 + logger log.Logger } // Convenience function to create a new block from a directory. @@ -175,31 +175,53 @@ const defaultActiveQueriersCheckInterval = 100 * time.Millisecond // removeBlockDirectory is called by the cache when an item is evicted // The cache key and the cache value are passed to this function. -// The function needs to be synchronous, because otherwise we could get a cache -// race condition where the item is already evicted from the cache, but the -// underlying directory isn't. +// This function does not immediately remove the block directory, but only +// renames it, which allows that existing readers can still used it. Once the +// reader count is down to 0 the moved directory is deleted. func removeBlockDirectory(entry *cache.Entry[string, BlockDirectory]) { b := entry.Value - timeout := time.After(b.removeDirectoryTimeout) - ticker := time.NewTicker(b.activeQueriersCheckInterval) - defer ticker.Stop() + // Shortcut: Remove directory immediately if there are no readers accessing the directory. + if b.refCount.Load() == 0 { + if err := os.RemoveAll(b.Path); err != nil { + level.Error(b.logger).Log("msg", "error deleting block directory", "path", b.Path, "err", err) + } + return + } + + // Otherwise, rename the current block directory. + // Existing readers will still be able to access the files via their inodes. + newPath := b.Path + "-removed" + if err := os.Rename(b.Path, newPath); err != nil { + level.Error(b.logger).Log("msg", "failed to move block directory", "err", err) + return + } - for { - select { - case <-ticker.C: - if b.refCount.Load() == 0 { - if err := os.RemoveAll(b.Path); err != nil { - level.Error(b.logger).Log("msg", "error deleting block directory", "err", err) + // NB(chaudum): If a single goroutine for each directory turns out to be a + // problem then run a a single goroutine cleanup job instead. + go func(bd BlockDirectory, path string) { + timeout := time.After(bd.deleteTimeout) + ticker := time.NewTicker(bd.checkInterval) + defer ticker.Stop() + + start := time.Now() + for { + select { + case <-ticker.C: + if b.refCount.Load() == 0 { + if err := os.RemoveAll(path); err != nil { + level.Error(b.logger).Log("msg", "error deleting block directory", "path", path, "err", err) + } + level.Debug(b.logger).Log("msg", "deleted block directory", "after", time.Since(start)) + return + } + case <-timeout: + level.Warn(b.logger).Log("msg", "force deleting block folder after timeout", "timeout", bd.deleteTimeout) + if err := os.RemoveAll(path); err != nil { + level.Error(b.logger).Log("msg", "error force deleting block directory", "path", path, "err", err) } return } - case <-timeout: - level.Warn(b.logger).Log("msg", "force deleting block folder after timeout", "timeout", b.removeDirectoryTimeout) - if err := os.RemoveAll(b.Path); err != nil { - level.Error(b.logger).Log("msg", "error force deleting block directory", "err", err) - } - return } - } + }(b, newPath) } diff --git a/pkg/storage/stores/shipper/bloomshipper/cache_test.go b/pkg/storage/stores/shipper/bloomshipper/cache_test.go index dbcb797e92c44..74904a29b84eb 100644 --- a/pkg/storage/stores/shipper/bloomshipper/cache_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/cache_test.go @@ -83,11 +83,11 @@ func TestBlockDirectory_Cleanup(t *testing.T) { require.DirExists(t, extractedBlockDirectory) blockDir := BlockDirectory{ - Path: extractedBlockDirectory, - removeDirectoryTimeout: timeout, - activeQueriersCheckInterval: checkInterval, - logger: log.NewNopLogger(), - refCount: atomic.NewInt32(0), + Path: extractedBlockDirectory, + deleteTimeout: timeout, + checkInterval: checkInterval, + logger: log.NewNopLogger(), + refCount: atomic.NewInt32(0), } // acquire directory blockDir.refCount.Inc() @@ -96,7 +96,14 @@ func TestBlockDirectory_Cleanup(t *testing.T) { Key: blockDir.Path, Value: blockDir, } - go removeBlockDirectory(e) + removeBlockDirectory(e) + + // old block dir does not exist any more + require.NoDirExists(t, extractedBlockDirectory) + + // has been renamed + newPath := extractedBlockDirectory + "-removed" + require.DirExists(t, newPath) if tc.releaseQuerier { // release directory @@ -105,7 +112,7 @@ func TestBlockDirectory_Cleanup(t *testing.T) { // ensure directory does not exist any more require.Eventually(t, func() bool { - return directoryDoesNotExist(extractedBlockDirectory) + return !DirExists(newPath) }, tc.expectDirectoryToBeDeletedWithin, 10*time.Millisecond) }) } diff --git a/pkg/storage/stores/shipper/bloomshipper/compress_utils_test.go b/pkg/storage/stores/shipper/bloomshipper/compress_utils_test.go index 11a6afb21af48..58a691302c668 100644 --- a/pkg/storage/stores/shipper/bloomshipper/compress_utils_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/compress_utils_test.go @@ -2,6 +2,7 @@ package bloomshipper import ( "bytes" + "fmt" "io" "os" "path/filepath" @@ -13,9 +14,18 @@ import ( v1 "github.com/grafana/loki/pkg/storage/bloom/v1" ) -func directoryDoesNotExist(path string) bool { - _, err := os.Lstat(path) - return err != nil +func DirExists(path string) bool { + info, err := os.Lstat(path) + if err != nil { + if os.IsNotExist(err) { + return false + } + panic(fmt.Sprintf("error running os.Lstat(%q): %s", path, err)) + } + if !info.IsDir() { + panic(fmt.Sprintf("%q is not a directory", path)) + } + return true } const testArchiveFileName = "test-block-archive"