Skip to content

Commit

Permalink
refactor(bloomstore): Introduce fetch option for blocks fetcher (graf…
Browse files Browse the repository at this point in the history
…ana#12160)

Requests to the bloom gateway should not wait for blocks to be downloaded into cache first, but should operate on readily available blocks.

This PR introduces fetch options for the bloom store that specify how the store should behave when requesting blocks from block refs.

`WithIgnoreNotFound`: Ignore errors from blocks that could not be found in object storage, and instead return a `nil` value in the response.
`WithFetchAsync`: Return only the blocks that are available locally in cache, and dispatch downloading of the missing blocks to the blocks downloader, which operates in the background and makes them available for subsequent requests.

Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum authored Mar 8, 2024
1 parent f4b2c5d commit f6cfff5
Show file tree
Hide file tree
Showing 16 changed files with 309 additions and 83 deletions.
5 changes: 0 additions & 5 deletions docs/sources/configure/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2349,11 +2349,6 @@ bloom_shipper:
# CLI flag: -bloom.shipper.working-directory
[working_directory: <string> | default = "bloom-shipper"]

# In an eventually consistent system like the bloom components, we usually
# want to ignore blocks that are missing in storage.
# CLI flag: -bloom.shipper.ignore-missing-blocks
[ignore_missing_blocks: <boolean> | default = true]

blocks_downloading_queue:
# The count of parallel workers that download Bloom Blocks.
# CLI flag: -bloom.shipper.blocks-downloading-queue.workers-count
Expand Down
3 changes: 2 additions & 1 deletion pkg/bloomcompactor/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,9 @@ func (s *SimpleBloomController) loadWorkForGap(

// NB(owen-d): we filter out nil blocks here to avoid panics in the bloom generator since the fetcher
// input->output length and indexing in its contract
// NB(chaudum): Do we want to fetch in strict mode and fail instead?
f := FetchFunc[bloomshipper.BlockRef, *bloomshipper.CloseableBlockQuerier](func(ctx context.Context, refs []bloomshipper.BlockRef) ([]*bloomshipper.CloseableBlockQuerier, error) {
blks, err := fetcher.FetchBlocks(ctx, refs)
blks, err := fetcher.FetchBlocks(ctx, refs, bloomshipper.WithFetchAsync(false), bloomshipper.WithIgnoreNotFound(true))
if err != nil {
return nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/bloomcompactor/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ import (
"math"
"sync"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/config"
"github.com/pkg/errors"
"github.com/prometheus/common/model"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/config"
)

type tableRangeProgress struct {
Expand Down
5 changes: 3 additions & 2 deletions pkg/bloomcompactor/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package bloomcompactor
import (
"testing"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/config"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/config"
)

func mkTblRange(tenant string, tbl config.DayTime, from, through model.Fingerprint) *tenantTableRange {
Expand Down
5 changes: 3 additions & 2 deletions pkg/bloomcompactor/versioned_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package bloomcompactor
import (
"sort"

"github.com/prometheus/common/model"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
"github.com/prometheus/common/model"
)

type tsdbToken struct {
Expand Down Expand Up @@ -94,7 +95,7 @@ func (t tsdbTokenRange) Add(version int, bounds v1.FingerprintBounds) (res tsdbT
}

// If we need to update the range, there are 5 cases:
// 1. `equal`: the incoming range equals an exising range ()
// 1. `equal`: the incoming range equals an existing range ()
// ------ # addition
// ------ # src
// 2. `subset`: the incoming range is a subset of an existing range
Expand Down
5 changes: 3 additions & 2 deletions pkg/bloomcompactor/versioned_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package bloomcompactor
import (
"testing"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
)

func Test_TsdbTokenRange(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions pkg/bloomgateway/bloomgateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ func setupBloomStore(t *testing.T) *bloomshipper.BloomStore {
storageCfg := storage.Config{
BloomShipperConfig: bloomshipperconfig.Config{
WorkingDirectory: t.TempDir(),
BlocksDownloadingQueue: bloomshipperconfig.DownloadingQueueConfig{
WorkersCount: 1,
},
},
FSConfig: local.FSConfig{
Directory: t.TempDir(),
Expand Down
8 changes: 6 additions & 2 deletions pkg/bloomgateway/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,19 @@ func (p *processor) processBlocks(ctx context.Context, data []blockWithTasks) er
refs = append(refs, block.ref)
}

bqs, err := p.store.FetchBlocks(ctx, refs)
start := time.Now()
bqs, err := p.store.FetchBlocks(ctx, refs, bloomshipper.WithFetchAsync(true), bloomshipper.WithIgnoreNotFound(true))
level.Debug(p.logger).Log("msg", "fetch blocks", "count", len(bqs), "duration", time.Since(start), "err", err)

if err != nil {
return err
}

// TODO(chaudum): use `concurrency` lib with bound parallelism
for i, bq := range bqs {
block := data[i]
if bq == nil {
level.Warn(p.logger).Log("msg", "skipping not found block", "block", block.ref)
// TODO(chaudum): Add metric for skipped blocks
continue
}
level.Debug(p.logger).Log(
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloomgateway/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (s *dummyStore) Client(_ model.Time) (bloomshipper.Client, error) {
func (s *dummyStore) Stop() {
}

func (s *dummyStore) FetchBlocks(_ context.Context, refs []bloomshipper.BlockRef) ([]*bloomshipper.CloseableBlockQuerier, error) {
func (s *dummyStore) FetchBlocks(_ context.Context, refs []bloomshipper.BlockRef, _ ...bloomshipper.FetchOption) ([]*bloomshipper.CloseableBlockQuerier, error) {
result := make([]*bloomshipper.CloseableBlockQuerier, 0, len(s.querieres))

if s.err != nil {
Expand Down
8 changes: 7 additions & 1 deletion pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,13 +672,19 @@ func (t *Loki) initBloomStore() (services.Service, error) {
if err != nil {
return nil, fmt.Errorf("failed to create metas cache: %w", err)
}
} else {
level.Info(logger).Log("msg", "no metas cache configured")
}

var blocksCache cache.TypedCache[string, bloomshipper.BlockDirectory]
if bsCfg.BlocksCache.IsEnabled() {
blocksCache = bloomshipper.NewBlocksCache(bsCfg.BlocksCache, reg, logger)
err = bloomshipper.LoadBlocksDirIntoCache(t.Cfg.StorageConfig.BloomShipperConfig.WorkingDirectory, blocksCache, logger)
level.Warn(logger).Log("msg", "failed to preload blocks cache", "err", err)
if err != nil {
level.Warn(logger).Log("msg", "failed to preload blocks cache", "err", err)
}
} else {
level.Info(logger).Log("msg", "no blocks cache configured")
}

t.BloomStore, err = bloomshipper.NewBloomStore(t.Cfg.SchemaConfig.Configs, t.Cfg.StorageConfig, t.ClientMetrics, metasCache, blocksCache, reg, logger)
Expand Down
3 changes: 3 additions & 0 deletions pkg/loki/modules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,9 @@ func minimalWorkingConfig(t *testing.T, dir, target string, cfgTransformers ...f
FSConfig: local.FSConfig{Directory: dir},
BloomShipperConfig: bloomshipperconfig.Config{
WorkingDirectory: filepath.Join(dir, "blooms"),
BlocksDownloadingQueue: bloomshipperconfig.DownloadingQueueConfig{
WorkersCount: 1,
},
},
BoltDBShipperConfig: boltdb.IndexCfg{
Config: indexshipper.Config{
Expand Down
2 changes: 0 additions & 2 deletions pkg/storage/stores/shipper/bloomshipper/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

type Config struct {
WorkingDirectory string `yaml:"working_directory"`
IgnoreMissingBlocks bool `yaml:"ignore_missing_blocks"`
BlocksDownloadingQueue DownloadingQueueConfig `yaml:"blocks_downloading_queue"`
BlocksCache cache.EmbeddedCacheConfig `yaml:"blocks_cache"`
MetasCache cache.Config `yaml:"metas_cache"`
Expand All @@ -30,7 +29,6 @@ func (cfg *DownloadingQueueConfig) RegisterFlagsWithPrefix(prefix string, f *fla

func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&c.WorkingDirectory, prefix+"shipper.working-directory", "bloom-shipper", "Working directory to store downloaded Bloom Blocks.")
f.BoolVar(&c.IgnoreMissingBlocks, prefix+"shipper.ignore-missing-blocks", true, "In an eventually consistent system like the bloom components, we usually want to ignore blocks that are missing in storage.")
c.BlocksDownloadingQueue.RegisterFlagsWithPrefix(prefix+"shipper.blocks-downloading-queue.", f)
c.BlocksCache.RegisterFlagsWithPrefixAndDefaults(prefix+"blocks-cache.", "Cache for bloom blocks. ", f, 24*time.Hour)
c.MetasCache.RegisterFlagsWithPrefix(prefix+"metas-cache.", "Cache for bloom metas. ", f)
Expand Down
Loading

0 comments on commit f6cfff5

Please sign in to comment.