From 9f86473b0a2dedb302c2dcc93e123ea3cde62303 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Mon, 19 Feb 2024 10:04:14 +0100 Subject: [PATCH] Various bloom gateway instrumentation changes (#11983) See individual commit messages for details. * Improve help text for chunk removals metric * End counter metric with _total * Fix incorrect observation of failed tasks * Observe chunk removals from block querier * Move chunks/series metrics into bloom querier * Observe total/filtered series and total/filtered chunks in bloom gateway Signed-off-by: Christian Haudum --- pkg/bloomgateway/bloomgateway.go | 82 ++++++++++++--------------- pkg/bloomgateway/bloomgateway_test.go | 9 ++- pkg/bloomgateway/querier.go | 55 +++++++++++++++++- pkg/bloomgateway/worker.go | 2 +- 4 files changed, 98 insertions(+), 50 deletions(-) diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go index 58f709f0be2f8..4e36e5ce3018e 100644 --- a/pkg/bloomgateway/bloomgateway.go +++ b/pkg/bloomgateway/bloomgateway.go @@ -82,10 +82,9 @@ var ( ) type metrics struct { - queueDuration prometheus.Histogram - inflightRequests prometheus.Summary - chunkRefsUnfiltered prometheus.Counter - chunkRefsFiltered prometheus.Counter + queueDuration prometheus.Histogram + inflightRequests prometheus.Summary + chunkRemovals *prometheus.CounterVec } func newMetrics(registerer prometheus.Registerer, namespace, subsystem string) *metrics { @@ -106,29 +105,15 @@ func newMetrics(registerer prometheus.Registerer, namespace, subsystem string) * MaxAge: time.Minute, AgeBuckets: 6, }), - chunkRefsUnfiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + chunkRemovals: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "chunkrefs_pre_filtering", - Help: "Total amount of chunk refs pre filtering. Does not count chunk refs in failed requests.", - }), - chunkRefsFiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "chunkrefs_post_filtering", - Help: "Total amount of chunk refs post filtering.", - }), + Name: "chunk_removals_total", + Help: "Total amount of removals received from the block querier partitioned by state. The state 'accepted' means that the removals are processed, the state 'dropped' means that the removals were received after the task context was done (e.g. client timeout, etc).", + }, []string{"state"}), } } -func (m *metrics) addUnfilteredCount(n int) { - m.chunkRefsUnfiltered.Add(float64(n)) -} - -func (m *metrics) addFilteredCount(n int) { - m.chunkRefsFiltered.Add(float64(n)) -} - // SyncMap is a map structure which can be synchronized using the RWMutex type SyncMap[k comparable, v any] struct { sync.RWMutex @@ -324,12 +309,8 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk return nil, errors.New("from time must not be after through time") } - numChunksUnfiltered := len(req.Refs) - // Shortcut if request does not contain filters if len(req.Filters) == 0 { - g.metrics.addUnfilteredCount(numChunksUnfiltered) - g.metrics.addFilteredCount(len(req.Refs)) return &logproto.FilterChunkRefResponse{ ChunkRefs: req.Refs, }, nil @@ -374,15 +355,14 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk // When enqueuing, we also add the task to the pending tasks g.pendingTasks.Add(task.ID, task) }) - go consumeTask(ctx, task, tasksCh, logger) + go g.consumeTask(ctx, task, tasksCh) } responses := responsesPool.Get(numSeries) defer responsesPool.Put(responses) remaining := len(tasks) -outer: - for { + for remaining > 0 { select { case <-ctx.Done(): return nil, errors.Wrap(ctx.Err(), "request failed") @@ -393,23 +373,17 @@ outer: } responses = append(responses, task.responses...) remaining-- - if remaining == 0 { - break outer - } } } - for _, o := range responses { - if o.Removals.Len() == 0 { - continue - } - removeNotMatchingChunks(req, o, g.logger) - } + preFilterSeries := len(req.Refs) - g.metrics.addUnfilteredCount(numChunksUnfiltered) - g.metrics.addFilteredCount(len(req.Refs)) + // TODO(chaudum): Don't wait for all responses before starting to filter chunks. + filtered := g.processResponses(req, responses) - level.Info(logger).Log("msg", "return filtered chunk refs", "unfiltered", numChunksUnfiltered, "filtered", len(req.Refs)) + postFilterSeries := len(req.Refs) + + level.Info(logger).Log("msg", "return filtered chunk refs", "pre_filter_series", preFilterSeries, "post_filter_series", postFilterSeries, "filtered_chunks", filtered) return &logproto.FilterChunkRefResponse{ChunkRefs: req.Refs}, nil } @@ -419,16 +393,18 @@ outer: // task is closed by the worker. // Once the tasks is closed, it will send the task with the results from the // block querier to the supplied task channel. -func consumeTask(ctx context.Context, task Task, tasksCh chan<- Task, logger log.Logger) { - logger = log.With(logger, "task", task.ID) +func (g *Gateway) consumeTask(ctx context.Context, task Task, tasksCh chan<- Task) { + logger := log.With(g.logger, "task", task.ID) for res := range task.resCh { select { case <-ctx.Done(): level.Debug(logger).Log("msg", "drop partial result", "fp_int", uint64(res.Fp), "fp_hex", res.Fp, "chunks_to_remove", res.Removals.Len()) + g.metrics.chunkRemovals.WithLabelValues("dropped").Add(float64(res.Removals.Len())) default: level.Debug(logger).Log("msg", "accept partial result", "fp_int", uint64(res.Fp), "fp_hex", res.Fp, "chunks_to_remove", res.Removals.Len()) task.responses = append(task.responses, res) + g.metrics.chunkRemovals.WithLabelValues("accepted").Add(float64(res.Removals.Len())) } } @@ -441,7 +417,18 @@ func consumeTask(ctx context.Context, task Task, tasksCh chan<- Task, logger log } } -func removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, res v1.Output, logger log.Logger) { +func (g *Gateway) processResponses(req *logproto.FilterChunkRefRequest, responses []v1.Output) (filtered int) { + for _, o := range responses { + if o.Removals.Len() == 0 { + continue + } + filtered += g.removeNotMatchingChunks(req, o) + } + return +} + +func (g *Gateway) removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, res v1.Output) (filtered int) { + // binary search index of fingerprint idx := sort.Search(len(req.Refs), func(i int) bool { return req.Refs[i].Fingerprint >= uint64(res.Fp) @@ -449,13 +436,15 @@ func removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, res v1.Output, // fingerprint not found if idx >= len(req.Refs) { - level.Error(logger).Log("msg", "index out of range", "idx", idx, "len", len(req.Refs), "fp", uint64(res.Fp)) + level.Error(g.logger).Log("msg", "index out of range", "idx", idx, "len", len(req.Refs), "fp", uint64(res.Fp)) return } // if all chunks of a fingerprint are are removed // then remove the whole group from the response if len(req.Refs[idx].Refs) == res.Removals.Len() { + filtered += len(req.Refs[idx].Refs) + req.Refs[idx] = nil // avoid leaking pointer req.Refs = append(req.Refs[:idx], req.Refs[idx+1:]...) return @@ -465,10 +454,13 @@ func removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, res v1.Output, toRemove := res.Removals[i] for j := 0; j < len(req.Refs[idx].Refs); j++ { if toRemove.Checksum == req.Refs[idx].Refs[j].Checksum { + filtered += 1 + req.Refs[idx].Refs[j] = nil // avoid leaking pointer req.Refs[idx].Refs = append(req.Refs[idx].Refs[:j], req.Refs[idx].Refs[j+1:]...) j-- // since we removed the current item at index, we have to redo the same index } } } + return } diff --git a/pkg/bloomgateway/bloomgateway_test.go b/pkg/bloomgateway/bloomgateway_test.go index f07e014b84dc3..fede86484a96b 100644 --- a/pkg/bloomgateway/bloomgateway_test.go +++ b/pkg/bloomgateway/bloomgateway_test.go @@ -423,6 +423,9 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { } func TestBloomGateway_RemoveNotMatchingChunks(t *testing.T) { + g := &Gateway{ + logger: log.NewNopLogger(), + } t.Run("removing chunks partially", func(t *testing.T) { req := &logproto.FilterChunkRefRequest{ Refs: []*logproto.GroupedChunkRefs{ @@ -450,7 +453,8 @@ func TestBloomGateway_RemoveNotMatchingChunks(t *testing.T) { }}, }, } - removeNotMatchingChunks(req, res, log.NewNopLogger()) + n := g.removeNotMatchingChunks(req, res) + require.Equal(t, 2, n) require.Equal(t, expected, req) }) @@ -474,7 +478,8 @@ func TestBloomGateway_RemoveNotMatchingChunks(t *testing.T) { expected := &logproto.FilterChunkRefRequest{ Refs: []*logproto.GroupedChunkRefs{}, } - removeNotMatchingChunks(req, res, log.NewNopLogger()) + n := g.removeNotMatchingChunks(req, res) + require.Equal(t, 3, n) require.Equal(t, expected, req) }) diff --git a/pkg/bloomgateway/querier.go b/pkg/bloomgateway/querier.go index 4b2366e83f287..02608bfdf71c4 100644 --- a/pkg/bloomgateway/querier.go +++ b/pkg/bloomgateway/querier.go @@ -5,17 +5,56 @@ import ( "sort" "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/syntax" ) +type querierMetrics struct { + chunksTotal prometheus.Counter + chunksFiltered prometheus.Counter + seriesTotal prometheus.Counter + seriesFiltered prometheus.Counter +} + +func newQuerierMetrics(registerer prometheus.Registerer, namespace, subsystem string) *querierMetrics { + return &querierMetrics{ + chunksTotal: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "chunks_total", + Help: "Total amount of chunks pre filtering. Does not count chunks in failed requests.", + }), + chunksFiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "chunks_filtered_total", + Help: "Total amount of chunks that have been filtered out. Does not count chunks in failed requests.", + }), + seriesTotal: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "series_total", + Help: "Total amount of series pre filtering. Does not count series in failed requests.", + }), + seriesFiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "series_filtered_total", + Help: "Total amount of series that have been filtered out. Does not count series in failed requests.", + }), + } +} + // BloomQuerier is a store-level abstraction on top of Client // It is used by the index gateway to filter ChunkRefs based on given line fiter expression. type BloomQuerier struct { - c Client - logger log.Logger + c Client + logger log.Logger + metrics *querierMetrics } func NewQuerier(c Client, logger log.Logger) *BloomQuerier { @@ -37,6 +76,9 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from defer groupedChunksRefPool.Put(grouped) grouped = groupChunkRefs(chunkRefs, grouped) + preFilterChunks := len(chunkRefs) + preFilterSeries := len(grouped) + refs, err := bq.c.FilterChunks(ctx, tenant, from, through, grouped, filters...) if err != nil { return nil, err @@ -55,6 +97,15 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from }) } } + + postFilterChunks := len(result) + postFilterSeries := len(refs) + + bq.metrics.chunksTotal.Add(float64(preFilterChunks)) + bq.metrics.chunksFiltered.Add(float64(preFilterChunks - postFilterChunks)) + bq.metrics.seriesTotal.Add(float64(preFilterSeries)) + bq.metrics.seriesFiltered.Add(float64(preFilterSeries - postFilterSeries)) + return result, nil } diff --git a/pkg/bloomgateway/worker.go b/pkg/bloomgateway/worker.go index 5c57c0a2e4952..ec44081c1b30c 100644 --- a/pkg/bloomgateway/worker.go +++ b/pkg/bloomgateway/worker.go @@ -163,7 +163,7 @@ func (w *worker) running(_ context.Context) error { err = p.run(taskCtx, tasks) if err != nil { - w.metrics.processDuration.WithLabelValues(w.id, labelSuccess).Observe(time.Since(start).Seconds()) + w.metrics.processDuration.WithLabelValues(w.id, labelFailure).Observe(time.Since(start).Seconds()) w.metrics.tasksProcessed.WithLabelValues(w.id, labelFailure).Add(float64(len(tasks))) level.Error(w.logger).Log("msg", "failed to process tasks", "err", err) } else {