Skip to content

Commit

Permalink
Make sure error while processing block is only sent once to channel
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum committed Jan 17, 2024
1 parent c9c63ad commit e144b42
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 22 deletions.
26 changes: 16 additions & 10 deletions pkg/bloomgateway/bloomgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
responses := responsesPool.Get(requestCount)
defer responsesPool.Put(responses)

outer:
for {
select {
case <-ctx.Done():
Expand All @@ -351,19 +352,24 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
level.Debug(g.logger).Log("msg", "got partial result", "task", task.ID, "tenant", tenantID, "fp_int", uint64(res.Fp), "fp_hex", res.Fp, "chunks_to_remove", res.Removals.Len(), "progress", fmt.Sprintf("%d/%d", len(responses), requestCount))
// wait for all parts of the full response
if len(responses) == requestCount {
for _, o := range responses {
if o.Removals.Len() == 0 {
continue
}
// we must not remove items from req.Refs as long as the worker may iterate over them
g.removeNotMatchingChunks(req, o)
}
g.metrics.addUnfilteredCount(numChunksUnfiltered)
g.metrics.addFilteredCount(len(req.Refs))
return &logproto.FilterChunkRefResponse{ChunkRefs: req.Refs}, nil
break outer
}
}
}

for _, o := range responses {
if o.Removals.Len() == 0 {
continue
}
// we must not remove items from req.Refs as long as the worker may iterate over them
g.removeNotMatchingChunks(req, o)
}

g.metrics.addUnfilteredCount(numChunksUnfiltered)
g.metrics.addFilteredCount(len(req.Refs))

level.Debug(g.logger).Log("msg", "return filtered chunk refs", "unfiltered", numChunksUnfiltered, "filtered", len(req.Refs))
return &logproto.FilterChunkRefResponse{ChunkRefs: req.Refs}, nil
}

func (g *Gateway) removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, res v1.Output) {
Expand Down
21 changes: 9 additions & 12 deletions pkg/bloomgateway/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,20 +221,17 @@ func (w *worker) processBlocksWithCallback(taskCtx context.Context, tenant strin
return w.store.ForEach(taskCtx, tenant, blockRefs, func(bq *v1.BlockQuerier, minFp, maxFp uint64) error {
for _, b := range boundedRefs {
if b.blockRef.MinFingerprint == minFp && b.blockRef.MaxFingerprint == maxFp {
w.processBlock(bq, day, b.tasks)
return nil
return w.processBlock(bq, day, b.tasks)
}
}
return nil
})
}

func (w *worker) processBlock(blockQuerier *v1.BlockQuerier, day time.Time, tasks []Task) {
func (w *worker) processBlock(blockQuerier *v1.BlockQuerier, day time.Time, tasks []Task) error {
schema, err := blockQuerier.Schema()
if err != nil {
for _, t := range tasks {
t.ErrCh <- errors.Wrap(err, "failed to get block schema")
}
return err
}

tokenizer := v1.NewNGramTokenizer(schema.NGramLen(), 0)
Expand All @@ -243,13 +240,13 @@ func (w *worker) processBlock(blockQuerier *v1.BlockQuerier, day time.Time, task

start := time.Now()
err = fq.Run()
duration := time.Since(start).Seconds()

label := "success"
if err != nil {
label = "failure"
for _, t := range tasks {
t.ErrCh <- errors.Wrap(err, "failed to run chunk check")
}
w.metrics.bloomQueryLatency.WithLabelValues(w.id, "failure").Observe(duration)
return err
}
w.metrics.bloomQueryLatency.WithLabelValues(w.id, label).Observe(time.Since(start).Seconds())

w.metrics.bloomQueryLatency.WithLabelValues(w.id, "success").Observe(duration)
return nil
}

0 comments on commit e144b42

Please sign in to comment.