From 3ca8543a69f453ac2ed9a26140c8b6779d66a44f Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Thu, 21 Mar 2024 21:13:38 +0100 Subject: [PATCH] chore(blooms): Make block query concurrency configurable (#12292) A concurrency of 10 per worker seems to be too high for a reasonable 8GiB memory limit on bloom gateways. So this PR makes the concurrency configurable and uses a default of 4. Memory usage of loading block pages into memory therefore being: ``` worker_concurrency x block_query_concurrency x max_block_page_size x 2 ``` With current defaults: ``` 4 x 4 x 32MiB x 2 = 1GiB ``` Signed-off-by: Christian Haudum --- docs/sources/configure/_index.md | 4 ++++ pkg/bloomgateway/bloomgateway.go | 3 ++- pkg/bloomgateway/bloomgateway_test.go | 18 +++++++------- pkg/bloomgateway/config.go | 2 ++ pkg/bloomgateway/processor.go | 24 ++++++++++--------- pkg/bloomgateway/processor_test.go | 4 ++-- pkg/bloomgateway/worker.go | 5 ++-- pkg/distributor/http_test.go | 8 ++++--- pkg/logql/log/filter.go | 3 ++- pkg/querier/ingester_querier_test.go | 3 ++- pkg/storage/chunk/predicate.go | 3 ++- .../indexshipper/indexgateway/config.go | 1 + 12 files changed, 46 insertions(+), 32 deletions(-) diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md index b49aaffb65413..ba96be3b3d604 100644 --- a/docs/sources/configure/_index.md +++ b/docs/sources/configure/_index.md @@ -1939,6 +1939,10 @@ client: # CLI flag: -bloom-gateway.worker-concurrency [worker_concurrency: | default = 4] +# Number of blocks processed concurrently on a single worker. +# CLI flag: -bloom-gateway.block-query-concurrency +[block_query_concurrency: | default = 4] + # Maximum number of outstanding tasks per tenant. # CLI flag: -bloom-gateway.max-outstanding-per-tenant [max_outstanding_per_tenant: | default = 1024] diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go index 4138ff4c1beb2..f76d6a55d2a09 100644 --- a/pkg/bloomgateway/bloomgateway.go +++ b/pkg/bloomgateway/bloomgateway.go @@ -114,7 +114,8 @@ func New(cfg Config, store bloomshipper.Store, logger log.Logger, reg prometheus logger: logger, metrics: newMetrics(reg, constants.Loki, metricsSubsystem), workerConfig: workerConfig{ - maxItems: cfg.NumMultiplexItems, + maxItems: cfg.NumMultiplexItems, + queryConcurrency: cfg.BlockQueryConcurrency, }, pendingTasks: &atomic.Int64{}, diff --git a/pkg/bloomgateway/bloomgateway_test.go b/pkg/bloomgateway/bloomgateway_test.go index b96eeeec19c7e..45c9a3926c157 100644 --- a/pkg/bloomgateway/bloomgateway_test.go +++ b/pkg/bloomgateway/bloomgateway_test.go @@ -138,7 +138,6 @@ func TestBloomGateway_StartStopService(t *testing.T) { func TestBloomGateway_FilterChunkRefs(t *testing.T) { tenantID := "test" - store := setupBloomStore(t) logger := log.NewNopLogger() reg := prometheus.NewRegistry() @@ -156,7 +155,8 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { ReplicationFactor: 1, NumTokens: 16, }, - WorkerConcurrency: 4, + WorkerConcurrency: 2, + BlockQueryConcurrency: 2, MaxOutstandingPerTenant: 1024, } @@ -249,7 +249,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { now := mktime("2023-10-03 10:00") reg := prometheus.NewRegistry() - gw, err := New(cfg, store, logger, reg) + gw, err := New(cfg, newMockBloomStore(nil, nil), logger, reg) require.NoError(t, err) err = services.StartAndAwaitRunning(context.Background(), gw) @@ -294,7 +294,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { now := mktime("2023-10-03 10:00") reg := prometheus.NewRegistry() - gw, err := New(cfg, store, logger, reg) + gw, err := New(cfg, newMockBloomStore(nil, nil), logger, reg) require.NoError(t, err) err = services.StartAndAwaitRunning(context.Background(), gw) @@ -333,15 +333,13 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { t.Run("use fuse queriers to filter chunks", func(t *testing.T) { now := mktime("2023-10-03 10:00") - reg := prometheus.NewRegistry() - gw, err := New(cfg, store, logger, reg) - require.NoError(t, err) - // replace store implementation and re-initialize workers and sub-services _, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff) - gw.bloomStore = newMockBloomStore(queriers, metas) - err = gw.initServices() + reg := prometheus.NewRegistry() + store := newMockBloomStore(queriers, metas) + + gw, err := New(cfg, store, logger, reg) require.NoError(t, err) err = services.StartAndAwaitRunning(context.Background(), gw) diff --git a/pkg/bloomgateway/config.go b/pkg/bloomgateway/config.go index ad5d2928728a6..356bc782fb839 100644 --- a/pkg/bloomgateway/config.go +++ b/pkg/bloomgateway/config.go @@ -18,6 +18,7 @@ type Config struct { Client ClientConfig `yaml:"client,omitempty" doc:""` WorkerConcurrency int `yaml:"worker_concurrency"` + BlockQueryConcurrency int `yaml:"block_query_concurrency"` MaxOutstandingPerTenant int `yaml:"max_outstanding_per_tenant"` NumMultiplexItems int `yaml:"num_multiplex_tasks"` } @@ -31,6 +32,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.BoolVar(&cfg.Enabled, prefix+"enabled", false, "Flag to enable or disable the bloom gateway component globally.") f.IntVar(&cfg.WorkerConcurrency, prefix+"worker-concurrency", 4, "Number of workers to use for filtering chunks concurrently.") + f.IntVar(&cfg.BlockQueryConcurrency, prefix+"block-query-concurrency", 4, "Number of blocks processed concurrently on a single worker.") f.IntVar(&cfg.MaxOutstandingPerTenant, prefix+"max-outstanding-per-tenant", 1024, "Maximum number of outstanding tasks per tenant.") f.IntVar(&cfg.NumMultiplexItems, prefix+"num-multiplex-tasks", 512, "How many tasks are multiplexed at once.") // TODO(chaudum): Figure out what the better place is for registering flags diff --git a/pkg/bloomgateway/processor.go b/pkg/bloomgateway/processor.go index 9a503551d3d23..9fc6aca57dc11 100644 --- a/pkg/bloomgateway/processor.go +++ b/pkg/bloomgateway/processor.go @@ -11,25 +11,28 @@ import ( "github.com/pkg/errors" "github.com/grafana/dskit/concurrency" + v1 "github.com/grafana/loki/pkg/storage/bloom/v1" "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" ) -func newProcessor(id string, store bloomshipper.Store, logger log.Logger, metrics *workerMetrics) *processor { +func newProcessor(id string, concurrency int, store bloomshipper.Store, logger log.Logger, metrics *workerMetrics) *processor { return &processor{ - id: id, - store: store, - logger: logger, - metrics: metrics, + id: id, + concurrency: concurrency, + store: store, + logger: logger, + metrics: metrics, } } type processor struct { - id string - store bloomshipper.Store - logger log.Logger - metrics *workerMetrics + id string + concurrency int // concurrency at which bloom blocks are processed + store bloomshipper.Store + logger log.Logger + metrics *workerMetrics } func (p *processor) run(ctx context.Context, tasks []Task) error { @@ -94,8 +97,7 @@ func (p *processor) processBlocks(ctx context.Context, data []blockWithTasks) er return err } - // TODO(chaudum): What's a good cocurrency value? - return concurrency.ForEachJob(ctx, len(bqs), 10, func(ctx context.Context, i int) error { + return concurrency.ForEachJob(ctx, len(bqs), p.concurrency, func(ctx context.Context, i int) error { bq := bqs[i] if bq == nil { // TODO(chaudum): Add metric for skipped blocks diff --git a/pkg/bloomgateway/processor_test.go b/pkg/bloomgateway/processor_test.go index 246da373f3574..d9e6a799045e3 100644 --- a/pkg/bloomgateway/processor_test.go +++ b/pkg/bloomgateway/processor_test.go @@ -96,7 +96,7 @@ func TestProcessor(t *testing.T) { _, metas, queriers, data := createBlocks(t, tenant, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff) mockStore := newMockBloomStore(queriers, metas) - p := newProcessor("worker", mockStore, log.NewNopLogger(), metrics) + p := newProcessor("worker", 1, mockStore, log.NewNopLogger(), metrics) chunkRefs := createQueryInputFromBlockData(t, tenant, data, 10) swb := seriesWithInterval{ @@ -145,7 +145,7 @@ func TestProcessor(t *testing.T) { mockStore := newMockBloomStore(queriers, metas) mockStore.err = errors.New("store failed") - p := newProcessor("worker", mockStore, log.NewNopLogger(), metrics) + p := newProcessor("worker", 1, mockStore, log.NewNopLogger(), metrics) chunkRefs := createQueryInputFromBlockData(t, tenant, data, 10) swb := seriesWithInterval{ diff --git a/pkg/bloomgateway/worker.go b/pkg/bloomgateway/worker.go index 5a37b059e75e9..52de8155d7783 100644 --- a/pkg/bloomgateway/worker.go +++ b/pkg/bloomgateway/worker.go @@ -22,7 +22,8 @@ const ( ) type workerConfig struct { - maxItems int + maxItems int + queryConcurrency int } // worker is a datastructure that consumes tasks from the request queue, @@ -65,7 +66,7 @@ func (w *worker) starting(_ context.Context) error { func (w *worker) running(_ context.Context) error { idx := queue.StartIndexWithLocalQueue - p := newProcessor(w.id, w.store, w.logger, w.metrics) + p := newProcessor(w.id, w.cfg.queryConcurrency, w.store, w.logger, w.metrics) for st := w.State(); st == services.Running || st == services.Stopping; { taskCtx := context.Background() diff --git a/pkg/distributor/http_test.go b/pkg/distributor/http_test.go index b5b4bebbd58fb..23b2993c5b213 100644 --- a/pkg/distributor/http_test.go +++ b/pkg/distributor/http_test.go @@ -2,14 +2,16 @@ package distributor import ( "context" - "github.com/grafana/dskit/user" - "github.com/grafana/loki/pkg/loghttp/push" - "github.com/grafana/loki/pkg/logproto" "io" "net/http" "net/http/httptest" "testing" + "github.com/grafana/dskit/user" + + "github.com/grafana/loki/pkg/loghttp/push" + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/dskit/flagext" "github.com/stretchr/testify/require" diff --git a/pkg/logql/log/filter.go b/pkg/logql/log/filter.go index 7117b77805d61..7b613947c8b8b 100644 --- a/pkg/logql/log/filter.go +++ b/pkg/logql/log/filter.go @@ -9,8 +9,9 @@ import ( "github.com/grafana/regexp" "github.com/grafana/regexp/syntax" - "github.com/grafana/loki/pkg/util" "github.com/prometheus/prometheus/model/labels" + + "github.com/grafana/loki/pkg/util" ) // Checker is an interface that matches against the input line or regexp. diff --git a/pkg/querier/ingester_querier_test.go b/pkg/querier/ingester_querier_test.go index 2eee6dc0ae072..d5f4d872c5084 100644 --- a/pkg/querier/ingester_querier_test.go +++ b/pkg/querier/ingester_querier_test.go @@ -3,11 +3,12 @@ package querier import ( "context" "errors" - "go.uber.org/atomic" "sync" "testing" "time" + "go.uber.org/atomic" + "google.golang.org/grpc/codes" "google.golang.org/grpc/status" diff --git a/pkg/storage/chunk/predicate.go b/pkg/storage/chunk/predicate.go index 391b1e9163235..62a91c7a46437 100644 --- a/pkg/storage/chunk/predicate.go +++ b/pkg/storage/chunk/predicate.go @@ -1,8 +1,9 @@ package chunk import ( - "github.com/grafana/loki/pkg/querier/plan" "github.com/prometheus/prometheus/model/labels" + + "github.com/grafana/loki/pkg/querier/plan" ) type Predicate struct { diff --git a/pkg/storage/stores/shipper/indexshipper/indexgateway/config.go b/pkg/storage/stores/shipper/indexshipper/indexgateway/config.go index 884d29bf9e37c..eb5c134a5de18 100644 --- a/pkg/storage/stores/shipper/indexshipper/indexgateway/config.go +++ b/pkg/storage/stores/shipper/indexshipper/indexgateway/config.go @@ -3,6 +3,7 @@ package indexgateway import ( "flag" "fmt" + "github.com/pkg/errors" "github.com/grafana/loki/pkg/util/ring"