Skip to content

Commit

Permalink
chore(blooms): Make block query concurrency configurable (#12292)
Browse files Browse the repository at this point in the history
A concurrency of 10 per worker seems to be too high for a reasonable 8GiB memory limit on bloom gateways. So this PR makes the concurrency configurable and uses a default of 4.

Memory usage of loading block pages into memory therefore being:

```
worker_concurrency x block_query_concurrency x max_block_page_size x 2
```

With current defaults:

```
4 x 4 x 32MiB x 2 = 1GiB
```

Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum authored and edsoncelio committed Mar 22, 2024
1 parent 9253500 commit 3ca8543
Show file tree
Hide file tree
Showing 12 changed files with 46 additions and 32 deletions.
4 changes: 4 additions & 0 deletions docs/sources/configure/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1939,6 +1939,10 @@ client:
# CLI flag: -bloom-gateway.worker-concurrency
[worker_concurrency: <int> | default = 4]
# Number of blocks processed concurrently on a single worker.
# CLI flag: -bloom-gateway.block-query-concurrency
[block_query_concurrency: <int> | default = 4]
# Maximum number of outstanding tasks per tenant.
# CLI flag: -bloom-gateway.max-outstanding-per-tenant
[max_outstanding_per_tenant: <int> | default = 1024]
Expand Down
3 changes: 2 additions & 1 deletion pkg/bloomgateway/bloomgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ func New(cfg Config, store bloomshipper.Store, logger log.Logger, reg prometheus
logger: logger,
metrics: newMetrics(reg, constants.Loki, metricsSubsystem),
workerConfig: workerConfig{
maxItems: cfg.NumMultiplexItems,
maxItems: cfg.NumMultiplexItems,
queryConcurrency: cfg.BlockQueryConcurrency,
},
pendingTasks: &atomic.Int64{},

Expand Down
18 changes: 8 additions & 10 deletions pkg/bloomgateway/bloomgateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ func TestBloomGateway_StartStopService(t *testing.T) {
func TestBloomGateway_FilterChunkRefs(t *testing.T) {
tenantID := "test"

store := setupBloomStore(t)
logger := log.NewNopLogger()
reg := prometheus.NewRegistry()

Expand All @@ -156,7 +155,8 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
ReplicationFactor: 1,
NumTokens: 16,
},
WorkerConcurrency: 4,
WorkerConcurrency: 2,
BlockQueryConcurrency: 2,
MaxOutstandingPerTenant: 1024,
}

Expand Down Expand Up @@ -249,7 +249,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
now := mktime("2023-10-03 10:00")

reg := prometheus.NewRegistry()
gw, err := New(cfg, store, logger, reg)
gw, err := New(cfg, newMockBloomStore(nil, nil), logger, reg)
require.NoError(t, err)

err = services.StartAndAwaitRunning(context.Background(), gw)
Expand Down Expand Up @@ -294,7 +294,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
now := mktime("2023-10-03 10:00")

reg := prometheus.NewRegistry()
gw, err := New(cfg, store, logger, reg)
gw, err := New(cfg, newMockBloomStore(nil, nil), logger, reg)
require.NoError(t, err)

err = services.StartAndAwaitRunning(context.Background(), gw)
Expand Down Expand Up @@ -333,15 +333,13 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
t.Run("use fuse queriers to filter chunks", func(t *testing.T) {
now := mktime("2023-10-03 10:00")

reg := prometheus.NewRegistry()
gw, err := New(cfg, store, logger, reg)
require.NoError(t, err)

// replace store implementation and re-initialize workers and sub-services
_, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)

gw.bloomStore = newMockBloomStore(queriers, metas)
err = gw.initServices()
reg := prometheus.NewRegistry()
store := newMockBloomStore(queriers, metas)

gw, err := New(cfg, store, logger, reg)
require.NoError(t, err)

err = services.StartAndAwaitRunning(context.Background(), gw)
Expand Down
2 changes: 2 additions & 0 deletions pkg/bloomgateway/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Config struct {
Client ClientConfig `yaml:"client,omitempty" doc:""`

WorkerConcurrency int `yaml:"worker_concurrency"`
BlockQueryConcurrency int `yaml:"block_query_concurrency"`
MaxOutstandingPerTenant int `yaml:"max_outstanding_per_tenant"`
NumMultiplexItems int `yaml:"num_multiplex_tasks"`
}
Expand All @@ -31,6 +32,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.BoolVar(&cfg.Enabled, prefix+"enabled", false, "Flag to enable or disable the bloom gateway component globally.")
f.IntVar(&cfg.WorkerConcurrency, prefix+"worker-concurrency", 4, "Number of workers to use for filtering chunks concurrently.")
f.IntVar(&cfg.BlockQueryConcurrency, prefix+"block-query-concurrency", 4, "Number of blocks processed concurrently on a single worker.")
f.IntVar(&cfg.MaxOutstandingPerTenant, prefix+"max-outstanding-per-tenant", 1024, "Maximum number of outstanding tasks per tenant.")
f.IntVar(&cfg.NumMultiplexItems, prefix+"num-multiplex-tasks", 512, "How many tasks are multiplexed at once.")
// TODO(chaudum): Figure out what the better place is for registering flags
Expand Down
24 changes: 13 additions & 11 deletions pkg/bloomgateway/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,28 @@ import (
"github.com/pkg/errors"

"github.com/grafana/dskit/concurrency"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
)

func newProcessor(id string, store bloomshipper.Store, logger log.Logger, metrics *workerMetrics) *processor {
func newProcessor(id string, concurrency int, store bloomshipper.Store, logger log.Logger, metrics *workerMetrics) *processor {
return &processor{
id: id,
store: store,
logger: logger,
metrics: metrics,
id: id,
concurrency: concurrency,
store: store,
logger: logger,
metrics: metrics,
}
}

type processor struct {
id string
store bloomshipper.Store
logger log.Logger
metrics *workerMetrics
id string
concurrency int // concurrency at which bloom blocks are processed
store bloomshipper.Store
logger log.Logger
metrics *workerMetrics
}

func (p *processor) run(ctx context.Context, tasks []Task) error {
Expand Down Expand Up @@ -94,8 +97,7 @@ func (p *processor) processBlocks(ctx context.Context, data []blockWithTasks) er
return err
}

// TODO(chaudum): What's a good cocurrency value?
return concurrency.ForEachJob(ctx, len(bqs), 10, func(ctx context.Context, i int) error {
return concurrency.ForEachJob(ctx, len(bqs), p.concurrency, func(ctx context.Context, i int) error {
bq := bqs[i]
if bq == nil {
// TODO(chaudum): Add metric for skipped blocks
Expand Down
4 changes: 2 additions & 2 deletions pkg/bloomgateway/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestProcessor(t *testing.T) {
_, metas, queriers, data := createBlocks(t, tenant, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)

mockStore := newMockBloomStore(queriers, metas)
p := newProcessor("worker", mockStore, log.NewNopLogger(), metrics)
p := newProcessor("worker", 1, mockStore, log.NewNopLogger(), metrics)

chunkRefs := createQueryInputFromBlockData(t, tenant, data, 10)
swb := seriesWithInterval{
Expand Down Expand Up @@ -145,7 +145,7 @@ func TestProcessor(t *testing.T) {
mockStore := newMockBloomStore(queriers, metas)
mockStore.err = errors.New("store failed")

p := newProcessor("worker", mockStore, log.NewNopLogger(), metrics)
p := newProcessor("worker", 1, mockStore, log.NewNopLogger(), metrics)

chunkRefs := createQueryInputFromBlockData(t, tenant, data, 10)
swb := seriesWithInterval{
Expand Down
5 changes: 3 additions & 2 deletions pkg/bloomgateway/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ const (
)

type workerConfig struct {
maxItems int
maxItems int
queryConcurrency int
}

// worker is a datastructure that consumes tasks from the request queue,
Expand Down Expand Up @@ -65,7 +66,7 @@ func (w *worker) starting(_ context.Context) error {
func (w *worker) running(_ context.Context) error {
idx := queue.StartIndexWithLocalQueue

p := newProcessor(w.id, w.store, w.logger, w.metrics)
p := newProcessor(w.id, w.cfg.queryConcurrency, w.store, w.logger, w.metrics)

for st := w.State(); st == services.Running || st == services.Stopping; {
taskCtx := context.Background()
Expand Down
8 changes: 5 additions & 3 deletions pkg/distributor/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ package distributor

import (
"context"
"github.com/grafana/dskit/user"
"github.com/grafana/loki/pkg/loghttp/push"
"github.com/grafana/loki/pkg/logproto"
"io"
"net/http"
"net/http/httptest"
"testing"

"github.com/grafana/dskit/user"

"github.com/grafana/loki/pkg/loghttp/push"
"github.com/grafana/loki/pkg/logproto"

"github.com/grafana/dskit/flagext"
"github.com/stretchr/testify/require"

Expand Down
3 changes: 2 additions & 1 deletion pkg/logql/log/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import (
"github.com/grafana/regexp"
"github.com/grafana/regexp/syntax"

"github.com/grafana/loki/pkg/util"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/pkg/util"
)

// Checker is an interface that matches against the input line or regexp.
Expand Down
3 changes: 2 additions & 1 deletion pkg/querier/ingester_querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package querier
import (
"context"
"errors"
"go.uber.org/atomic"
"sync"
"testing"
"time"

"go.uber.org/atomic"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/chunk/predicate.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package chunk

import (
"github.com/grafana/loki/pkg/querier/plan"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/pkg/querier/plan"
)

type Predicate struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package indexgateway
import (
"flag"
"fmt"

"github.com/pkg/errors"

"github.com/grafana/loki/pkg/util/ring"
Expand Down

0 comments on commit 3ca8543

Please sign in to comment.