diff --git a/pkg/storage/stores/shipper/bloomshipper/cache.go b/pkg/storage/stores/shipper/bloomshipper/cache.go index d249cb68ce569..5360ad766ebf0 100644 --- a/pkg/storage/stores/shipper/bloomshipper/cache.go +++ b/pkg/storage/stores/shipper/bloomshipper/cache.go @@ -1,7 +1,6 @@ package bloomshipper import ( - "fmt" "os" "path" "time" @@ -44,16 +43,8 @@ func NewBlocksCache(cfg cache.EmbeddedCacheConfig, reg prometheus.Registerer, lo logger, stats.BloomBlocksCache, calculateBlockDirectorySize, - func(_ string, value BlockDirectory) { - value.removeDirectoryAsync() - }) -} - -func calculateBlockDirectorySize(entry *cache.Entry[string, BlockDirectory]) uint64 { - value := entry.Value - bloomFileStats, _ := os.Lstat(path.Join(value.Path, v1.BloomFileName)) - seriesFileStats, _ := os.Lstat(path.Join(value.Path, v1.SeriesFileName)) - return uint64(bloomFileStats.Size() + seriesFileStats.Size()) + removeBlockDirectory, + ) } func NewBlockDirectory(ref BlockRef, path string, logger log.Logger) BlockDirectory { @@ -61,9 +52,9 @@ func NewBlockDirectory(ref BlockRef, path string, logger log.Logger) BlockDirect BlockRef: ref, Path: path, refCount: atomic.NewInt32(0), - removeDirectoryTimeout: time.Minute, + removeDirectoryTimeout: 10 * time.Second, + activeQueriersCheckInterval: 100 * time.Millisecond, logger: logger, - activeQueriersCheckInterval: defaultActiveQueriersCheckInterval, } } @@ -72,25 +63,36 @@ func NewBlockDirectory(ref BlockRef, path string, logger log.Logger) BlockDirect type BlockDirectory struct { BlockRef Path string - removeDirectoryTimeout time.Duration refCount *atomic.Int32 - logger log.Logger + removeDirectoryTimeout time.Duration activeQueriersCheckInterval time.Duration + logger log.Logger } +// Convenience function to create a new block from a directory. +// Must not be called outside of BlockQuerier(). func (b BlockDirectory) Block() *v1.Block { return v1.NewBlock(v1.NewDirectoryBlockReader(b.Path)) } +// Acquire increases the ref counter on the directory. func (b BlockDirectory) Acquire() { _ = b.refCount.Inc() } +// Release decreases the ref counter on the directory. func (b BlockDirectory) Release() error { _ = b.refCount.Dec() return nil } +func calculateBlockDirectorySize(entry *cache.Entry[string, BlockDirectory]) uint64 { + value := entry.Value + bloomFileStats, _ := os.Lstat(path.Join(value.Path, v1.BloomFileName)) + seriesFileStats, _ := os.Lstat(path.Join(value.Path, v1.SeriesFileName)) + return uint64(bloomFileStats.Size() + seriesFileStats.Size()) +} + // BlockQuerier returns a new block querier from the directory. // It increments the counter of active queriers for this directory. // The counter is decreased when the returned querier is closed. @@ -105,37 +107,31 @@ func (b BlockDirectory) BlockQuerier() *CloseableBlockQuerier { const defaultActiveQueriersCheckInterval = 100 * time.Millisecond -func (b *BlockDirectory) removeDirectoryAsync() { - go func() { - timeout := time.After(b.removeDirectoryTimeout) - ticker := time.NewTicker(b.activeQueriersCheckInterval) - defer ticker.Stop() - for { - select { - case <-ticker.C: - if b.refCount.Load() == 0 { - err := deleteFolder(b.Path) - if err == nil { - return - } +// removeBlockDirectory is called by the cache when an item is evicted +// The cache key and the cache value are passed to this function. +// The function needs to be synchronous, because otherwise we could get a cache +// race condition where the item is already evicted from the cache, but the +// underlying directory isn't. +func removeBlockDirectory(_ string, b BlockDirectory) { + timeout := time.After(b.removeDirectoryTimeout) + ticker := time.NewTicker(b.activeQueriersCheckInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if b.refCount.Load() == 0 { + if err := os.RemoveAll(b.Path); err != nil { level.Error(b.logger).Log("msg", "error deleting block directory", "err", err) } - case <-timeout: - level.Warn(b.logger).Log("msg", "force deleting block folder after timeout", "timeout", b.removeDirectoryTimeout) - err := deleteFolder(b.Path) - if err == nil { - return - } + return + } + case <-timeout: + level.Warn(b.logger).Log("msg", "force deleting block folder after timeout", "timeout", b.removeDirectoryTimeout) + if err := os.RemoveAll(b.Path); err != nil { level.Error(b.logger).Log("msg", "error force deleting block directory", "err", err) } + return } - }() -} - -func deleteFolder(folderPath string) error { - err := os.RemoveAll(folderPath) - if err != nil { - return fmt.Errorf("error deleting bloom block directory: %w", err) } - return nil }