fix(blooms): bloom-gw chunk merging improvements (grafana#12162)
owen-d authored Mar 8, 2024
1 parent e71964c commit f4b2c5d
Showing 4 changed files with 494 additions and 122 deletions.
228 changes: 162 additions & 66 deletions pkg/bloomgateway/bloomgateway.go
@@ -44,7 +44,6 @@ package bloomgateway
import (
"context"
"fmt"
"sort"
"sync"
"time"

@@ -229,7 +228,6 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
}, nil
}

var numSeries int
seriesByDay := partitionRequest(req)

// no tasks --> empty response
@@ -240,11 +238,16 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
}

tasks := make([]Task, 0, len(seriesByDay))
responses := make([][]v1.Output, 0, len(seriesByDay))
for _, seriesForDay := range seriesByDay {
task, err := NewTask(ctx, tenantID, seriesForDay, filters)
if err != nil {
return nil, err
}

// TODO(owen-d): include capacity in constructor?
task.responses = responsesPool.Get(len(seriesForDay.series))

level.Debug(g.logger).Log(
"msg", "created task for day",
"task", task.ID,
@@ -254,7 +257,6 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
"filters", JoinFunc(filters, ";", func(e syntax.LineFilterExpr) string { return e.String() }),
)
tasks = append(tasks, task)
numSeries += len(seriesForDay.series)
}

g.activeUsers.UpdateUserTimestamp(tenantID, time.Now())
@@ -272,13 +274,19 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
// When enqueuing, we also add the task to the pending tasks
_ = g.pendingTasks.Inc()
})
// TODO(owen-d): use `concurrency` lib, bound parallelism
go g.consumeTask(ctx, task, tasksCh)
}

responses := responsesPool.Get(numSeries)
defer responsesPool.Put(responses)
remaining := len(tasks)

preFilterSeries := len(req.Refs)
var preFilterChunks, postFilterChunks int

for _, series := range req.Refs {
preFilterChunks += len(series.Refs)
}

for remaining > 0 {
select {
case <-ctx.Done():
@@ -288,20 +296,36 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
if task.Err() != nil {
return nil, errors.Wrap(task.Err(), "request failed")
}
responses = append(responses, task.responses...)
responses = append(responses, task.responses)
remaining--
}
}

preFilterSeries := len(req.Refs)
filtered := filterChunkRefs(req, responses)

// TODO(chaudum): Don't wait for all responses before starting to filter chunks.
filtered := g.processResponses(req, responses)
// free up the responses
for _, resp := range responses {
responsesPool.Put(resp)
}

postFilterSeries := len(req.Refs)
postFilterSeries := len(filtered)

level.Info(logger).Log("msg", "return filtered chunk refs", "pre_filter_series", preFilterSeries, "post_filter_series", postFilterSeries, "filtered_chunks", filtered)
return &logproto.FilterChunkRefResponse{ChunkRefs: req.Refs}, nil
for _, group := range req.Refs {
postFilterChunks += len(group.Refs)
}
g.metrics.requestedSeries.Observe(float64(preFilterSeries))
g.metrics.filteredSeries.Observe(float64(preFilterSeries - postFilterSeries))
g.metrics.requestedChunks.Observe(float64(preFilterChunks))
g.metrics.filteredChunks.Observe(float64(preFilterChunks - postFilterChunks))

level.Info(logger).Log(
"msg", "return filtered chunk refs",
"requested_series", preFilterSeries,
"filtered_series", preFilterSeries-postFilterSeries,
"requested_chunks", preFilterChunks,
"filtered_chunks", preFilterChunks-postFilterChunks,
)
return &logproto.FilterChunkRefResponse{ChunkRefs: filtered}, nil
}

// consumeTask receives v1.Output yielded from the block querier on the task's
@@ -317,11 +341,9 @@ func (g *Gateway) consumeTask(ctx context.Context, task Task, tasksCh chan<- Tas
select {
case <-ctx.Done():
level.Debug(logger).Log("msg", "drop partial result", "fp_int", uint64(res.Fp), "fp_hex", res.Fp, "chunks_to_remove", res.Removals.Len())
g.metrics.chunkRemovals.WithLabelValues("dropped").Add(float64(res.Removals.Len()))
default:
level.Debug(logger).Log("msg", "accept partial result", "fp_int", uint64(res.Fp), "fp_hex", res.Fp, "chunks_to_remove", res.Removals.Len())
task.responses = append(task.responses, res)
g.metrics.chunkRemovals.WithLabelValues("accepted").Add(float64(res.Removals.Len()))
}
}

@@ -334,68 +356,142 @@ func (g *Gateway) consumeTask(ctx context.Context, task Task, tasksCh chan<- Tas
}
}

func (g *Gateway) processResponses(req *logproto.FilterChunkRefRequest, responses []v1.Output) (filtered int) {
for _, o := range responses {
if o.Removals.Len() == 0 {
continue
}
filtered += g.removeNotMatchingChunks(req, o)
// merges a list of responses via a heap. The same fingerprints and chunks can be present in multiple responses,
// but each response must be ordered by fingerprint
func orderedResponsesByFP(responses [][]v1.Output) v1.Iterator[v1.Output] {
if len(responses) == 0 {
return v1.NewEmptyIter[v1.Output]()
}
if len(responses) == 1 {
return v1.NewSliceIter(responses[0])
}

itrs := make([]v1.PeekingIterator[v1.Output], 0, len(responses))
for _, r := range responses {
itrs = append(itrs, v1.NewPeekingIter(v1.NewSliceIter(r)))
}
return
return v1.NewHeapIterator[v1.Output](
func(o1, o2 v1.Output) bool { return o1.Fp <= o2.Fp },
itrs...,
)
}
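
The heap merge above turns the per-task response slices, each already ordered by fingerprint, into a single fingerprint-ordered stream. As a rough illustration of that idea, here is a standalone sketch (not part of the diff): it avoids the v1 iterator package, uses a hypothetical Output stand-in, and does a linear scan where the commit uses a heap.

package main

import "fmt"

// Output is a hypothetical stand-in for v1.Output.
type Output struct {
	Fp       uint64
	Removals []uint64
}

// mergeByFp k-way-merges slices that are each ordered by fingerprint.
// The commit uses a heap so picking the smallest head is O(log k);
// a linear scan keeps this sketch short.
func mergeByFp(responses [][]Output) []Output {
	heads := make([]int, len(responses))
	var merged []Output
	for {
		best := -1
		for i, r := range responses {
			if heads[i] >= len(r) {
				continue // this slice is exhausted
			}
			if best == -1 || r[heads[i]].Fp < responses[best][heads[best]].Fp {
				best = i
			}
		}
		if best == -1 {
			return merged // all inputs exhausted
		}
		merged = append(merged, responses[best][heads[best]])
		heads[best]++
	}
}

func main() {
	day1 := []Output{{Fp: 1}, {Fp: 5}}
	day2 := []Output{{Fp: 1}, {Fp: 3}}
	for _, o := range mergeByFp([][]Output{day1, day2}) {
		fmt.Println(o.Fp) // 1, 1, 3, 5: duplicates survive the merge; dedup happens downstream
	}
}
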

func (g *Gateway) removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, res v1.Output) (filtered int) {
// TODO(owen-d): improve perf. This can be faster with a more specialized impl
// NB(owen-d): `req` is mutated in place for performance, but `responses` is not
func filterChunkRefs(req *logproto.FilterChunkRefRequest, responses [][]v1.Output) []*logproto.GroupedChunkRefs {
res := make([]*logproto.GroupedChunkRefs, 0, len(req.Refs))

// dedupe outputs, merging the same series.
// This returns an Iterator[v1.Output]
dedupedResps := v1.NewDedupingIter[v1.Output, v1.Output](
// eq
func(o1, o2 v1.Output) bool {
return o1.Fp == o2.Fp
},
// from
v1.Identity[v1.Output],
// merge two removal sets for the same series, deduping
func(o1, o2 v1.Output) v1.Output {
res := v1.Output{Fp: o1.Fp}

var chks v1.ChunkRefs
var i, j int
for i < len(o1.Removals) && j < len(o2.Removals) {

a, b := o1.Removals[i], o2.Removals[j]

if a == b {
chks = append(chks, a)
i++
j++
continue
}

if a.Less(b) {
chks = append(chks, a)
i++
continue
}
chks = append(chks, b)
j++
}

// binary search index of fingerprint
// TODO(owen-d): there's a bug here because the same fingerprint and chunks can exist over multiple day buckets.
// If all requested chunks are in both days, the first day could technically remove _all_ chunks from consideration.
// The sort.Search for the _next_ chunk would return an index where fingerprint is greater than the target fingerprint.
idx := sort.Search(len(req.Refs), func(i int) bool {
return req.Refs[i].Fingerprint >= uint64(res.Fp)
})
if i < len(o1.Removals) {
chks = append(chks, o1.Removals[i:]...)
}
if j < len(o2.Removals) {
chks = append(chks, o2.Removals[j:]...)
}

// fingerprint not found
if idx >= len(req.Refs) {
level.Error(g.logger).Log("msg", "index out of range", "idx", idx, "len", len(req.Refs), "fp", uint64(res.Fp))
return
res.Removals = chks
return res
},
v1.NewPeekingIter(orderedResponsesByFP(responses)),
)

// Iterate through the requested and filtered series/chunks,
// removing chunks that were filtered out.
var next bool
var at v1.Output
if next = dedupedResps.Next(); next {
at = dedupedResps.At()
}

// if all chunks of a fingerprint are removed
// then remove the whole group from the response

// TODO(owen-d): there's a bug here because the same fingerprint and chunks can exist over multiple day buckets.
// A later day bucket could happen to request removals with len=remaining, but whose chunk references were
// partially removed in an earlier round. Just checking the length here could cause us to discard chunks
// that shouldn't be.
if len(req.Refs[idx].Refs) == res.Removals.Len() {
filtered += len(req.Refs[idx].Refs)

req.Refs[idx] = nil // avoid leaking pointer
// TODO(owen-d): this is O(n^2);
// use more specialized data structure that doesn't reslice
req.Refs = append(req.Refs[:idx], req.Refs[idx+1:]...)
return
}
for i := 0; i < len(req.Refs); i++ {
// we've hit the end of the removals -- append the rest of the
// requested series and return
if !next {
res = append(res, req.Refs[i:]...)
return res
}

for i := range res.Removals {
toRemove := res.Removals[i]
for j := 0; j < len(req.Refs[idx].Refs); j++ {
if req.Refs[idx].Refs[j] == nil {
continue
}
// the current series had no removals
cur := req.Refs[i]
if cur.Fingerprint < uint64(at.Fp) {
res = append(res, cur)
continue
}

// TODO(owen-d): These should check start/end/checksum, not just checksum.
if logproto.ShortRef(toRemove) == *req.Refs[idx].Refs[j] {
filtered += 1
// the current series had removals. No need to check for equality
// b/c removals must be present in input
filterChunkRefsForSeries(cur, at.Removals)
if len(cur.Refs) > 0 {
res = append(res, cur)
}

// TODO(owen-d): usually not a problem (n is small), but I've seen some series have
// many thousands of chunks per day, so would be good to not reslice.
// See `labels.NewBuilder()` for an example
req.Refs[idx].Refs[j] = nil // avoid leaking pointer
req.Refs[idx].Refs = append(req.Refs[idx].Refs[:j], req.Refs[idx].Refs[j+1:]...)
j-- // since we removed the current item at index, we have to redo the same index
}
// advance removals
if next = dedupedResps.Next(); next {
at = dedupedResps.At()
}
}
return

return res
}
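
When two responses carry the same fingerprint (for example, the same series appearing in two day buckets), the merge function passed to v1.NewDedupingIter above combines their removal sets into a sorted union without duplicates. A minimal standalone sketch of that merge step, assuming plain uint64 checksums as stand-ins for v1.ChunkRef:

package main

import "fmt"

// mergeRemovals returns the sorted, deduplicated union of two sorted removal sets.
func mergeRemovals(a, b []uint64) []uint64 {
	merged := make([]uint64, 0, len(a)+len(b))
	var i, j int
	for i < len(a) && j < len(b) {
		switch {
		case a[i] == b[j]:
			merged = append(merged, a[i]) // present in both: keep once
			i++
			j++
		case a[i] < b[j]:
			merged = append(merged, a[i])
			i++
		default:
			merged = append(merged, b[j])
			j++
		}
	}
	merged = append(merged, a[i:]...) // append whichever side has leftovers
	merged = append(merged, b[j:]...)
	return merged
}

func main() {
	fmt.Println(mergeRemovals([]uint64{1, 2, 4}, []uint64{2, 3})) // [1 2 3 4]
}
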

// mutates cur
func filterChunkRefsForSeries(cur *logproto.GroupedChunkRefs, removals v1.ChunkRefs) {
// use same backing array to avoid allocations
res := cur.Refs[:0]

var i, j int
for i < len(cur.Refs) && j < len(removals) {

if (*v1.ChunkRef)(cur.Refs[i]).Less(removals[j]) {
// chunk was not removed
res = append(res, cur.Refs[i])
i++
} else {
// Since all removals must exist in the series, we can assume that if the removal
// is not less, it must be equal to the current chunk (a match). Skip this chunk.
i++
j++
}

}

if i < len(cur.Refs) {
res = append(res, cur.Refs[i:]...)
}

cur.Refs = cur.Refs[:len(res)]
}