Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
MichelHollands authored Feb 19, 2024
2 parents ec9f045 + 9f86473 commit 6aca508
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 50 deletions.
82 changes: 37 additions & 45 deletions pkg/bloomgateway/bloomgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,9 @@ var (
)

type metrics struct {
queueDuration prometheus.Histogram
inflightRequests prometheus.Summary
chunkRefsUnfiltered prometheus.Counter
chunkRefsFiltered prometheus.Counter
queueDuration prometheus.Histogram
inflightRequests prometheus.Summary
chunkRemovals *prometheus.CounterVec
}

func newMetrics(registerer prometheus.Registerer, namespace, subsystem string) *metrics {
Expand All @@ -106,29 +105,15 @@ func newMetrics(registerer prometheus.Registerer, namespace, subsystem string) *
MaxAge: time.Minute,
AgeBuckets: 6,
}),
chunkRefsUnfiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
chunkRemovals: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "chunkrefs_pre_filtering",
Help: "Total amount of chunk refs pre filtering. Does not count chunk refs in failed requests.",
}),
chunkRefsFiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "chunkrefs_post_filtering",
Help: "Total amount of chunk refs post filtering.",
}),
Name: "chunk_removals_total",
Help: "Total amount of removals received from the block querier partitioned by state. The state 'accepted' means that the removals are processed, the state 'dropped' means that the removals were received after the task context was done (e.g. client timeout, etc).",
}, []string{"state"}),
}
}

func (m *metrics) addUnfilteredCount(n int) {
m.chunkRefsUnfiltered.Add(float64(n))
}

func (m *metrics) addFilteredCount(n int) {
m.chunkRefsFiltered.Add(float64(n))
}

// SyncMap is a map structure which can be synchronized using the RWMutex
type SyncMap[k comparable, v any] struct {
sync.RWMutex
Expand Down Expand Up @@ -324,12 +309,8 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
return nil, errors.New("from time must not be after through time")
}

numChunksUnfiltered := len(req.Refs)

// Shortcut if request does not contain filters
if len(req.Filters) == 0 {
g.metrics.addUnfilteredCount(numChunksUnfiltered)
g.metrics.addFilteredCount(len(req.Refs))
return &logproto.FilterChunkRefResponse{
ChunkRefs: req.Refs,
}, nil
Expand Down Expand Up @@ -374,15 +355,14 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
// When enqueuing, we also add the task to the pending tasks
g.pendingTasks.Add(task.ID, task)
})
go consumeTask(ctx, task, tasksCh, logger)
go g.consumeTask(ctx, task, tasksCh)
}

responses := responsesPool.Get(numSeries)
defer responsesPool.Put(responses)
remaining := len(tasks)

outer:
for {
for remaining > 0 {
select {
case <-ctx.Done():
return nil, errors.Wrap(ctx.Err(), "request failed")
Expand All @@ -393,23 +373,17 @@ outer:
}
responses = append(responses, task.responses...)
remaining--
if remaining == 0 {
break outer
}
}
}

for _, o := range responses {
if o.Removals.Len() == 0 {
continue
}
removeNotMatchingChunks(req, o, g.logger)
}
preFilterSeries := len(req.Refs)

g.metrics.addUnfilteredCount(numChunksUnfiltered)
g.metrics.addFilteredCount(len(req.Refs))
// TODO(chaudum): Don't wait for all responses before starting to filter chunks.
filtered := g.processResponses(req, responses)

level.Info(logger).Log("msg", "return filtered chunk refs", "unfiltered", numChunksUnfiltered, "filtered", len(req.Refs))
postFilterSeries := len(req.Refs)

level.Info(logger).Log("msg", "return filtered chunk refs", "pre_filter_series", preFilterSeries, "post_filter_series", postFilterSeries, "filtered_chunks", filtered)
return &logproto.FilterChunkRefResponse{ChunkRefs: req.Refs}, nil
}

Expand All @@ -419,16 +393,18 @@ outer:
// task is closed by the worker.
// Once the tasks is closed, it will send the task with the results from the
// block querier to the supplied task channel.
func consumeTask(ctx context.Context, task Task, tasksCh chan<- Task, logger log.Logger) {
logger = log.With(logger, "task", task.ID)
func (g *Gateway) consumeTask(ctx context.Context, task Task, tasksCh chan<- Task) {
logger := log.With(g.logger, "task", task.ID)

for res := range task.resCh {
select {
case <-ctx.Done():
level.Debug(logger).Log("msg", "drop partial result", "fp_int", uint64(res.Fp), "fp_hex", res.Fp, "chunks_to_remove", res.Removals.Len())
g.metrics.chunkRemovals.WithLabelValues("dropped").Add(float64(res.Removals.Len()))
default:
level.Debug(logger).Log("msg", "accept partial result", "fp_int", uint64(res.Fp), "fp_hex", res.Fp, "chunks_to_remove", res.Removals.Len())
task.responses = append(task.responses, res)
g.metrics.chunkRemovals.WithLabelValues("accepted").Add(float64(res.Removals.Len()))
}
}

Expand All @@ -441,21 +417,34 @@ func consumeTask(ctx context.Context, task Task, tasksCh chan<- Task, logger log
}
}

func removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, res v1.Output, logger log.Logger) {
func (g *Gateway) processResponses(req *logproto.FilterChunkRefRequest, responses []v1.Output) (filtered int) {
for _, o := range responses {
if o.Removals.Len() == 0 {
continue
}
filtered += g.removeNotMatchingChunks(req, o)
}
return
}

func (g *Gateway) removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, res v1.Output) (filtered int) {

// binary search index of fingerprint
idx := sort.Search(len(req.Refs), func(i int) bool {
return req.Refs[i].Fingerprint >= uint64(res.Fp)
})

// fingerprint not found
if idx >= len(req.Refs) {
level.Error(logger).Log("msg", "index out of range", "idx", idx, "len", len(req.Refs), "fp", uint64(res.Fp))
level.Error(g.logger).Log("msg", "index out of range", "idx", idx, "len", len(req.Refs), "fp", uint64(res.Fp))
return
}

// if all chunks of a fingerprint are are removed
// then remove the whole group from the response
if len(req.Refs[idx].Refs) == res.Removals.Len() {
filtered += len(req.Refs[idx].Refs)

req.Refs[idx] = nil // avoid leaking pointer
req.Refs = append(req.Refs[:idx], req.Refs[idx+1:]...)
return
Expand All @@ -465,10 +454,13 @@ func removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, res v1.Output,
toRemove := res.Removals[i]
for j := 0; j < len(req.Refs[idx].Refs); j++ {
if toRemove.Checksum == req.Refs[idx].Refs[j].Checksum {
filtered += 1

req.Refs[idx].Refs[j] = nil // avoid leaking pointer
req.Refs[idx].Refs = append(req.Refs[idx].Refs[:j], req.Refs[idx].Refs[j+1:]...)
j-- // since we removed the current item at index, we have to redo the same index
}
}
}
return
}
9 changes: 7 additions & 2 deletions pkg/bloomgateway/bloomgateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,9 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
}

