Skip to content

Commit

Permalink
FetchBlocks returns CloseableBlockQuerier
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum committed Feb 6, 2024
1 parent f43395f commit b673afb
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 65 deletions.
23 changes: 14 additions & 9 deletions pkg/storage/stores/shipper/bloomshipper/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (

type ClosableBlockQuerier struct {
*v1.BlockQuerier
Close func() error
Close func()
}

func NewBlocksCache(config config.Config, reg prometheus.Registerer, logger log.Logger) *cache.EmbeddedCache[string, BlockDirectory] {
Expand All @@ -46,7 +46,7 @@ func NewBlockDirectory(ref BlockRef, path string, logger log.Logger) BlockDirect
return BlockDirectory{
BlockRef: ref,
Path: path,
activeQueriers: atomic.NewInt32(0),
refCount: atomic.NewInt32(0),
removeDirectoryTimeout: time.Minute,
logger: logger,
activeQueriersCheckInterval: defaultActiveQueriersCheckInterval,
Expand All @@ -59,7 +59,7 @@ type BlockDirectory struct {
BlockRef
Path string
removeDirectoryTimeout time.Duration
activeQueriers *atomic.Int32
refCount *atomic.Int32
logger log.Logger
activeQueriersCheckInterval time.Duration
}
Expand All @@ -68,17 +68,22 @@ func (b BlockDirectory) Block() *v1.Block {
return v1.NewBlock(v1.NewDirectoryBlockReader(b.Path))
}

func (b BlockDirectory) Acquire() {
_ = b.refCount.Inc()
}

func (b BlockDirectory) Release() {
_ = b.refCount.Dec()
}

// BlockQuerier returns a new block querier from the directory.
// It increments the counter of active queriers for this directory.
// The counter is decreased when the returned querier is closed.
func (b BlockDirectory) BlockQuerier() *ClosableBlockQuerier {
b.activeQueriers.Inc()
b.Acquire()
return &ClosableBlockQuerier{
BlockQuerier: v1.NewBlockQuerier(b.Block()),
Close: func() error {
_ = b.activeQueriers.Dec()
return nil
},
Close: b.Release,
}
}

Expand All @@ -92,7 +97,7 @@ func (b *BlockDirectory) removeDirectoryAsync() {
for {
select {
case <-ticker.C:
if b.activeQueriers.Load() == 0 {
if b.refCount.Load() == 0 {
err := deleteFolder(b.Path)
if err == nil {
return
Expand Down
60 changes: 29 additions & 31 deletions pkg/storage/stores/shipper/bloomshipper/cache_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package bloomshipper

import (
"os"
"testing"
"time"

Expand All @@ -9,25 +10,23 @@ import (
"go.uber.org/atomic"
)

func TestBlockDirectory_Cleanup(t *testing.T) {
checkInterval := 50 * time.Millisecond
timeout := 200 * time.Millisecond

func Test_CachedBlock(t *testing.T) {
tests := map[string]struct {
releaseQuerier bool
expectDirectoryToBeDeletedWithin time.Duration
}{
"expect directory to be removed once all queriers are released": {
releaseQuerier: true,
expectDirectoryToBeDeletedWithin: 2 * checkInterval,
"expected block directory to be removed once all queriers are released": {
releaseQuerier: true,
// four times grater than activeQueriersCheckInterval
expectDirectoryToBeDeletedWithin: 200 * time.Millisecond,
},
"expect directory to be force removed after timeout": {
releaseQuerier: false,
expectDirectoryToBeDeletedWithin: 2 * timeout,
"expected block directory to be force removed after timeout": {
releaseQuerier: false,
// four times grater than removeDirectoryTimeout
expectDirectoryToBeDeletedWithin: 2 * time.Second,
},
}
for name, tc := range tests {
tc := tc
for name, testData := range tests {
t.Run(name, func(t *testing.T) {
extractedBlockDirectory := t.TempDir()
blockFilePath, _, _, _ := createBlockArchive(t)
Expand All @@ -37,25 +36,24 @@ func TestBlockDirectory_Cleanup(t *testing.T) {

cached := BlockDirectory{
Path: extractedBlockDirectory,
removeDirectoryTimeout: timeout,
activeQueriersCheckInterval: checkInterval,
logger: log.NewNopLogger(),
activeQueriers: atomic.NewInt32(0),
removeDirectoryTimeout: 500 * time.Millisecond,
activeQueriersCheckInterval: 50 * time.Millisecond,
logger: log.NewLogfmtLogger(os.Stderr),
refCount: atomic.NewInt32(1),
}
// acquire directory
cached.activeQueriers.Inc()
// start cleanup goroutine
cached.removeDirectoryAsync()
//ensure directory exists
require.Never(t, func() bool {
return directoryDoesNotExist(extractedBlockDirectory)
}, 200*time.Millisecond, 50*time.Millisecond)

if tc.releaseQuerier {
// release directory
cached.activeQueriers.Dec()
if testData.releaseQuerier {
cached.refCount.Dec()
}

// ensure directory does not exist any more
//ensure directory does not exist
require.Eventually(t, func() bool {
return directoryDoesNotExist(extractedBlockDirectory)
}, tc.expectDirectoryToBeDeletedWithin, 10*time.Millisecond)
}, testData.expectDirectoryToBeDeletedWithin, 50*time.Millisecond)
})
}
}
Expand All @@ -66,15 +64,15 @@ func Test_ClosableBlockQuerier(t *testing.T) {
err := extractArchive(blockFilePath, extractedBlockDirectory)
require.NoError(t, err)

cached := BlockDirectory{
blockDir := BlockDirectory{
Path: extractedBlockDirectory,
removeDirectoryTimeout: 100 * time.Millisecond,
activeQueriers: atomic.NewInt32(0),
refCount: atomic.NewInt32(0),
}

querier := cached.BlockQuerier()
require.Equal(t, int32(1), cached.activeQueriers.Load())
require.NoError(t, querier.Close())
require.Equal(t, int32(0), cached.activeQueriers.Load())
querier := blockDir.BlockQuerier()
require.Equal(t, int32(1), blockDir.refCount.Load())
querier.Close()
require.Equal(t, int32(0), blockDir.refCount.Load())

}
8 changes: 4 additions & 4 deletions pkg/storage/stores/shipper/bloomshipper/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type metrics struct{}

type fetcher interface {
FetchMetas(ctx context.Context, refs []MetaRef) ([]Meta, error)
FetchBlocks(ctx context.Context, refs []BlockRef) ([]BlockDirectory, error)
FetchBlocks(ctx context.Context, refs []BlockRef) ([]*ClosableBlockQuerier, error)
Close()
}

Expand Down Expand Up @@ -124,7 +124,7 @@ func (f *Fetcher) writeBackMetas(ctx context.Context, metas []Meta) error {
return f.metasCache.Store(ctx, keys, data)
}

func (f *Fetcher) FetchBlocks(ctx context.Context, refs []BlockRef) ([]BlockDirectory, error) {
func (f *Fetcher) FetchBlocks(ctx context.Context, refs []BlockRef) ([]*ClosableBlockQuerier, error) {
n := len(refs)

responses := make(chan downloadResponse[BlockDirectory], n)
Expand All @@ -140,13 +140,13 @@ func (f *Fetcher) FetchBlocks(ctx context.Context, refs []BlockRef) ([]BlockDire
})
}

results := make([]BlockDirectory, len(refs))
results := make([]*ClosableBlockQuerier, n)
for i := 0; i < n; i++ {
select {
case err := <-errors:
return results, err
case res := <-responses:
results[res.idx] = res.item
results[res.idx] = res.item.BlockQuerier()
}
}

Expand Down
26 changes: 9 additions & 17 deletions pkg/storage/stores/shipper/bloomshipper/shipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,36 +58,28 @@ func (s *Shipper) GetBlockRefs(ctx context.Context, tenantID string, interval In
return blockRefs, nil
}

func (s *Shipper) ForEach(ctx context.Context, _ string, blocks []BlockRef, callback ForEachBlockCallback) error {
blockDirs, err := s.store.FetchBlocks(ctx, blocks)
func (s *Shipper) ForEach(ctx context.Context, _ string, refs []BlockRef, callback ForEachBlockCallback) error {
bqs, err := s.store.FetchBlocks(ctx, refs)

if err != nil {
return err
}

if len(blockDirs) != len(blocks) {
return fmt.Errorf("number of responses (%d) does not match number of requests (%d)", len(blockDirs), len(blocks))
if len(bqs) != len(refs) {
return fmt.Errorf("number of respones (%d) does not match number of requests (%d)", len(bqs), len(refs))
}

for i := range blocks {
if blockDirs[i].BlockRef != blocks[i] {
return fmt.Errorf("invalid order of responses: expected: %v, got: %v", blocks[i], blockDirs[i].BlockRef)
}
err := runCallback(callback, blockDirs[i].BlockQuerier(), blockDirs[i].BlockRef.Bounds)
for i := range bqs {
err := callback(bqs[i].BlockQuerier, refs[i].Bounds)
// close querier to decrement ref count
bqs[i].Close()
if err != nil {
return err
}
}
return nil
}

func runCallback(callback ForEachBlockCallback, bq *ClosableBlockQuerier, bounds v1.FingerprintBounds) error {
defer func(b *ClosableBlockQuerier) {
_ = b.Close()
}(bq)

return callback(bq.BlockQuerier, bounds)
}

func (s *Shipper) Stop() {
s.store.Stop()
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/storage/stores/shipper/bloomshipper/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
type Store interface {
ResolveMetas(ctx context.Context, params MetaSearchParams) ([][]MetaRef, []*Fetcher, error)
FetchMetas(ctx context.Context, params MetaSearchParams) ([]Meta, error)
FetchBlocks(ctx context.Context, refs []BlockRef) ([]BlockDirectory, error)
FetchBlocks(ctx context.Context, refs []BlockRef) ([]*ClosableBlockQuerier, error)
Fetcher(ts model.Time) *Fetcher
Stop()
}
Expand Down Expand Up @@ -107,7 +107,7 @@ func (b *bloomStoreEntry) FetchMetas(ctx context.Context, params MetaSearchParam
}

// FetchBlocks implements Store.
func (b *bloomStoreEntry) FetchBlocks(ctx context.Context, refs []BlockRef) ([]BlockDirectory, error) {
func (b *bloomStoreEntry) FetchBlocks(ctx context.Context, refs []BlockRef) ([]*ClosableBlockQuerier, error) {
return b.fetcher.FetchBlocks(ctx, refs)
}

Expand Down Expand Up @@ -273,7 +273,7 @@ func (b *BloomStore) FetchMetas(ctx context.Context, params MetaSearchParams) ([
}

// FetchBlocks implements Store.
func (b *BloomStore) FetchBlocks(ctx context.Context, blocks []BlockRef) ([]BlockDirectory, error) {
func (b *BloomStore) FetchBlocks(ctx context.Context, blocks []BlockRef) ([]*ClosableBlockQuerier, error) {

var refs [][]BlockRef
var fetchers []*Fetcher
Expand All @@ -298,7 +298,7 @@ func (b *BloomStore) FetchBlocks(ctx context.Context, blocks []BlockRef) ([]Bloc
}
}

results := make([]BlockDirectory, 0, len(blocks))
results := make([]*ClosableBlockQuerier, 0, len(blocks))
for i := range fetchers {
res, err := fetchers[i].FetchBlocks(ctx, refs[i])
results = append(results, res...)
Expand Down

0 comments on commit b673afb

Please sign in to comment.