From ca667aa66e0e3d1b7388813fc52ac79e112f8306 Mon Sep 17 00:00:00 2001
From: Christian Haudum
Date: Wed, 27 Mar 2024 09:53:33 +0100
Subject: [PATCH] chore(bloom-gw): Cleanup tracing implementation (#12368)

The huge number of `bloomgateway.ProcessTask` spans caused problems with
Tempo's span limit, resulting in dropped/dangling spans. Since the
processing time is now also recorded in a metrics.go-style stats log
line, we can remove these spans and instead add a single log event with
the summary to the main span of the request handler.

This PR also unifies the event logging in the index gateway and adds the
real processing time (as opposed to the aggregated per-task processing
time) to the request stats.

Signed-off-by: Christian Haudum
---
Sketches of the resulting span-event and stats patterns are appended
after the diff.

 pkg/bloomgateway/bloomgateway.go              | 24 +++++------
 pkg/bloomgateway/processor.go                 | 35 ++++++++--------
 pkg/bloomgateway/processor_test.go            |  4 ++
 pkg/bloomgateway/stats.go                     | 40 ++++++++++++++++---
 .../indexshipper/indexgateway/gateway.go      | 25 ++++--------
 5 files changed, 76 insertions(+), 52 deletions(-)

diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go
index 0bd8bf2895f05..482d6d8ef8660 100644
--- a/pkg/bloomgateway/bloomgateway.go
+++ b/pkg/bloomgateway/bloomgateway.go
@@ -51,6 +51,7 @@ import (
 	"github.com/go-kit/log/level"
 	"github.com/grafana/dskit/services"
 	"github.com/grafana/dskit/tenant"
+	"github.com/opentracing/opentracing-go"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"go.uber.org/atomic"
@@ -61,7 +62,6 @@ import (
 	"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
 	"github.com/grafana/loki/pkg/util"
 	"github.com/grafana/loki/pkg/util/constants"
-	"github.com/grafana/loki/pkg/util/spanlogger"
 )
 
 var errGatewayUnhealthy = errors.New("bloom-gateway is unhealthy in the ring")
@@ -200,15 +200,13 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
 		return nil, err
 	}
 
-	sp, ctx := spanlogger.NewWithLogger(
-		ctx,
-		log.With(g.logger, "tenant", tenantID),
-		"bloomgateway.FilterChunkRefs",
-	)
+	logger := log.With(g.logger, "tenant", tenantID)
+	sp, ctx := opentracing.StartSpanFromContext(ctx, "bloomgateway.FilterChunkRefs")
 	stats, ctx := ContextWithEmptyStats(ctx)
 	defer func() {
-		level.Info(sp).Log(stats.KVArgs()...)
+		level.Info(logger).Log(stats.KVArgs()...)
+		sp.LogKV(stats.KVArgs()...)
sp.Finish() }() @@ -249,7 +247,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk }, nil } - sp.Log( + sp.LogKV( "filters", len(filters), "days", len(seriesByDay), "series_requested", len(req.Refs), @@ -279,7 +277,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk for _, task := range tasks { task := task task.enqueueTime = time.Now() - level.Info(sp).Log("msg", "enqueue task", "task", task.ID, "table", task.table, "series", len(task.series)) + level.Info(logger).Log("msg", "enqueue task", "task", task.ID, "table", task.table, "series", len(task.series)) // TODO(owen-d): gracefully handle full queues if err := g.queue.Enqueue(tenantID, nil, task, func() { @@ -293,7 +291,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk go g.consumeTask(ctx, task, tasksCh) } - sp.Log("msg", "enqueued tasks", "duration", time.Since(queueStart).String()) + sp.LogKV("msg", "enqueued tasks", "duration", time.Since(queueStart).String()) remaining := len(tasks) @@ -310,7 +308,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk stats.Status = "cancel" return nil, errors.Wrap(ctx.Err(), "request failed") case task := <-tasksCh: - level.Info(sp).Log("msg", "task done", "task", task.ID, "err", task.Err()) + level.Info(logger).Log("msg", "task done", "task", task.ID, "err", task.Err()) if task.Err() != nil { stats.Status = labelFailure return nil, errors.Wrap(task.Err(), "request failed") @@ -320,7 +318,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk } } - sp.Log("msg", "received all responses") + sp.LogKV("msg", "received all responses") start := time.Now() filtered := filterChunkRefs(req, responses) @@ -348,7 +346,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk stats.ChunksRequested = preFilterChunks stats.ChunksFiltered = preFilterChunks - postFilterChunks - sp.Log("msg", "return filtered chunk refs") + sp.LogKV("msg", "return filtered chunk refs") return &logproto.FilterChunkRefResponse{ChunkRefs: filtered}, nil } diff --git a/pkg/bloomgateway/processor.go b/pkg/bloomgateway/processor.go index 74cb19d06911b..5cf805b11a74d 100644 --- a/pkg/bloomgateway/processor.go +++ b/pkg/bloomgateway/processor.go @@ -119,10 +119,19 @@ func (p *processor) processTasks(ctx context.Context, tenant string, day config. 
return err } - return p.processBlocks(ctx, bqs, data) + start = time.Now() + res := p.processBlocks(ctx, bqs, data) + duration = time.Since(start) + + for _, t := range tasks { + FromContext(t.ctx).AddProcessingTime(duration) + } + + return res } func (p *processor) processBlocks(ctx context.Context, bqs []*bloomshipper.CloseableBlockQuerier, data []blockWithTasks) error { + defer func() { for i := range bqs { if bqs[i] == nil { @@ -162,22 +171,12 @@ func (p *processor) processBlock(_ context.Context, blockQuerier *v1.BlockQuerie tokenizer := v1.NewNGramTokenizer(schema.NGramLen(), 0) iters := make([]v1.PeekingIterator[v1.Request], 0, len(tasks)) - // collect spans & run single defer to avoid blowing call stack - // if there are many tasks - spans := make([]opentracing.Span, 0, len(tasks)) - defer func() { - for _, sp := range spans { - sp.Finish() - } - }() - for _, task := range tasks { - // add spans for each task context for this block - sp, _ := opentracing.StartSpanFromContext(task.ctx, "bloomgateway.ProcessBlock") - spans = append(spans, sp) - md, _ := blockQuerier.Metadata() - blk := bloomshipper.BlockRefFrom(task.Tenant, task.table.String(), md) - sp.LogKV("block", blk.String()) + if sp := opentracing.SpanFromContext(task.ctx); sp != nil { + md, _ := blockQuerier.Metadata() + blk := bloomshipper.BlockRefFrom(task.Tenant, task.table.String(), md) + sp.LogKV("process block", blk.String()) + } it := v1.NewPeekingIter(task.RequestIter(tokenizer)) iters = append(iters, it) @@ -196,7 +195,9 @@ func (p *processor) processBlock(_ context.Context, blockQuerier *v1.BlockQuerie } for _, task := range tasks { - FromContext(task.ctx).AddProcessingTime(duration) + stats := FromContext(task.ctx) + stats.AddTotalProcessingTime(duration) + stats.IncProcessedBlocks() } return err diff --git a/pkg/bloomgateway/processor_test.go b/pkg/bloomgateway/processor_test.go index d9e6a799045e3..d70451a127867 100644 --- a/pkg/bloomgateway/processor_test.go +++ b/pkg/bloomgateway/processor_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" @@ -88,6 +89,9 @@ func (s *dummyStore) FetchBlocks(_ context.Context, refs []bloomshipper.BlockRef func TestProcessor(t *testing.T) { ctx := context.Background() + sp, ctx := opentracing.StartSpanFromContext(ctx, "TestProcessor") + t.Cleanup(sp.Finish) + tenant := "fake" now := mktime("2024-01-27 12:00") metrics := newWorkerMetrics(prometheus.NewPedanticRegistry(), constants.Loki, "bloom_gatway") diff --git a/pkg/bloomgateway/stats.go b/pkg/bloomgateway/stats.go index a855547b9124c..09f78841e544a 100644 --- a/pkg/bloomgateway/stats.go +++ b/pkg/bloomgateway/stats.go @@ -8,10 +8,15 @@ import ( ) type Stats struct { - Status string - NumTasks, NumFilters int - ChunksRequested, ChunksFiltered, SeriesRequested, SeriesFiltered int - QueueTime, MetasFetchTime, BlocksFetchTime, ProcessingTime, PostProcessingTime atomic.Duration + Status string + NumTasks, NumFilters int + ChunksRequested, ChunksFiltered int + SeriesRequested, SeriesFiltered int + QueueTime *atomic.Duration + MetasFetchTime, BlocksFetchTime *atomic.Duration + ProcessingTime, TotalProcessingTime *atomic.Duration + PostProcessingTime *atomic.Duration + ProcessedBlocks *atomic.Int32 } type statsKey int @@ -20,7 +25,16 @@ var ctxKey = statsKey(0) // ContextWithEmptyStats returns a context with empty stats. 
func ContextWithEmptyStats(ctx context.Context) (*Stats, context.Context) { - stats := &Stats{Status: "unknown"} + stats := &Stats{ + Status: "unknown", + ProcessedBlocks: atomic.NewInt32(0), + QueueTime: atomic.NewDuration(0), + MetasFetchTime: atomic.NewDuration(0), + BlocksFetchTime: atomic.NewDuration(0), + ProcessingTime: atomic.NewDuration(0), + TotalProcessingTime: atomic.NewDuration(0), + PostProcessingTime: atomic.NewDuration(0), + } ctx = context.WithValue(ctx, ctxKey, stats) return stats, ctx } @@ -56,6 +70,8 @@ func (s *Stats) KVArgs() []any { "msg", "stats-report", "status", s.Status, "tasks", s.NumTasks, + "filters", s.NumFilters, + "blocks_processed", s.ProcessedBlocks.Load(), "series_requested", s.SeriesRequested, "series_filtered", s.SeriesFiltered, "chunks_requested", s.ChunksRequested, @@ -99,9 +115,23 @@ func (s *Stats) AddProcessingTime(t time.Duration) { s.ProcessingTime.Add(t) } +func (s *Stats) AddTotalProcessingTime(t time.Duration) { + if s == nil { + return + } + s.TotalProcessingTime.Add(t) +} + func (s *Stats) AddPostProcessingTime(t time.Duration) { if s == nil { return } s.PostProcessingTime.Add(t) } + +func (s *Stats) IncProcessedBlocks() { + if s == nil { + return + } + s.ProcessedBlocks.Inc() +} diff --git a/pkg/storage/stores/shipper/indexshipper/indexgateway/gateway.go b/pkg/storage/stores/shipper/indexshipper/indexgateway/gateway.go index 350a95e8f988b..60bf8f1f7c113 100644 --- a/pkg/storage/stores/shipper/indexshipper/indexgateway/gateway.go +++ b/pkg/storage/stores/shipper/indexshipper/indexgateway/gateway.go @@ -12,6 +12,7 @@ import ( "github.com/go-kit/log/level" "github.com/grafana/dskit/services" "github.com/grafana/dskit/tenant" + "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" @@ -355,8 +356,8 @@ func (g *Gateway) GetVolume(ctx context.Context, req *logproto.VolumeRequest) (* func (g *Gateway) GetShards(request *logproto.ShardsRequest, server logproto.IndexGateway_GetShardsServer) error { ctx := server.Context() - log, _ := spanlogger.New(context.Background(), "IndexGateway.GetShards") - defer log.Finish() + sp, ctx := opentracing.StartSpanFromContext(ctx, "indexgateway.GetShards") + defer sp.Finish() instanceID, err := tenant.TenantID(ctx) if err != nil { @@ -412,11 +413,8 @@ func (g *Gateway) getShardsWithBlooms( // as getting it _very_ wrong could harm some cache locality benefits on the bloom-gws by // sending multiple requests to the entire keyspace). 
- sp, ctx := spanlogger.NewWithLogger( - ctx, - log.With(g.log, "tenant", instanceID), - "indexgateway.getShardsWithBlooms", - ) + logger := log.With(g.log, "tenant", instanceID) + sp, ctx := opentracing.StartSpanFromContext(ctx, "indexgateway.getShardsWithBlooms") defer sp.Finish() // 1) for all bounds, get chunk refs @@ -473,17 +471,10 @@ func (g *Gateway) getShardsWithBlooms( resp.Shards = shards } - level.Debug(g.log).Log( - "msg", "shards response", - "total_chunks", statistics.Index.TotalChunks, - "post_filter_chunks", statistics.Index.PostFilterChunks, - "shards", len(resp.Shards), - "query", req.Query, - "target_bytes_per_shard", datasize.ByteSize(req.TargetBytesPerShard).HumanReadable(), - ) + sp.LogKV("msg", "send shards response", "shards", len(resp.Shards)) - level.Debug(sp).Log( - "msg", "shards response", + level.Debug(logger).Log( + "msg", "send shards response", "total_chunks", statistics.Index.TotalChunks, "post_filter_chunks", statistics.Index.PostFilterChunks, "shards", len(resp.Shards),
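
---
A hedged sketch of the span-event pattern adopted above: one span per
request, log events instead of per-task child spans. It uses only the
opentracing-go API already imported by the diff; `handleRequest` and
`processTask` are hypothetical stand-ins for the gateway handler and
worker, not Loki functions.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/opentracing/opentracing-go"
)

// handleRequest owns the single span for the whole request, mirroring
// FilterChunkRefs above.
func handleRequest(ctx context.Context) {
	sp, ctx := opentracing.StartSpanFromContext(ctx, "example.HandleRequest")
	defer func() {
		// One summary event on the request span replaces N per-task spans.
		sp.LogKV("msg", "stats-report", "tasks", 3)
		sp.Finish()
	}()

	for i := 0; i < 3; i++ {
		processTask(ctx, i)
	}
}

// processTask attaches an event to the caller's span, if any, instead of
// starting (and having to finish) a span of its own.
func processTask(ctx context.Context, i int) {
	if sp := opentracing.SpanFromContext(ctx); sp != nil {
		sp.LogKV("process task", fmt.Sprintf("task-%d", i))
	}
	time.Sleep(time.Millisecond) // stand-in for the actual filtering work
}

func main() {
	// Without a registered tracer this runs against the global no-op
	// tracer, which is safe: spans are created and finished, events
	// are simply dropped.
	handleRequest(context.Background())
}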
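A reduced sketch of the stats pattern from stats.go, with the field set
abbreviated. The counters are pointer-typed atomics and every method is
nil-guarded, so one *Stats can be shared by all goroutines of a request
while a missing stats object degrades to a no-op instead of a panic.
ProcessingTime records the real (wall-clock) duration once per request;
TotalProcessingTime aggregates the same duration into every task, which
is why it can exceed wall-clock time.

package stats

import (
	"context"
	"time"

	"go.uber.org/atomic"
)

// Stats is shared by all goroutines that work on a single request.
type Stats struct {
	ProcessingTime      *atomic.Duration // real wall-clock time of processBlocks
	TotalProcessingTime *atomic.Duration // aggregated across tasks
	ProcessedBlocks     *atomic.Int32
}

type statsKey int

var ctxKey = statsKey(0)

// ContextWithEmptyStats seeds the context with zeroed counters.
func ContextWithEmptyStats(ctx context.Context) (*Stats, context.Context) {
	s := &Stats{
		ProcessingTime:      atomic.NewDuration(0),
		TotalProcessingTime: atomic.NewDuration(0),
		ProcessedBlocks:     atomic.NewInt32(0),
	}
	return s, context.WithValue(ctx, ctxKey, s)
}

// FromContext returns nil when no stats were seeded.
func FromContext(ctx context.Context) *Stats {
	s, _ := ctx.Value(ctxKey).(*Stats)
	return s
}

func (s *Stats) AddProcessingTime(t time.Duration) {
	if s == nil {
		return
	}
	s.ProcessingTime.Add(t)
}

func (s *Stats) IncProcessedBlocks() {
	if s == nil {
		return
	}
	s.ProcessedBlocks.Inc()
}

Callers use it exactly as processor.go does above, e.g.
FromContext(task.ctx).AddProcessingTime(duration).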