func TestBloomGateway_RemoveNotMatchingChunks(t *testing.T) {
g := &Gateway{
logger: log.NewNopLogger(),
}
t.Run("removing chunks partially", func(t *testing.T) {
req := &logproto.FilterChunkRefRequest{
Refs: []*logproto.GroupedChunkRefs{
Expand Down Expand Up @@ -450,7 +453,8 @@ func TestBloomGateway_RemoveNotMatchingChunks(t *testing.T) {
}},
},
}
removeNotMatchingChunks(req, res, log.NewNopLogger())
n := g.removeNotMatchingChunks(req, res)
require.Equal(t, 2, n)
require.Equal(t, expected, req)
})

Expand All @@ -474,7 +478,8 @@ func TestBloomGateway_RemoveNotMatchingChunks(t *testing.T) {
expected := &logproto.FilterChunkRefRequest{
Refs: []*logproto.GroupedChunkRefs{},
}
removeNotMatchingChunks(req, res, log.NewNopLogger())
n := g.removeNotMatchingChunks(req, res)
require.Equal(t, 3, n)
require.Equal(t, expected, req)
})

Expand Down
55 changes: 53 additions & 2 deletions pkg/bloomgateway/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,56 @@ import (
"sort"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
)

type querierMetrics struct {
chunksTotal prometheus.Counter
chunksFiltered prometheus.Counter
seriesTotal prometheus.Counter
seriesFiltered prometheus.Counter
}

func newQuerierMetrics(registerer prometheus.Registerer, namespace, subsystem string) *querierMetrics {
return &querierMetrics{
chunksTotal: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "chunks_total",
Help: "Total amount of chunks pre filtering. Does not count chunks in failed requests.",
}),
chunksFiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "chunks_filtered_total",
Help: "Total amount of chunks that have been filtered out. Does not count chunks in failed requests.",
}),
seriesTotal: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "series_total",
Help: "Total amount of series pre filtering. Does not count series in failed requests.",
}),
seriesFiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "series_filtered_total",
Help: "Total amount of series that have been filtered out. Does not count series in failed requests.",
}),
}
}

// BloomQuerier is a store-level abstraction on top of Client
// It is used by the index gateway to filter ChunkRefs based on given line fiter expression.
type BloomQuerier struct {
c Client
logger log.Logger
c Client
logger log.Logger
metrics *querierMetrics
}

func NewQuerier(c Client, logger log.Logger) *BloomQuerier {
Expand All @@ -37,6 +76,9 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from
defer groupedChunksRefPool.Put(grouped)
grouped = groupChunkRefs(chunkRefs, grouped)

preFilterChunks := len(chunkRefs)
preFilterSeries := len(grouped)

refs, err := bq.c.FilterChunks(ctx, tenant, from, through, grouped, filters...)
if err != nil {
return nil, err
Expand All @@ -55,6 +97,15 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from
})
}
}

postFilterChunks := len(result)
postFilterSeries := len(refs)

bq.metrics.chunksTotal.Add(float64(preFilterChunks))
bq.metrics.chunksFiltered.Add(float64(preFilterChunks - postFilterChunks))
bq.metrics.seriesTotal.Add(float64(preFilterSeries))
bq.metrics.seriesFiltered.Add(float64(preFilterSeries - postFilterSeries))

return result, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/bloomgateway/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (w *worker) running(_ context.Context) error {
err = p.run(taskCtx, tasks)

if err != nil {
w.metrics.processDuration.WithLabelValues(w.id, labelSuccess).Observe(time.Since(start).Seconds())
w.metrics.processDuration.WithLabelValues(w.id, labelFailure).Observe(time.Since(start).Seconds())
w.metrics.tasksProcessed.WithLabelValues(w.id, labelFailure).Add(float64(len(tasks)))
level.Error(w.logger).Log("msg", "failed to process tasks", "err", err)
} else {
Expand Down

0 comments on commit 6aca508

Please sign in to comment.