From 8db007eed0b2748aa484b4add58a1cb5cd59d2e7 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Tue, 16 Jan 2024 16:24:12 +0100 Subject: [PATCH] Make sure error while processing block is only sent once to channel Signed-off-by: Christian Haudum --- pkg/bloomgateway/bloomgateway.go | 26 ++++++++++++++++---------- pkg/bloomgateway/worker.go | 21 +++++++++------------ 2 files changed, 25 insertions(+), 22 deletions(-) diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go index 414658365435a..766c05bab457c 100644 --- a/pkg/bloomgateway/bloomgateway.go +++ b/pkg/bloomgateway/bloomgateway.go @@ -339,6 +339,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk responses := responsesPool.Get(requestCount) defer responsesPool.Put(responses) +outer: for { select { case <-ctx.Done(): @@ -351,19 +352,24 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk level.Debug(g.logger).Log("msg", "got partial result", "task", task.ID, "tenant", tenantID, "fp_int", uint64(res.Fp), "fp_hex", res.Fp, "chunks_to_remove", res.Removals.Len(), "progress", fmt.Sprintf("%d/%d", len(responses), requestCount)) // wait for all parts of the full response if len(responses) == requestCount { - for _, o := range responses { - if o.Removals.Len() == 0 { - continue - } - // we must not remove items from req.Refs as long as the worker may iterater over them - g.removeNotMatchingChunks(req, o) - } - g.metrics.addUnfilteredCount(numChunksUnfiltered) - g.metrics.addFilteredCount(len(req.Refs)) - return &logproto.FilterChunkRefResponse{ChunkRefs: req.Refs}, nil + break outer } } } + + for _, o := range responses { + if o.Removals.Len() == 0 { + continue + } + // we must not remove items from req.Refs as long as the worker may iterater over them + g.removeNotMatchingChunks(req, o) + } + + g.metrics.addUnfilteredCount(numChunksUnfiltered) + g.metrics.addFilteredCount(len(req.Refs)) + + level.Debug(g.logger).Log("msg", "return filtered chunk refs", "unfiltered", numChunksUnfiltered, "filtered", len(req.Refs)) + return &logproto.FilterChunkRefResponse{ChunkRefs: req.Refs}, nil } func (g *Gateway) removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, res v1.Output) { diff --git a/pkg/bloomgateway/worker.go b/pkg/bloomgateway/worker.go index 73f327043abd5..ce5add3c63f3d 100644 --- a/pkg/bloomgateway/worker.go +++ b/pkg/bloomgateway/worker.go @@ -221,20 +221,17 @@ func (w *worker) processBlocksWithCallback(taskCtx context.Context, tenant strin return w.store.ForEach(taskCtx, tenant, blockRefs, func(bq *v1.BlockQuerier, minFp, maxFp uint64) error { for _, b := range boundedRefs { if b.blockRef.MinFingerprint == minFp && b.blockRef.MaxFingerprint == maxFp { - w.processBlock(bq, day, b.tasks) - return nil + return w.processBlock(bq, day, b.tasks) } } return nil }) } -func (w *worker) processBlock(blockQuerier *v1.BlockQuerier, day time.Time, tasks []Task) { +func (w *worker) processBlock(blockQuerier *v1.BlockQuerier, day time.Time, tasks []Task) error { schema, err := blockQuerier.Schema() if err != nil { - for _, t := range tasks { - t.ErrCh <- errors.Wrap(err, "failed to get block schema") - } + return err } tokenizer := v1.NewNGramTokenizer(schema.NGramLen(), 0) @@ -243,13 +240,13 @@ func (w *worker) processBlock(blockQuerier *v1.BlockQuerier, day time.Time, task start := time.Now() err = fq.Run() + duration := time.Since(start).Seconds() - label := "success" if err != nil { - label = "failure" - for _, t := range tasks { - t.ErrCh <- errors.Wrap(err, "failed to run chunk check") - } + w.metrics.bloomQueryLatency.WithLabelValues(w.id, "failure").Observe(duration) + return err } - w.metrics.bloomQueryLatency.WithLabelValues(w.id, label).Observe(time.Since(start).Seconds()) + + w.metrics.bloomQueryLatency.WithLabelValues(w.id, "success").Observe(duration) + return nil }