chore(bloom-gw): Process blocks in parallel (grafana#12203)
While processing a single block is rather fast, processing many blocks sequentially means that a single slowly processed block drives up tail latency.

Signed-off-by: Christian Haudum <[email protected]>
chaudum authored Mar 14, 2024
1 parent 4b28f82 commit 8abfc29
Showing 3 changed files with 39 additions and 37 deletions.
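To illustrate the approach the diff below takes, here is a minimal, self-contained sketch of bounded-parallel block processing with dskit's `concurrency.ForEachJob`. The function signature and the concurrency of 10 are taken from the new code in pkg/bloomgateway/processor.go; the `processBlock` stub and the job count are invented for the example.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/grafana/dskit/concurrency"
)

// processBlock stands in for the real per-block work done by the bloom gateway.
func processBlock(_ context.Context, i int) error {
	time.Sleep(50 * time.Millisecond) // pretend a block takes a while
	fmt.Println("processed block", i)
	return nil
}

func main() {
	ctx := context.Background()
	numBlocks := 32

	// Instead of iterating over blocks one by one, run up to 10 block jobs
	// at a time; ForEachJob returns the first error encountered by any job,
	// so a single slow block no longer holds up all the blocks queued behind it.
	err := concurrency.ForEachJob(ctx, numBlocks, 10, func(ctx context.Context, i int) error {
		return processBlock(ctx, i)
	})
	if err != nil {
		fmt.Println("failed to process blocks:", err)
	}
}
```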
12 changes: 6 additions & 6 deletions pkg/bloomgateway/bloomgateway.go
@@ -44,6 +44,7 @@ package bloomgateway
import (
	"context"
	"fmt"
	"sort"
	"time"

	"github.com/go-kit/log"
@@ -347,14 +348,11 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
// Once the task is closed, it will send the task with the results from the
// block querier to the supplied task channel.
func (g *Gateway) consumeTask(ctx context.Context, task Task, tasksCh chan<- Task) {
	logger := log.With(g.logger, "task", task.ID)

	for res := range task.resCh {
		select {
		case <-ctx.Done():
			level.Debug(logger).Log("msg", "drop partial result", "fp_int", uint64(res.Fp), "fp_hex", res.Fp, "chunks_to_remove", res.Removals.Len())
			// do nothing
		default:
			level.Debug(logger).Log("msg", "accept partial result", "fp_int", uint64(res.Fp), "fp_hex", res.Fp, "chunks_to_remove", res.Removals.Len())
			task.responses = append(task.responses, res)
		}
	}
@@ -368,18 +366,20 @@ func (g *Gateway) consumeTask(ctx context.Context, task Task, tasksCh chan<- Tas
	}
}

// merges a list of responses via a heap. The same fingerprints and chunks can be present in multiple responses,
// but each response must be ordered by fingerprint
// merges a list of responses via a heap. The same fingerprints and chunks can be present in multiple responses.
// Individual responses do not need to be ordered beforehand.
func orderedResponsesByFP(responses [][]v1.Output) v1.Iterator[v1.Output] {
	if len(responses) == 0 {
		return v1.NewEmptyIter[v1.Output]()
	}
	if len(responses) == 1 {
		sort.Slice(responses[0], func(i, j int) bool { return responses[0][i].Fp < responses[0][j].Fp })
		return v1.NewSliceIter(responses[0])
	}

	itrs := make([]v1.PeekingIterator[v1.Output], 0, len(responses))
	for _, r := range responses {
		sort.Slice(r, func(i, j int) bool { return r[i].Fp < r[j].Fp })
		itrs = append(itrs, v1.NewPeekingIter(v1.NewSliceIter(r)))
	}
	return v1.NewHeapIterator[v1.Output](
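For context on the orderedResponsesByFP change above: because blocks are now processed concurrently, the per-task response slices no longer arrive in fingerprint order, so each slice is sorted before being merged. Below is a standalone sketch of that sort-then-merge step, using a plain k-way merge loop instead of Loki's heap of peeking iterators; the `output` type and `mergeByFP` helper are illustrative stand-ins for `v1.Output` and the real iterator code.

```go
package main

import (
	"fmt"
	"sort"
)

// output is a stand-in for v1.Output: a fingerprint plus per-series removals.
type output struct {
	Fp       uint64
	Removals []int
}

// mergeByFP sorts each response slice by fingerprint and then merges them
// into one fingerprint-ordered slice by repeatedly taking the smallest head.
func mergeByFP(responses [][]output) []output {
	// Responses from parallel block processing arrive in arbitrary order,
	// so sort each slice first.
	for _, r := range responses {
		sort.Slice(r, func(i, j int) bool { return r[i].Fp < r[j].Fp })
	}

	heads := make([]int, len(responses))
	var merged []output
	for {
		best := -1
		for i, r := range responses {
			if heads[i] >= len(r) {
				continue
			}
			if best == -1 || r[heads[i]].Fp < responses[best][heads[best]].Fp {
				best = i
			}
		}
		if best == -1 {
			return merged
		}
		merged = append(merged, responses[best][heads[best]])
		heads[best]++
	}
}

func main() {
	a := []output{{Fp: 7}, {Fp: 3}}
	b := []output{{Fp: 5}, {Fp: 1}}
	fmt.Println(mergeByFP([][]output{a, b})) // fingerprints 1, 3, 5, 7
}
```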
40 changes: 19 additions & 21 deletions pkg/bloomgateway/multiplexing.go
@@ -87,17 +87,16 @@ func NewTask(ctx context.Context, tenantID string, refs seriesWithInterval, filt
	}

	task := Task{
		ID:        key,
		Tenant:    tenantID,
		err:       new(wrappedError),
		resCh:     make(chan v1.Output),
		filters:   filters,
		series:    refs.series,
		interval:  refs.interval,
		table:     refs.day,
		ctx:       ctx,
		done:      make(chan struct{}),
		responses: make([]v1.Output, 0, len(refs.series)),
		ID:       key,
		Tenant:   tenantID,
		err:      new(wrappedError),
		resCh:    make(chan v1.Output),
		filters:  filters,
		series:   refs.series,
		interval: refs.interval,
		table:    refs.day,
		ctx:      ctx,
		done:     make(chan struct{}),
	}
	return task, nil
}
@@ -130,16 +129,15 @@ func (t Task) CloseWithError(err error) {
func (t Task) Copy(series []*logproto.GroupedChunkRefs) Task {
	// do not copy ID to distinguish it as copied task
	return Task{
		Tenant:    t.Tenant,
		err:       t.err,
		resCh:     t.resCh,
		filters:   t.filters,
		series:    series,
		interval:  t.interval,
		table:     t.table,
		ctx:       t.ctx,
		done:      make(chan struct{}),
		responses: make([]v1.Output, 0, len(series)),
		Tenant:   t.Tenant,
		err:      t.err,
		resCh:    t.resCh,
		filters:  t.filters,
		series:   series,
		interval: t.interval,
		table:    t.table,
		ctx:      t.ctx,
		done:     make(chan struct{}),
	}
}

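As a rough illustration of the fan-in pattern behind the Task fields above (resCh, done, responses): results for a task are streamed over a channel and appended by a consumer until the request context is cancelled, at which point partial results are simply dropped. This is a simplified sketch, not the gateway's actual code; the `result` and `task` types and the `consume` helper are hypothetical stand-ins.

```go
package main

import (
	"context"
	"fmt"
)

// result is a stand-in for v1.Output.
type result struct{ Fp uint64 }

// task is a pared-down Task: a result channel plus the slice the consumer
// fills. The responses slice is no longer pre-allocated; it grows as results arrive.
type task struct {
	resCh     chan result
	responses []result
}

// consume mirrors consumeTask: accept results until the request context is
// cancelled, then keep draining the channel without storing anything.
func consume(ctx context.Context, t *task) {
	for res := range t.resCh {
		select {
		case <-ctx.Done():
			// the request is gone, drop the partial result
		default:
			t.responses = append(t.responses, res)
		}
	}
}

func main() {
	t := &task{resCh: make(chan result)}
	go func() {
		for fp := uint64(1); fp <= 3; fp++ {
			t.resCh <- result{Fp: fp}
		}
		close(t.resCh)
	}()
	consume(context.Background(), t)
	fmt.Println(len(t.responses), "responses accepted")
}
```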
24 changes: 14 additions & 10 deletions pkg/bloomgateway/processor.go
@@ -10,6 +10,7 @@ import (
	"github.com/opentracing/opentracing-go"
	"github.com/pkg/errors"

	"github.com/grafana/dskit/concurrency"
	v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
	"github.com/grafana/loki/pkg/storage/config"
	"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
@@ -93,31 +94,34 @@ func (p *processor) processBlocks(ctx context.Context, data []blockWithTasks) er
		return err
	}

	// TODO(chaudum): use `concurrency` lib with bound parallelism
	for i, bq := range bqs {
		block := data[i]
	// TODO(chaudum): What's a good concurrency value?
	return concurrency.ForEachJob(ctx, len(bqs), 10, func(ctx context.Context, i int) error {
		bq := bqs[i]
		if bq == nil {
			// TODO(chaudum): Add metric for skipped blocks
			continue
			return nil
		}
		defer bq.Close()

		block := data[i]
		level.Debug(p.logger).Log(
			"msg", "process block with tasks",
			"job", i+1,
			"of_jobs", len(bqs),
			"block", block.ref,
			"block_bounds", block.ref.Bounds,
			"querier_bounds", bq.Bounds,
			"num_tasks", len(block.tasks),
		)

		if !block.ref.Bounds.Equal(bq.Bounds) {
			bq.Close()
			return errors.Errorf("block and querier bounds differ: %s vs %s", block.ref.Bounds, bq.Bounds)
		}

		err := p.processBlock(ctx, bq.BlockQuerier, block.tasks)
		bq.Close()
		if err != nil {
			return errors.Wrap(err, "processing block")
		}
	}
	return nil
		return nil
	})
}

func (p *processor) processBlock(_ context.Context, blockQuerier *v1.BlockQuerier, tasks []Task) error {
