diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go index 97f32db54dac4..96b08636286e2 100644 --- a/pkg/bloomgateway/bloomgateway.go +++ b/pkg/bloomgateway/bloomgateway.go @@ -44,7 +44,6 @@ package bloomgateway import ( "context" "fmt" - "sort" "sync" "time" @@ -229,7 +228,6 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk }, nil } - var numSeries int seriesByDay := partitionRequest(req) // no tasks --> empty response @@ -240,11 +238,16 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk } tasks := make([]Task, 0, len(seriesByDay)) + responses := make([][]v1.Output, 0, len(seriesByDay)) for _, seriesForDay := range seriesByDay { task, err := NewTask(ctx, tenantID, seriesForDay, filters) if err != nil { return nil, err } + + // TODO(owen-d): include capacity in constructor? + task.responses = responsesPool.Get(len(seriesForDay.series)) + level.Debug(g.logger).Log( "msg", "created task for day", "task", task.ID, @@ -254,7 +257,6 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk "filters", JoinFunc(filters, ";", func(e syntax.LineFilterExpr) string { return e.String() }), ) tasks = append(tasks, task) - numSeries += len(seriesForDay.series) } g.activeUsers.UpdateUserTimestamp(tenantID, time.Now()) @@ -272,13 +274,19 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk // When enqueuing, we also add the task to the pending tasks _ = g.pendingTasks.Inc() }) + // TODO(owen-d): use `concurrency` lib, bound parallelism go g.consumeTask(ctx, task, tasksCh) } - responses := responsesPool.Get(numSeries) - defer responsesPool.Put(responses) remaining := len(tasks) + preFilterSeries := len(req.Refs) + var preFilterChunks, postFilterChunks int + + for _, series := range req.Refs { + preFilterChunks += len(series.Refs) + } + for remaining > 0 { select { case <-ctx.Done(): @@ -288,20 +296,36 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk if task.Err() != nil { return nil, errors.Wrap(task.Err(), "request failed") } - responses = append(responses, task.responses...) + responses = append(responses, task.responses) remaining-- } } - preFilterSeries := len(req.Refs) + filtered := filterChunkRefs(req, responses) - // TODO(chaudum): Don't wait for all responses before starting to filter chunks. - filtered := g.processResponses(req, responses) + // free up the responses + for _, resp := range responses { + responsesPool.Put(resp) + } - postFilterSeries := len(req.Refs) + postFilterSeries := len(filtered) - level.Info(logger).Log("msg", "return filtered chunk refs", "pre_filter_series", preFilterSeries, "post_filter_series", postFilterSeries, "filtered_chunks", filtered) - return &logproto.FilterChunkRefResponse{ChunkRefs: req.Refs}, nil + for _, group := range req.Refs { + postFilterChunks += len(group.Refs) + } + g.metrics.requestedSeries.Observe(float64(preFilterSeries)) + g.metrics.filteredSeries.Observe(float64(preFilterSeries - postFilterSeries)) + g.metrics.requestedChunks.Observe(float64(preFilterChunks)) + g.metrics.filteredChunks.Observe(float64(preFilterChunks - postFilterChunks)) + + level.Info(logger).Log( + "msg", "return filtered chunk refs", + "requested_series", preFilterSeries, + "filtered_series", preFilterSeries-postFilterSeries, + "requested_chunks", preFilterChunks, + "filtered_chunks", preFilterChunks-postFilterChunks, + ) + return &logproto.FilterChunkRefResponse{ChunkRefs: filtered}, nil } // consumeTask receives v1.Output yielded from the block querier on the task's @@ -317,11 +341,9 @@ func (g *Gateway) consumeTask(ctx context.Context, task Task, tasksCh chan<- Tas select { case <-ctx.Done(): level.Debug(logger).Log("msg", "drop partial result", "fp_int", uint64(res.Fp), "fp_hex", res.Fp, "chunks_to_remove", res.Removals.Len()) - g.metrics.chunkRemovals.WithLabelValues("dropped").Add(float64(res.Removals.Len())) default: level.Debug(logger).Log("msg", "accept partial result", "fp_int", uint64(res.Fp), "fp_hex", res.Fp, "chunks_to_remove", res.Removals.Len()) task.responses = append(task.responses, res) - g.metrics.chunkRemovals.WithLabelValues("accepted").Add(float64(res.Removals.Len())) } } @@ -334,68 +356,142 @@ func (g *Gateway) consumeTask(ctx context.Context, task Task, tasksCh chan<- Tas } } -func (g *Gateway) processResponses(req *logproto.FilterChunkRefRequest, responses []v1.Output) (filtered int) { - for _, o := range responses { - if o.Removals.Len() == 0 { - continue - } - filtered += g.removeNotMatchingChunks(req, o) +// merges a list of responses via a heap. The same fingerprints and chunks can be present in multiple responses, +// but each response must be ordered by fingerprint +func orderedResponsesByFP(responses [][]v1.Output) v1.Iterator[v1.Output] { + if len(responses) == 0 { + return v1.NewEmptyIter[v1.Output]() + } + if len(responses) == 1 { + return v1.NewSliceIter(responses[0]) + } + + itrs := make([]v1.PeekingIterator[v1.Output], 0, len(responses)) + for _, r := range responses { + itrs = append(itrs, v1.NewPeekingIter(v1.NewSliceIter(r))) } - return + return v1.NewHeapIterator[v1.Output]( + func(o1, o2 v1.Output) bool { return o1.Fp <= o2.Fp }, + itrs..., + ) } -func (g *Gateway) removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, res v1.Output) (filtered int) { +// TODO(owen-d): improve perf. This can be faster with a more specialized impl +// NB(owen-d): `req` is mutated in place for performance, but `responses` is not +func filterChunkRefs(req *logproto.FilterChunkRefRequest, responses [][]v1.Output) []*logproto.GroupedChunkRefs { + res := make([]*logproto.GroupedChunkRefs, 0, len(req.Refs)) + + // dedupe outputs, merging the same series. + // This returns an Iterator[v1.Output] + dedupedResps := v1.NewDedupingIter[v1.Output, v1.Output]( + // eq + func(o1, o2 v1.Output) bool { + return o1.Fp == o2.Fp + }, + // from + v1.Identity[v1.Output], + // merge two removal sets for the same series, deduping + func(o1, o2 v1.Output) v1.Output { + res := v1.Output{Fp: o1.Fp} + + var chks v1.ChunkRefs + var i, j int + for i < len(o1.Removals) && j < len(o2.Removals) { + + a, b := o1.Removals[i], o2.Removals[j] + + if a == b { + chks = append(chks, a) + i++ + j++ + continue + } + + if a.Less(b) { + chks = append(chks, a) + i++ + continue + } + chks = append(chks, b) + j++ + } - // binary search index of fingerprint - // TODO(owen-d): there's a bug here because the same fingerprint and chunks can exist over multiple day buckets. - // If all requested chunks are in both days, the first day could technically remove _all_ chunks from consideration. - // The sort.Search for the _next_ chunk would return an index where fingerprint is greater than the target fingerprint. - idx := sort.Search(len(req.Refs), func(i int) bool { - return req.Refs[i].Fingerprint >= uint64(res.Fp) - }) + if i < len(o1.Removals) { + chks = append(chks, o1.Removals[i:]...) + } + if j < len(o2.Removals) { + chks = append(chks, o2.Removals[j:]...) + } - // fingerprint not found - if idx >= len(req.Refs) { - level.Error(g.logger).Log("msg", "index out of range", "idx", idx, "len", len(req.Refs), "fp", uint64(res.Fp)) - return + res.Removals = chks + return res + }, + v1.NewPeekingIter(orderedResponsesByFP(responses)), + ) + + // Iterate through the requested and filtered series/chunks, + // removing chunks that were filtered out. + var next bool + var at v1.Output + if next = dedupedResps.Next(); next { + at = dedupedResps.At() } - // if all chunks of a fingerprint are are removed - // then remove the whole group from the response - - // TODO(owen-d): there's a bug here because the same fingerprint and chunks can exist over multiple day buckets. - // A later day bucket could happen to request removals with len=remaining, but whose chunk references were - // partially removed in an earlier round. Just checking the length here could cause us to discard chunks - // that shouldn't be. - if len(req.Refs[idx].Refs) == res.Removals.Len() { - filtered += len(req.Refs[idx].Refs) - - req.Refs[idx] = nil // avoid leaking pointer - // TODO(owen-d): this is O(n^2); - // use more specialized data structure that doesn't reslice - req.Refs = append(req.Refs[:idx], req.Refs[idx+1:]...) - return - } + for i := 0; i < len(req.Refs); i++ { + // we've hit the end of the removals -- append the rest of the + // requested series and return + if !next { + res = append(res, req.Refs[i:]...) + return res + } - for i := range res.Removals { - toRemove := res.Removals[i] - for j := 0; j < len(req.Refs[idx].Refs); j++ { - if req.Refs[idx].Refs[j] == nil { - continue - } + // the current series had no removals + cur := req.Refs[i] + if cur.Fingerprint < uint64(at.Fp) { + res = append(res, cur) + continue + } - // TODO(owen-d): These should check start/end/checksum, not just checksum. - if logproto.ShortRef(toRemove) == *req.Refs[idx].Refs[j] { - filtered += 1 + // the current series had removals. No need to check for equality + // b/c removals must be present in input + filterChunkRefsForSeries(cur, at.Removals) + if len(cur.Refs) > 0 { + res = append(res, cur) + } - // TODO(owen-d): usually not a problem (n is small), but I've seen some series have - // many thousands of chunks per day, so would be good to not reslice. - // See `labels.NewBuilder()` for an example - req.Refs[idx].Refs[j] = nil // avoid leaking pointer - req.Refs[idx].Refs = append(req.Refs[idx].Refs[:j], req.Refs[idx].Refs[j+1:]...) - j-- // since we removed the current item at index, we have to redo the same index - } + // advance removals + if next = dedupedResps.Next(); next { + at = dedupedResps.At() } } - return + + return res +} + +// mutates cur +func filterChunkRefsForSeries(cur *logproto.GroupedChunkRefs, removals v1.ChunkRefs) { + // use same backing array to avoid allocations + res := cur.Refs[:0] + + var i, j int + for i < len(cur.Refs) && j < len(removals) { + + if (*v1.ChunkRef)(cur.Refs[i]).Less(removals[j]) { + // chunk was not removed + res = append(res, cur.Refs[i]) + i++ + } else { + // Since all removals must exist in the series, we can assume that if the removal + // is not less, it must be equal to the current chunk (a match). Skip this chunk. + i++ + j++ + } + + } + + if i < len(cur.Refs) { + res = append(res, cur.Refs[i:]...) + } + + cur.Refs = cur.Refs[:len(res)] } diff --git a/pkg/bloomgateway/bloomgateway_test.go b/pkg/bloomgateway/bloomgateway_test.go index 2b513a6dc6b63..59a3c7a840bd5 100644 --- a/pkg/bloomgateway/bloomgateway_test.go +++ b/pkg/bloomgateway/bloomgateway_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math/rand" + "sync" "testing" "time" @@ -16,6 +17,7 @@ import ( "github.com/grafana/dskit/user" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/logproto" @@ -411,65 +413,312 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { }) } -func TestBloomGateway_RemoveNotMatchingChunks(t *testing.T) { - g := &Gateway{ - logger: log.NewNopLogger(), +func TestFilterChunkRefsForSeries(t *testing.T) { + mkInput := func(xs []uint32) *logproto.GroupedChunkRefs { + out := &logproto.GroupedChunkRefs{Refs: make([]*logproto.ShortRef, len(xs))} + for i, x := range xs { + out.Refs[i] = &logproto.ShortRef{Checksum: x} + } + return out } - t.Run("removing chunks partially", func(t *testing.T) { - req := &logproto.FilterChunkRefRequest{ - Refs: []*logproto.GroupedChunkRefs{ - {Fingerprint: 0x00, Tenant: "fake", Refs: []*logproto.ShortRef{ - {Checksum: 0x1}, - {Checksum: 0x2}, - {Checksum: 0x3}, - {Checksum: 0x4}, - {Checksum: 0x5}, - }}, - }, + mkRemovals := func(xs []uint32) v1.ChunkRefs { + out := make(v1.ChunkRefs, len(xs)) + for i, x := range xs { + out[i] = v1.ChunkRef{Checksum: x} } - res := v1.Output{ - Fp: 0x00, Removals: v1.ChunkRefs{ - {Checksum: 0x2}, - {Checksum: 0x4}, - }, + return out + } + + for _, tc := range []struct { + desc string + input, removals, expected []uint32 + }{ + { + desc: "no matches", + input: []uint32{0, 1}, + expected: []uint32{0, 1}, + }, + { + desc: "remove all", + input: []uint32{0, 1, 2, 3, 4}, + removals: []uint32{0, 1, 2, 3, 4}, + expected: []uint32{}, + }, + { + desc: "remove every other", + input: []uint32{0, 1, 2, 3, 4}, + removals: []uint32{0, 2, 4}, + expected: []uint32{1, 3}, + }, + { + desc: "remove middle section", + input: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + removals: []uint32{3, 4, 5}, + expected: []uint32{0, 1, 2, 6, 7, 8, 9}, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + input := mkInput(tc.input) + expected := mkInput(tc.expected) + + filterChunkRefsForSeries(input, mkRemovals(tc.removals)) + + require.Equal(t, expected, input) + }) + } +} + +func TestFilterChunkRefs(t *testing.T) { + mkInput := func(nSeries, chunksPerSeries int) *logproto.FilterChunkRefRequest { + res := &logproto.FilterChunkRefRequest{} + refs := make([]*logproto.GroupedChunkRefs, nSeries) + for i := range refs { + chks := make([]*logproto.ShortRef, chunksPerSeries) + for j := range chks { + chks[j] = &logproto.ShortRef{Checksum: uint32(j)} + } + refs[i] = &logproto.GroupedChunkRefs{ + Fingerprint: uint64(i), + Refs: chks, + } } - expected := &logproto.FilterChunkRefRequest{ - Refs: []*logproto.GroupedChunkRefs{ - {Fingerprint: 0x00, Tenant: "fake", Refs: []*logproto.ShortRef{ - {Checksum: 0x1}, - {Checksum: 0x3}, - {Checksum: 0x5}, - }}, - }, + res.Refs = refs + return res + } + + type instruction struct { + fp uint64 + checksums []uint32 + } + mkRemovals := func(xs [][]instruction) [][]v1.Output { + out := make([][]v1.Output, len(xs)) + for i, x := range xs { + out[i] = make([]v1.Output, len(x)) + for j, c := range x { + out[i][j] = v1.Output{ + Fp: model.Fingerprint(c.fp), + Removals: make(v1.ChunkRefs, len(c.checksums)), + } + for k, chk := range c.checksums { + out[i][j].Removals[k] = v1.ChunkRef{Checksum: chk} + } + } } - n := g.removeNotMatchingChunks(req, res) - require.Equal(t, 2, n) - require.Equal(t, expected, req) - }) + return out + } - t.Run("removing all chunks removed fingerprint ref", func(t *testing.T) { - req := &logproto.FilterChunkRefRequest{ - Refs: []*logproto.GroupedChunkRefs{ - {Fingerprint: 0x00, Tenant: "fake", Refs: []*logproto.ShortRef{ - {Checksum: 0x1}, - {Checksum: 0x2}, - {Checksum: 0x3}, - }}, - }, + mkResult := func(xs []instruction) *logproto.FilterChunkRefRequest { + out := &logproto.FilterChunkRefRequest{Refs: make([]*logproto.GroupedChunkRefs, len(xs))} + for i, x := range xs { + out.Refs[i] = &logproto.GroupedChunkRefs{ + Fingerprint: x.fp, + Refs: make([]*logproto.ShortRef, len(x.checksums)), + } + for j, c := range x.checksums { + out.Refs[i].Refs[j] = &logproto.ShortRef{Checksum: c} + } } - res := v1.Output{ - Fp: 0x00, Removals: v1.ChunkRefs{ - {Checksum: 0x1}, - {Checksum: 0x2}, - {Checksum: 0x2}, + return out + } + + for _, tc := range []struct { + desc string + input *logproto.FilterChunkRefRequest + removals [][]instruction + expected *logproto.FilterChunkRefRequest + }{ + { + desc: "no removals", + input: mkInput(2, 2), + expected: mkInput(2, 2), + }, + { + desc: "remove all", + input: mkInput(2, 2), + removals: [][]instruction{ + { + {fp: 0, checksums: []uint32{0, 1}}, + {fp: 1, checksums: []uint32{0, 1}}, + }, + }, + expected: mkInput(0, 0), + }, + { + desc: "remove every other series", + input: mkInput(4, 2), + removals: [][]instruction{ + { + {fp: 0, checksums: []uint32{0, 1}}, + {fp: 2, checksums: []uint32{0, 1}}, + }, + }, + expected: mkResult([]instruction{ + {fp: 1, checksums: []uint32{0, 1}}, + {fp: 3, checksums: []uint32{0, 1}}, + }), + }, + { + desc: "remove the last chunk for each series", + input: mkInput(4, 2), + removals: [][]instruction{ + { + {fp: 0, checksums: []uint32{1}}, + {fp: 1, checksums: []uint32{1}}, + {fp: 2, checksums: []uint32{1}}, + {fp: 3, checksums: []uint32{1}}, + }, }, + expected: mkResult([]instruction{ + {fp: 0, checksums: []uint32{0}}, + {fp: 1, checksums: []uint32{0}}, + {fp: 2, checksums: []uint32{0}}, + {fp: 3, checksums: []uint32{0}}, + }), + }, + { + desc: "remove the middle chunk for every other series", + input: mkInput(4, 3), + removals: [][]instruction{ + { + {fp: 0, checksums: []uint32{1}}, + {fp: 2, checksums: []uint32{1}}, + }, + }, + expected: mkResult([]instruction{ + {fp: 0, checksums: []uint32{0, 2}}, + {fp: 1, checksums: []uint32{0, 1, 2}}, + {fp: 2, checksums: []uint32{0, 2}}, + {fp: 3, checksums: []uint32{0, 1, 2}}, + }), + }, + { + desc: "remove the first chunk of the last series", + input: mkInput(4, 3), + removals: [][]instruction{ + { + {fp: 3, checksums: []uint32{0}}, + }, + }, + expected: mkResult([]instruction{ + {fp: 0, checksums: []uint32{0, 1, 2}}, + {fp: 1, checksums: []uint32{0, 1, 2}}, + {fp: 2, checksums: []uint32{0, 1, 2}}, + {fp: 3, checksums: []uint32{1, 2}}, + }), + }, + { + desc: "duplicate removals", + input: mkInput(4, 3), + removals: [][]instruction{ + { + {fp: 0, checksums: []uint32{0, 1}}, + {fp: 0, checksums: []uint32{0, 1, 2}}, + {fp: 1, checksums: []uint32{1}}, + {fp: 2, checksums: []uint32{1}}, + }, + }, + expected: mkResult([]instruction{ + {fp: 1, checksums: []uint32{0, 2}}, + {fp: 2, checksums: []uint32{0, 2}}, + {fp: 3, checksums: []uint32{0, 1, 2}}, + }), + }, + { + desc: "middle duplicates across 2 days", + input: mkInput(4, 3), + removals: [][]instruction{ + { + {fp: 0, checksums: []uint32{1}}, + {fp: 2, checksums: []uint32{1}}, + }, + { + {fp: 0, checksums: []uint32{1}}, + {fp: 2, checksums: []uint32{1}}, + }, + }, + expected: mkResult([]instruction{ + {fp: 0, checksums: []uint32{0, 2}}, + {fp: 1, checksums: []uint32{0, 1, 2}}, + {fp: 2, checksums: []uint32{0, 2}}, + {fp: 3, checksums: []uint32{0, 1, 2}}, + }), + }, + } { + t.Run(tc.desc, func(t *testing.T) { + res := filterChunkRefs(tc.input, mkRemovals(tc.removals)) + require.Equal(t, tc.expected.Refs, res) + }) + } + +} + +func BenchmarkFilterChunkRefs(b *testing.B) { + nSeries := 1024 + chunksPerSeries := 10 + + mkInput := func() *logproto.FilterChunkRefRequest { + res := &logproto.FilterChunkRefRequest{} + + refs := make([]*logproto.GroupedChunkRefs, nSeries) + for i := range refs { + chks := make([]*logproto.ShortRef, chunksPerSeries) + for j := range chks { + chks[j] = &logproto.ShortRef{Checksum: uint32(j)} + } + refs[i] = &logproto.GroupedChunkRefs{ + Fingerprint: uint64(i), + Refs: chks, + } } - expected := &logproto.FilterChunkRefRequest{ - Refs: []*logproto.GroupedChunkRefs{}, + res.Refs = refs + return res + } + + // responses aren't mutated, so we add a pool to mitigate the alloc + // effect on the benchmark + var responseP sync.Pool + mkOutputs := func() *[]v1.Output { + // remove half the chunks from half the series, so 25% of the volume + outputs := make([]v1.Output, nSeries/2) + for i := range outputs { + output := v1.Output{ + Fp: model.Fingerprint(i * 2), + } + for j := 0; j < chunksPerSeries/2; j++ { + output.Removals = append(output.Removals, v1.ChunkRef{Checksum: uint32(j * 2)}) + } + + outputs[i] = output } - n := g.removeNotMatchingChunks(req, res) - require.Equal(t, 3, n) - require.Equal(t, expected, req) - }) + return &outputs + } + responseP.New = func() interface{} { + return mkOutputs() + } + + // Add comparison functions here to bench side by side + for _, tc := range []struct { + desc string + f func(req *logproto.FilterChunkRefRequest, responses []v1.Output) + }{ + { + desc: "filterChunkRefs", + f: func(req *logproto.FilterChunkRefRequest, responses []v1.Output) { + filterChunkRefs(req, [][]v1.Output{responses}) + }, + }, + } { + b.Run(tc.desc, func(b *testing.B) { + for i := 0; i < b.N; i++ { + req := mkInput() + ptr := responseP.Get().(*[]v1.Output) + resps := *ptr + + tc.f(req, resps) + + responseP.Put(ptr) + } + }) + } } diff --git a/pkg/bloomgateway/metrics.go b/pkg/bloomgateway/metrics.go index cb63e4df1018d..9058a90078ac5 100644 --- a/pkg/bloomgateway/metrics.go +++ b/pkg/bloomgateway/metrics.go @@ -14,7 +14,10 @@ type metrics struct { type serverMetrics struct { inflightRequests prometheus.Summary - chunkRemovals *prometheus.CounterVec + requestedSeries prometheus.Histogram + filteredSeries prometheus.Histogram + requestedChunks prometheus.Histogram + filteredChunks prometheus.Histogram } func newMetrics(registerer prometheus.Registerer, namespace, subsystem string) *metrics { @@ -35,12 +38,34 @@ func newServerMetrics(registerer prometheus.Registerer, namespace, subsystem str MaxAge: time.Minute, AgeBuckets: 6, }), - chunkRemovals: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ + requestedSeries: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "chunk_removals_total", - Help: "Total amount of removals received from the block querier partitioned by state. The state 'accepted' means that the removals are processed, the state 'dropped' means that the removals were received after the task context was done (e.g. client timeout, etc).", - }, []string{"state"}), + Name: "requested_series", + Help: "Total amount of series refs sent to bloom-gateway for querying", + Buckets: prometheus.ExponentialBucketsRange(1, 100e3, 10), + }), + filteredSeries: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "filtered_series", + Help: "Total amount of series refs filtered by bloom-gateway", + Buckets: prometheus.ExponentialBucketsRange(1, 100e3, 10), + }), + requestedChunks: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "requested_chunks", + Help: "Total amount of chunk refs sent to bloom-gateway for querying", + Buckets: prometheus.ExponentialBucketsRange(1, 100e3, 10), + }), + filteredChunks: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "filtered_chunks", + Help: "Total amount of chunk refs filtered by bloom-gateway", + Buckets: prometheus.ExponentialBucketsRange(1, 100e3, 10), + }), } } diff --git a/pkg/storage/bloom/v1/index.go b/pkg/storage/bloom/v1/index.go index 87efc92bdc64b..838ba891cb2f6 100644 --- a/pkg/storage/bloom/v1/index.go +++ b/pkg/storage/bloom/v1/index.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "io" + "sort" "github.com/pkg/errors" "github.com/prometheus/common/model" @@ -364,6 +365,7 @@ func (s *SeriesWithOffset) Encode( previousFp model.Fingerprint, previousOffset BloomOffset, ) (model.Fingerprint, BloomOffset) { + sort.Sort(s.Chunks) // ensure order // delta encode fingerprint enc.PutBE64(uint64(s.Fingerprint - previousFp)) // delta encode offsets