From 36e22437e4e1645729f000a0a6ddf1c7bc81bc44 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Fri, 16 Feb 2024 15:59:00 +0100 Subject: [PATCH 1/8] Replace `goto label` Signed-off-by: Christian Haudum --- pkg/bloomgateway/bloomgateway.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go index 58f709f0be2f8..2c6731ac6bc35 100644 --- a/pkg/bloomgateway/bloomgateway.go +++ b/pkg/bloomgateway/bloomgateway.go @@ -381,8 +381,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk defer responsesPool.Put(responses) remaining := len(tasks) -outer: - for { + for remaining > 0 { select { case <-ctx.Done(): return nil, errors.Wrap(ctx.Err(), "request failed") @@ -393,9 +392,6 @@ outer: } responses = append(responses, task.responses...) remaining-- - if remaining == 0 { - break outer - } } } From c684721fbf841ab8ca4e95dc8fd704fa19cb468c Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Fri, 16 Feb 2024 17:20:03 +0100 Subject: [PATCH 2/8] Observe total/filtered series and total/filtered chunks in bloom gateway Signed-off-by: Christian Haudum --- pkg/bloomgateway/bloomgateway.go | 84 ++++++++++++++++----------- pkg/bloomgateway/bloomgateway_test.go | 9 ++- 2 files changed, 58 insertions(+), 35 deletions(-) diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go index 2c6731ac6bc35..c3677450408d6 100644 --- a/pkg/bloomgateway/bloomgateway.go +++ b/pkg/bloomgateway/bloomgateway.go @@ -82,10 +82,12 @@ var ( ) type metrics struct { - queueDuration prometheus.Histogram - inflightRequests prometheus.Summary - chunkRefsUnfiltered prometheus.Counter - chunkRefsFiltered prometheus.Counter + queueDuration prometheus.Histogram + inflightRequests prometheus.Summary + chunksTotal prometheus.Counter + chunksFiltered prometheus.Counter + seriesTotal prometheus.Counter + seriesFiltered prometheus.Counter } func newMetrics(registerer prometheus.Registerer, namespace, subsystem string) *metrics { @@ -106,29 +108,33 @@ func newMetrics(registerer prometheus.Registerer, namespace, subsystem string) * MaxAge: time.Minute, AgeBuckets: 6, }), - chunkRefsUnfiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + chunksTotal: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "chunkrefs_pre_filtering", - Help: "Total amount of chunk refs pre filtering. Does not count chunk refs in failed requests.", + Name: "chunks_total", + Help: "Total amount of chunks pre filtering. Does not count chunks in failed requests.", }), - chunkRefsFiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + chunksFiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "chunkrefs_post_filtering", - Help: "Total amount of chunk refs post filtering.", + Name: "chunks_filtered", + Help: "Total amount of chunks that have been filtered out. Does not count chunks in failed requests.", + }), + seriesTotal: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "series_total", + Help: "Total amount of series pre filtering. Does not count series in failed requests.", + }), + seriesFiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "series_filtered", + Help: "Total amount of series that have been filtered out. Does not count series in failed requests.", }), } } -func (m *metrics) addUnfilteredCount(n int) { - m.chunkRefsUnfiltered.Add(float64(n)) -} - -func (m *metrics) addFilteredCount(n int) { - m.chunkRefsFiltered.Add(float64(n)) -} - // SyncMap is a map structure which can be synchronized using the RWMutex type SyncMap[k comparable, v any] struct { sync.RWMutex @@ -324,12 +330,8 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk return nil, errors.New("from time must not be after through time") } - numChunksUnfiltered := len(req.Refs) - // Shortcut if request does not contain filters if len(req.Filters) == 0 { - g.metrics.addUnfilteredCount(numChunksUnfiltered) - g.metrics.addFilteredCount(len(req.Refs)) return &logproto.FilterChunkRefResponse{ ChunkRefs: req.Refs, }, nil @@ -395,17 +397,17 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk } } - for _, o := range responses { - if o.Removals.Len() == 0 { - continue - } - removeNotMatchingChunks(req, o, g.logger) - } + preFilterSeries := len(req.Refs) + g.metrics.seriesTotal.Add(float64(preFilterSeries)) - g.metrics.addUnfilteredCount(numChunksUnfiltered) - g.metrics.addFilteredCount(len(req.Refs)) + // TODO(chaudum): Don't wait for all responses before starting to filter chunks. + filtered := g.processResponses(req, responses) + g.metrics.chunksFiltered.Add(float64(filtered)) - level.Info(logger).Log("msg", "return filtered chunk refs", "unfiltered", numChunksUnfiltered, "filtered", len(req.Refs)) + postFilterSeries := len(req.Refs) + g.metrics.seriesFiltered.Add(float64(preFilterSeries - postFilterSeries)) + + level.Info(logger).Log("msg", "return filtered chunk refs", "pre_filter_series", preFilterSeries, "post_filter_series", postFilterSeries, "filtered_chunks", filtered) return &logproto.FilterChunkRefResponse{ChunkRefs: req.Refs}, nil } @@ -437,7 +439,18 @@ func consumeTask(ctx context.Context, task Task, tasksCh chan<- Task, logger log } } -func removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, res v1.Output, logger log.Logger) { +func (g *Gateway) processResponses(req *logproto.FilterChunkRefRequest, responses []v1.Output) (filtered int) { + for _, o := range responses { + if o.Removals.Len() == 0 { + continue + } + filtered += g.removeNotMatchingChunks(req, o) + } + return +} + +func (g *Gateway) removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, res v1.Output) (filtered int) { + // binary search index of fingerprint idx := sort.Search(len(req.Refs), func(i int) bool { return req.Refs[i].Fingerprint >= uint64(res.Fp) @@ -445,13 +458,15 @@ func removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, res v1.Output, // fingerprint not found if idx >= len(req.Refs) { - level.Error(logger).Log("msg", "index out of range", "idx", idx, "len", len(req.Refs), "fp", uint64(res.Fp)) + level.Error(g.logger).Log("msg", "index out of range", "idx", idx, "len", len(req.Refs), "fp", uint64(res.Fp)) return } // if all chunks of a fingerprint are are removed // then remove the whole group from the response if len(req.Refs[idx].Refs) == res.Removals.Len() { + filtered += len(req.Refs[idx].Refs) + req.Refs[idx] = nil // avoid leaking pointer req.Refs = append(req.Refs[:idx], req.Refs[idx+1:]...) return @@ -461,10 +476,13 @@ func removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, res v1.Output, toRemove := res.Removals[i] for j := 0; j < len(req.Refs[idx].Refs); j++ { if toRemove.Checksum == req.Refs[idx].Refs[j].Checksum { + filtered += 1 + req.Refs[idx].Refs[j] = nil // avoid leaking pointer req.Refs[idx].Refs = append(req.Refs[idx].Refs[:j], req.Refs[idx].Refs[j+1:]...) j-- // since we removed the current item at index, we have to redo the same index } } } + return } diff --git a/pkg/bloomgateway/bloomgateway_test.go b/pkg/bloomgateway/bloomgateway_test.go index f07e014b84dc3..fede86484a96b 100644 --- a/pkg/bloomgateway/bloomgateway_test.go +++ b/pkg/bloomgateway/bloomgateway_test.go @@ -423,6 +423,9 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { } func TestBloomGateway_RemoveNotMatchingChunks(t *testing.T) { + g := &Gateway{ + logger: log.NewNopLogger(), + } t.Run("removing chunks partially", func(t *testing.T) { req := &logproto.FilterChunkRefRequest{ Refs: []*logproto.GroupedChunkRefs{ @@ -450,7 +453,8 @@ func TestBloomGateway_RemoveNotMatchingChunks(t *testing.T) { }}, }, } - removeNotMatchingChunks(req, res, log.NewNopLogger()) + n := g.removeNotMatchingChunks(req, res) + require.Equal(t, 2, n) require.Equal(t, expected, req) }) @@ -474,7 +478,8 @@ func TestBloomGateway_RemoveNotMatchingChunks(t *testing.T) { expected := &logproto.FilterChunkRefRequest{ Refs: []*logproto.GroupedChunkRefs{}, } - removeNotMatchingChunks(req, res, log.NewNopLogger()) + n := g.removeNotMatchingChunks(req, res) + require.Equal(t, 3, n) require.Equal(t, expected, req) }) From 0df5ad14d0a04004c261cbac4b3b2a121ad3b1f4 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Mon, 19 Feb 2024 08:39:44 +0100 Subject: [PATCH 3/8] Move chunks/series metrics into bloom querier Signed-off-by: Christian Haudum --- pkg/bloomgateway/bloomgateway.go | 31 ------------------ pkg/bloomgateway/querier.go | 55 ++++++++++++++++++++++++++++++-- 2 files changed, 53 insertions(+), 33 deletions(-) diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go index c3677450408d6..81acbf23add5e 100644 --- a/pkg/bloomgateway/bloomgateway.go +++ b/pkg/bloomgateway/bloomgateway.go @@ -84,10 +84,6 @@ var ( type metrics struct { queueDuration prometheus.Histogram inflightRequests prometheus.Summary - chunksTotal prometheus.Counter - chunksFiltered prometheus.Counter - seriesTotal prometheus.Counter - seriesFiltered prometheus.Counter } func newMetrics(registerer prometheus.Registerer, namespace, subsystem string) *metrics { @@ -108,30 +104,6 @@ func newMetrics(registerer prometheus.Registerer, namespace, subsystem string) * MaxAge: time.Minute, AgeBuckets: 6, }), - chunksTotal: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "chunks_total", - Help: "Total amount of chunks pre filtering. Does not count chunks in failed requests.", - }), - chunksFiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "chunks_filtered", - Help: "Total amount of chunks that have been filtered out. Does not count chunks in failed requests.", - }), - seriesTotal: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "series_total", - Help: "Total amount of series pre filtering. Does not count series in failed requests.", - }), - seriesFiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "series_filtered", - Help: "Total amount of series that have been filtered out. Does not count series in failed requests.", - }), } } @@ -398,14 +370,11 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk } preFilterSeries := len(req.Refs) - g.metrics.seriesTotal.Add(float64(preFilterSeries)) // TODO(chaudum): Don't wait for all responses before starting to filter chunks. filtered := g.processResponses(req, responses) - g.metrics.chunksFiltered.Add(float64(filtered)) postFilterSeries := len(req.Refs) - g.metrics.seriesFiltered.Add(float64(preFilterSeries - postFilterSeries)) level.Info(logger).Log("msg", "return filtered chunk refs", "pre_filter_series", preFilterSeries, "post_filter_series", postFilterSeries, "filtered_chunks", filtered) return &logproto.FilterChunkRefResponse{ChunkRefs: req.Refs}, nil diff --git a/pkg/bloomgateway/querier.go b/pkg/bloomgateway/querier.go index 4b2366e83f287..f6588d5317f2b 100644 --- a/pkg/bloomgateway/querier.go +++ b/pkg/bloomgateway/querier.go @@ -5,17 +5,56 @@ import ( "sort" "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/syntax" ) +type querierMetrics struct { + chunksTotal prometheus.Counter + chunksFiltered prometheus.Counter + seriesTotal prometheus.Counter + seriesFiltered prometheus.Counter +} + +func newQuerierMetrics(registerer prometheus.Registerer, namespace, subsystem string) *querierMetrics { + return &querierMetrics{ + chunksTotal: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "chunks_total", + Help: "Total amount of chunks pre filtering. Does not count chunks in failed requests.", + }), + chunksFiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "chunks_filtered", + Help: "Total amount of chunks that have been filtered out. Does not count chunks in failed requests.", + }), + seriesTotal: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "series_total", + Help: "Total amount of series pre filtering. Does not count series in failed requests.", + }), + seriesFiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "series_filtered", + Help: "Total amount of series that have been filtered out. Does not count series in failed requests.", + }), + } +} + // BloomQuerier is a store-level abstraction on top of Client // It is used by the index gateway to filter ChunkRefs based on given line fiter expression. type BloomQuerier struct { - c Client - logger log.Logger + c Client + logger log.Logger + metrics *querierMetrics } func NewQuerier(c Client, logger log.Logger) *BloomQuerier { @@ -37,6 +76,9 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from defer groupedChunksRefPool.Put(grouped) grouped = groupChunkRefs(chunkRefs, grouped) + preFilterChunks := len(chunkRefs) + preFilterSeries := len(grouped) + refs, err := bq.c.FilterChunks(ctx, tenant, from, through, grouped, filters...) if err != nil { return nil, err @@ -55,6 +97,15 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from }) } } + + postFilterChunks := len(result) + postFilterSeries := len(refs) + + bq.metrics.chunksTotal.Add(float64(preFilterChunks)) + bq.metrics.chunksFiltered.Add(float64(preFilterChunks - postFilterChunks)) + bq.metrics.seriesTotal.Add(float64(preFilterSeries)) + bq.metrics.seriesFiltered.Add(float64(preFilterSeries - postFilterSeries)) + return result, nil } From f5a4b5351df935df990d087f66a0299738cd1291 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Mon, 19 Feb 2024 08:59:42 +0100 Subject: [PATCH 4/8] Observe chunk removals from block querier Signed-off-by: Christian Haudum --- pkg/bloomgateway/bloomgateway.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go index 81acbf23add5e..ad7a5b43ac19a 100644 --- a/pkg/bloomgateway/bloomgateway.go +++ b/pkg/bloomgateway/bloomgateway.go @@ -84,6 +84,7 @@ var ( type metrics struct { queueDuration prometheus.Histogram inflightRequests prometheus.Summary + chunkRemovals *prometheus.CounterVec } func newMetrics(registerer prometheus.Registerer, namespace, subsystem string) *metrics { @@ -104,6 +105,12 @@ func newMetrics(registerer prometheus.Registerer, namespace, subsystem string) * MaxAge: time.Minute, AgeBuckets: 6, }), + chunkRemovals: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "chunks_filtered", + Help: "Total amount of removals received from the block querier.", + }, []string{"state"}), } } @@ -348,7 +355,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk // When enqueuing, we also add the task to the pending tasks g.pendingTasks.Add(task.ID, task) }) - go consumeTask(ctx, task, tasksCh, logger) + go g.consumeTask(ctx, task, tasksCh) } responses := responsesPool.Get(numSeries) @@ -386,16 +393,18 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk // task is closed by the worker. // Once the tasks is closed, it will send the task with the results from the // block querier to the supplied task channel. -func consumeTask(ctx context.Context, task Task, tasksCh chan<- Task, logger log.Logger) { - logger = log.With(logger, "task", task.ID) +func (g *Gateway) consumeTask(ctx context.Context, task Task, tasksCh chan<- Task) { + logger := log.With(g.logger, "task", task.ID) for res := range task.resCh { select { case <-ctx.Done(): level.Debug(logger).Log("msg", "drop partial result", "fp_int", uint64(res.Fp), "fp_hex", res.Fp, "chunks_to_remove", res.Removals.Len()) + g.metrics.chunkRemovals.WithLabelValues("dropped").Add(float64(res.Removals.Len())) default: level.Debug(logger).Log("msg", "accept partial result", "fp_int", uint64(res.Fp), "fp_hex", res.Fp, "chunks_to_remove", res.Removals.Len()) task.responses = append(task.responses, res) + g.metrics.chunkRemovals.WithLabelValues("accepted").Add(float64(res.Removals.Len())) } } From a8aed175773c03d4d661e0d9d65c9d3d7f2bd2b0 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Mon, 19 Feb 2024 09:00:31 +0100 Subject: [PATCH 5/8] Fix incorrect observation of failed tasks Signed-off-by: Christian Haudum --- pkg/bloomgateway/worker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/bloomgateway/worker.go b/pkg/bloomgateway/worker.go index 5c57c0a2e4952..ec44081c1b30c 100644 --- a/pkg/bloomgateway/worker.go +++ b/pkg/bloomgateway/worker.go @@ -163,7 +163,7 @@ func (w *worker) running(_ context.Context) error { err = p.run(taskCtx, tasks) if err != nil { - w.metrics.processDuration.WithLabelValues(w.id, labelSuccess).Observe(time.Since(start).Seconds()) + w.metrics.processDuration.WithLabelValues(w.id, labelFailure).Observe(time.Since(start).Seconds()) w.metrics.tasksProcessed.WithLabelValues(w.id, labelFailure).Add(float64(len(tasks))) level.Error(w.logger).Log("msg", "failed to process tasks", "err", err) } else { From 849cff0d240b2168b48224ac1df95fa2233a9b16 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Mon, 19 Feb 2024 09:20:49 +0100 Subject: [PATCH 6/8] End counter metric with _total Signed-off-by: Christian Haudum --- pkg/bloomgateway/bloomgateway.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go index ad7a5b43ac19a..344fa24470df1 100644 --- a/pkg/bloomgateway/bloomgateway.go +++ b/pkg/bloomgateway/bloomgateway.go @@ -108,7 +108,7 @@ func newMetrics(registerer prometheus.Registerer, namespace, subsystem string) * chunkRemovals: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "chunks_filtered", + Name: "chunk_removals_total", Help: "Total amount of removals received from the block querier.", }, []string{"state"}), } From c3940e73b804c0cc8863eebf3ab11a4f48cf4355 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Mon, 19 Feb 2024 09:22:22 +0100 Subject: [PATCH 7/8] fixup! End counter metric with _total Signed-off-by: Christian Haudum --- pkg/bloomgateway/querier.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/bloomgateway/querier.go b/pkg/bloomgateway/querier.go index f6588d5317f2b..02608bfdf71c4 100644 --- a/pkg/bloomgateway/querier.go +++ b/pkg/bloomgateway/querier.go @@ -31,7 +31,7 @@ func newQuerierMetrics(registerer prometheus.Registerer, namespace, subsystem st chunksFiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "chunks_filtered", + Name: "chunks_filtered_total", Help: "Total amount of chunks that have been filtered out. Does not count chunks in failed requests.", }), seriesTotal: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ @@ -43,7 +43,7 @@ func newQuerierMetrics(registerer prometheus.Registerer, namespace, subsystem st seriesFiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "series_filtered", + Name: "series_filtered_total", Help: "Total amount of series that have been filtered out. Does not count series in failed requests.", }), } From 0431acd2419d1752529913b9db0f67c2ba46711c Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Mon, 19 Feb 2024 09:50:03 +0100 Subject: [PATCH 8/8] Improve help text for chunk removals metric Signed-off-by: Christian Haudum --- pkg/bloomgateway/bloomgateway.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go index 344fa24470df1..4e36e5ce3018e 100644 --- a/pkg/bloomgateway/bloomgateway.go +++ b/pkg/bloomgateway/bloomgateway.go @@ -109,7 +109,7 @@ func newMetrics(registerer prometheus.Registerer, namespace, subsystem string) * Namespace: namespace, Subsystem: subsystem, Name: "chunk_removals_total", - Help: "Total amount of removals received from the block querier.", + Help: "Total amount of removals received from the block querier partitioned by state. The state 'accepted' means that the removals are processed, the state 'dropped' means that the removals were received after the task context was done (e.g. client timeout, etc).", }, []string{"state"}), } }