From 8abfc29e44807c35402a10b2425f3e7250525838 Mon Sep 17 00:00:00 2001
From: Christian Haudum
Date: Thu, 14 Mar 2024 17:28:13 +0100
Subject: [PATCH] chore(bloom-gw): Process blocks in parallel (#12203)

While processing a single block is rather fast, processing many blocks
sequentially means that a single slowly processed block can cause high
tail latency.

Signed-off-by: Christian Haudum
---
 pkg/bloomgateway/bloomgateway.go | 12 +++++-----
 pkg/bloomgateway/multiplexing.go | 40 +++++++++++++++-----------------
 pkg/bloomgateway/processor.go    | 24 +++++++++++--------
 3 files changed, 39 insertions(+), 37 deletions(-)

diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go
index b78e80beefa03..515b27b01b2cf 100644
--- a/pkg/bloomgateway/bloomgateway.go
+++ b/pkg/bloomgateway/bloomgateway.go
@@ -44,6 +44,7 @@ package bloomgateway
 import (
 	"context"
 	"fmt"
+	"sort"
 	"time"
 
 	"github.com/go-kit/log"
@@ -347,14 +348,11 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
 // Once the tasks is closed, it will send the task with the results from the
 // block querier to the supplied task channel.
 func (g *Gateway) consumeTask(ctx context.Context, task Task, tasksCh chan<- Task) {
-	logger := log.With(g.logger, "task", task.ID)
-
 	for res := range task.resCh {
 		select {
 		case <-ctx.Done():
-			level.Debug(logger).Log("msg", "drop partial result", "fp_int", uint64(res.Fp), "fp_hex", res.Fp, "chunks_to_remove", res.Removals.Len())
+			// do nothing
 		default:
-			level.Debug(logger).Log("msg", "accept partial result", "fp_int", uint64(res.Fp), "fp_hex", res.Fp, "chunks_to_remove", res.Removals.Len())
 			task.responses = append(task.responses, res)
 		}
 	}
@@ -368,18 +366,20 @@ func (g *Gateway) consumeTask(ctx context.Context, task Task, tasksCh chan<- Tas
 	}
 }
 
-// merges a list of responses via a heap. The same fingerprints and chunks can be present in multiple responses,
-// but each response must be ordered by fingerprint
+// merges a list of responses via a heap. The same fingerprints and chunks can be present in multiple responses.
+// Individual responses do not need to be ordered beforehand.
 func orderedResponsesByFP(responses [][]v1.Output) v1.Iterator[v1.Output] {
 	if len(responses) == 0 {
 		return v1.NewEmptyIter[v1.Output]()
 	}
 	if len(responses) == 1 {
+		sort.Slice(responses[0], func(i, j int) bool { return responses[0][i].Fp < responses[0][j].Fp })
 		return v1.NewSliceIter(responses[0])
 	}
 
 	itrs := make([]v1.PeekingIterator[v1.Output], 0, len(responses))
 	for _, r := range responses {
+		sort.Slice(r, func(i, j int) bool { return r[i].Fp < r[j].Fp })
 		itrs = append(itrs, v1.NewPeekingIter(v1.NewSliceIter(r)))
 	}
 	return v1.NewHeapIterator[v1.Output](
diff --git a/pkg/bloomgateway/multiplexing.go b/pkg/bloomgateway/multiplexing.go
index 8279dda99a595..97e0b0aa6d66f 100644
--- a/pkg/bloomgateway/multiplexing.go
+++ b/pkg/bloomgateway/multiplexing.go
@@ -87,17 +87,16 @@ func NewTask(ctx context.Context, tenantID string, refs seriesWithInterval, filt
 	}
 
 	task := Task{
-		ID:        key,
-		Tenant:    tenantID,
-		err:       new(wrappedError),
-		resCh:     make(chan v1.Output),
-		filters:   filters,
-		series:    refs.series,
-		interval:  refs.interval,
-		table:     refs.day,
-		ctx:       ctx,
-		done:      make(chan struct{}),
-		responses: make([]v1.Output, 0, len(refs.series)),
+		ID:       key,
+		Tenant:   tenantID,
+		err:      new(wrappedError),
+		resCh:    make(chan v1.Output),
+		filters:  filters,
+		series:   refs.series,
+		interval: refs.interval,
+		table:    refs.day,
+		ctx:      ctx,
+		done:     make(chan struct{}),
 	}
 	return task, nil
 }
@@ -130,16 +129,15 @@ func (t Task) CloseWithError(err error) {
 func (t Task) Copy(series []*logproto.GroupedChunkRefs) Task {
 	// do not copy ID to distinguish it as copied task
 	return Task{
-		Tenant:    t.Tenant,
-		err:       t.err,
-		resCh:     t.resCh,
-		filters:   t.filters,
-		series:    series,
-		interval:  t.interval,
-		table:     t.table,
-		ctx:       t.ctx,
-		done:      make(chan struct{}),
-		responses: make([]v1.Output, 0, len(series)),
+		Tenant:   t.Tenant,
+		err:      t.err,
+		resCh:    t.resCh,
+		filters:  t.filters,
+		series:   series,
+		interval: t.interval,
+		table:    t.table,
+		ctx:      t.ctx,
+		done:     make(chan struct{}),
 	}
 }
 
diff --git a/pkg/bloomgateway/processor.go b/pkg/bloomgateway/processor.go
index 90e3f6f730c93..9a503551d3d23 100644
--- a/pkg/bloomgateway/processor.go
+++ b/pkg/bloomgateway/processor.go
@@ -10,6 +10,7 @@ import (
 	"github.com/opentracing/opentracing-go"
 	"github.com/pkg/errors"
 
+	"github.com/grafana/dskit/concurrency"
 	v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
 	"github.com/grafana/loki/pkg/storage/config"
 	"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
@@ -93,31 +94,34 @@ func (p *processor) processBlocks(ctx context.Context, data []blockWithTasks) er
 		return err
 	}
 
-	// TODO(chaudum): use `concurrency` lib with bound parallelism
-	for i, bq := range bqs {
-		block := data[i]
+	// TODO(chaudum): What's a good concurrency value?
+	return concurrency.ForEachJob(ctx, len(bqs), 10, func(ctx context.Context, i int) error {
+		bq := bqs[i]
 		if bq == nil {
 			// TODO(chaudum): Add metric for skipped blocks
-			continue
+			return nil
 		}
+		defer bq.Close()
+
+		block := data[i]
 		level.Debug(p.logger).Log(
 			"msg", "process block with tasks",
+			"job", i+1,
+			"of_jobs", len(bqs),
 			"block", block.ref,
-			"block_bounds", block.ref.Bounds,
-			"querier_bounds", bq.Bounds,
 			"num_tasks", len(block.tasks),
 		)
+
 		if !block.ref.Bounds.Equal(bq.Bounds) {
-			bq.Close()
 			return errors.Errorf("block and querier bounds differ: %s vs %s", block.ref.Bounds, bq.Bounds)
 		}
+
 		err := p.processBlock(ctx, bq.BlockQuerier, block.tasks)
-		bq.Close()
 		if err != nil {
 			return errors.Wrap(err, "processing block")
 		}
-	}
-	return nil
+		return nil
+	})
 }
 
 func (p *processor) processBlock(_ context.Context, blockQuerier *v1.BlockQuerier, tasks []Task) error {
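
For reference, below is a minimal standalone sketch (not part of the patch) of the
pattern the processor.go change adopts: fanning out independent jobs with bounded
parallelism via dskit's concurrency.ForEachJob, so that one slowly processed block
only delays its own goroutine instead of adding to the latency of every block queued
behind it. Only the ForEachJob call itself is taken from the diff above; the block
type, the delays, and the job body are hypothetical stand-ins.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/grafana/dskit/concurrency"
)

// block is a hypothetical stand-in for a bloom block reference.
type block struct {
	name  string
	delay time.Duration // simulates one slowly processed block
}

func main() {
	blocks := []block{
		{name: "block-a", delay: 10 * time.Millisecond},
		{name: "block-b", delay: 500 * time.Millisecond}, // the slow outlier
		{name: "block-c", delay: 10 * time.Millisecond},
	}

	start := time.Now()

	// ForEachJob runs the callback for indices 0..len(blocks)-1 with at most
	// 10 goroutines at a time (the same bound hard-coded in the patch) and
	// propagates the first error returned by any job.
	err := concurrency.ForEachJob(context.Background(), len(blocks), 10, func(_ context.Context, i int) error {
		b := blocks[i]
		time.Sleep(b.delay) // stand-in for querying a single bloom block
		fmt.Printf("processed %s\n", b.name)
		return nil
	})
	if err != nil {
		fmt.Println("error:", err)
		return
	}

	// Processed sequentially, the total would be the sum of all delays;
	// with bounded fan-out it is dominated by the slowest block (~500ms).
	fmt.Printf("all blocks processed in %s\n", time.Since(start))
}

As in the diff, per-job cleanup belongs inside the job function (the patch uses
defer bq.Close()), since the jobs no longer share a single loop body that can close
the querier after each iteration.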