diff --git a/pkg/storage/stores/shipper/bloomshipper/cache.go b/pkg/storage/stores/shipper/bloomshipper/cache.go index 2f9e98f89d9c0..7bdb56755be01 100644 --- a/pkg/storage/stores/shipper/bloomshipper/cache.go +++ b/pkg/storage/stores/shipper/bloomshipper/cache.go @@ -1,7 +1,6 @@ package bloomshipper import ( - "fmt" "os" "path" "time" @@ -37,26 +36,18 @@ func NewBlocksCache(config config.Config, reg prometheus.Registerer, logger log. logger, stats.BloomBlocksCache, calculateBlockDirectorySize, - func(_ string, value BlockDirectory) { - value.removeDirectoryAsync() - }) -} - -func calculateBlockDirectorySize(entry *cache.Entry[string, BlockDirectory]) uint64 { - value := entry.Value - bloomFileStats, _ := os.Lstat(path.Join(value.Path, v1.BloomFileName)) - seriesFileStats, _ := os.Lstat(path.Join(value.Path, v1.SeriesFileName)) - return uint64(bloomFileStats.Size() + seriesFileStats.Size()) + removeBlockDirectory, + ) } func NewBlockDirectory(ref BlockRef, path string, logger log.Logger) BlockDirectory { return BlockDirectory{ BlockRef: ref, Path: path, - activeQueriers: atomic.NewInt32(0), - removeDirectoryTimeout: time.Minute, + refCount: atomic.NewInt32(0), + removeDirectoryTimeout: 10 * time.Second, + activeQueriersCheckInterval: 100 * time.Millisecond, logger: logger, - activeQueriersCheckInterval: defaultActiveQueriersCheckInterval, } } @@ -65,63 +56,72 @@ func NewBlockDirectory(ref BlockRef, path string, logger log.Logger) BlockDirect type BlockDirectory struct { BlockRef Path string + refCount *atomic.Int32 removeDirectoryTimeout time.Duration - activeQueriers *atomic.Int32 - logger log.Logger activeQueriersCheckInterval time.Duration + logger log.Logger } +// Convenience function to create a new block from a directory. +// Must not be called outside of BlockQuerier(). func (b BlockDirectory) Block() *v1.Block { return v1.NewBlock(v1.NewDirectoryBlockReader(b.Path)) } +// Acquire increases the ref counter on the directory. +func (b BlockDirectory) Acquire() { + _ = b.refCount.Inc() +} + +// Release decreases the ref counter on the directory. +func (b BlockDirectory) Release() error { + _ = b.refCount.Dec() + return nil +} + // BlockQuerier returns a new block querier from the directory. // It increments the counter of active queriers for this directory. // The counter is decreased when the returned querier is closed. func (b BlockDirectory) BlockQuerier() *ClosableBlockQuerier { - b.activeQueriers.Inc() + b.Acquire() return &ClosableBlockQuerier{ BlockQuerier: v1.NewBlockQuerier(b.Block()), - close: func() error { - _ = b.activeQueriers.Dec() - return nil - }, + close: b.Release, } } -const defaultActiveQueriersCheckInterval = 100 * time.Millisecond +func calculateBlockDirectorySize(entry *cache.Entry[string, BlockDirectory]) uint64 { + value := entry.Value + bloomFileStats, _ := os.Lstat(path.Join(value.Path, v1.BloomFileName)) + seriesFileStats, _ := os.Lstat(path.Join(value.Path, v1.SeriesFileName)) + return uint64(bloomFileStats.Size() + seriesFileStats.Size()) +} -func (b *BlockDirectory) removeDirectoryAsync() { - go func() { - timeout := time.After(b.removeDirectoryTimeout) - ticker := time.NewTicker(b.activeQueriersCheckInterval) - defer ticker.Stop() - for { - select { - case <-ticker.C: - if b.activeQueriers.Load() == 0 { - err := deleteFolder(b.Path) - if err == nil { - return - } +// removeBlockDirectory is called by the cache when an item is evicted +// The cache key and the cache value are passed to this function. +// The function needs to be synchronous, because otherwise we could get a cache +// race condition where the item is already evicted from the cache, but the +// underlying directory isn't. +func removeBlockDirectory(_ string, b BlockDirectory) { + timeout := time.After(b.removeDirectoryTimeout) + ticker := time.NewTicker(b.activeQueriersCheckInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if b.refCount.Load() == 0 { + if err := os.RemoveAll(b.Path); err != nil { level.Error(b.logger).Log("msg", "error deleting block directory", "err", err) } - case <-timeout: - level.Warn(b.logger).Log("msg", "force deleting block folder after timeout", "timeout", b.removeDirectoryTimeout) - err := deleteFolder(b.Path) - if err == nil { - return - } + return + } + case <-timeout: + level.Warn(b.logger).Log("msg", "force deleting block folder after timeout", "timeout", b.removeDirectoryTimeout) + if err := os.RemoveAll(b.Path); err != nil { level.Error(b.logger).Log("msg", "error force deleting block directory", "err", err) } + return } - }() -} - -func deleteFolder(folderPath string) error { - err := os.RemoveAll(folderPath) - if err != nil { - return fmt.Errorf("error deleting bloom block directory: %w", err) } - return nil }