Various bloom gateway instrumentation changes #11983

Merged 8 commits on Feb 19, 2024
Changes from all commits
82 changes: 37 additions & 45 deletions pkg/bloomgateway/bloomgateway.go
@@ -82,10 +82,9 @@ var (
)

type metrics struct {
queueDuration prometheus.Histogram
inflightRequests prometheus.Summary
chunkRefsUnfiltered prometheus.Counter
chunkRefsFiltered prometheus.Counter
queueDuration prometheus.Histogram
inflightRequests prometheus.Summary
chunkRemovals *prometheus.CounterVec
}

func newMetrics(registerer prometheus.Registerer, namespace, subsystem string) *metrics {
@@ -106,29 +105,15 @@ func newMetrics(registerer prometheus.Registerer, namespace, subsystem string) *
MaxAge: time.Minute,
AgeBuckets: 6,
}),
chunkRefsUnfiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
chunkRemovals: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "chunkrefs_pre_filtering",
Help: "Total amount of chunk refs pre filtering. Does not count chunk refs in failed requests.",
}),
chunkRefsFiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "chunkrefs_post_filtering",
Help: "Total amount of chunk refs post filtering.",
}),
Name: "chunk_removals_total",
Help: "Total amount of removals received from the block querier partitioned by state. The state 'accepted' means that the removals are processed, the state 'dropped' means that the removals were received after the task context was done (e.g. client timeout, etc).",
}, []string{"state"}),
}
}

func (m *metrics) addUnfilteredCount(n int) {
m.chunkRefsUnfiltered.Add(float64(n))
}

func (m *metrics) addFilteredCount(n int) {
m.chunkRefsFiltered.Add(float64(n))
}

// SyncMap is a map structure which can be synchronized using the RWMutex
type SyncMap[k comparable, v any] struct {
sync.RWMutex
@@ -324,12 +309,8 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
return nil, errors.New("from time must not be after through time")
}

numChunksUnfiltered := len(req.Refs)

// Shortcut if request does not contain filters
if len(req.Filters) == 0 {
g.metrics.addUnfilteredCount(numChunksUnfiltered)
g.metrics.addFilteredCount(len(req.Refs))
return &logproto.FilterChunkRefResponse{
ChunkRefs: req.Refs,
}, nil
@@ -374,15 +355,14 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
// When enqueuing, we also add the task to the pending tasks
g.pendingTasks.Add(task.ID, task)
})
go consumeTask(ctx, task, tasksCh, logger)
go g.consumeTask(ctx, task, tasksCh)
}

responses := responsesPool.Get(numSeries)
defer responsesPool.Put(responses)
remaining := len(tasks)

outer:
for {
for remaining > 0 {
select {
case <-ctx.Done():
return nil, errors.Wrap(ctx.Err(), "request failed")
@@ -393,23 +373,17 @@
}
responses = append(responses, task.responses...)
remaining--
if remaining == 0 {
break outer
}
}
}

for _, o := range responses {
if o.Removals.Len() == 0 {
continue
}
removeNotMatchingChunks(req, o, g.logger)
}
preFilterSeries := len(req.Refs)

g.metrics.addUnfilteredCount(numChunksUnfiltered)
g.metrics.addFilteredCount(len(req.Refs))
// TODO(chaudum): Don't wait for all responses before starting to filter chunks.
filtered := g.processResponses(req, responses)

level.Info(logger).Log("msg", "return filtered chunk refs", "unfiltered", numChunksUnfiltered, "filtered", len(req.Refs))
postFilterSeries := len(req.Refs)

level.Info(logger).Log("msg", "return filtered chunk refs", "pre_filter_series", preFilterSeries, "post_filter_series", postFilterSeries, "filtered_chunks", filtered)
return &logproto.FilterChunkRefResponse{ChunkRefs: req.Refs}, nil
}

@@ -419,16 +393,18 @@ outer:
// task is closed by the worker.
// Once the task is closed, it will send the task with the results from the
// block querier to the supplied task channel.
func consumeTask(ctx context.Context, task Task, tasksCh chan<- Task, logger log.Logger) {
logger = log.With(logger, "task", task.ID)
func (g *Gateway) consumeTask(ctx context.Context, task Task, tasksCh chan<- Task) {
logger := log.With(g.logger, "task", task.ID)

for res := range task.resCh {
select {
case <-ctx.Done():
level.Debug(logger).Log("msg", "drop partial result", "fp_int", uint64(res.Fp), "fp_hex", res.Fp, "chunks_to_remove", res.Removals.Len())
g.metrics.chunkRemovals.WithLabelValues("dropped").Add(float64(res.Removals.Len()))
default:
level.Debug(logger).Log("msg", "accept partial result", "fp_int", uint64(res.Fp), "fp_hex", res.Fp, "chunks_to_remove", res.Removals.Len())
task.responses = append(task.responses, res)
g.metrics.chunkRemovals.WithLabelValues("accepted").Add(float64(res.Removals.Len()))
Contributor: I don't think the meaning of dropped or accepted is intuitive for the operators. Maybe we can add what they mean in the metric help? Or choose different label values?

Contributor (Author): Does this description make sense to you? 0431acd

Contributor: Yes, that makes perfect sense now! :) Thank you!

}
}

@@ -441,21 +417,34 @@ func consumeTask(ctx context.Context, task Task, tasksCh chan<- Task, logger log
}
}

func removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, res v1.Output, logger log.Logger) {
func (g *Gateway) processResponses(req *logproto.FilterChunkRefRequest, responses []v1.Output) (filtered int) {
for _, o := range responses {
if o.Removals.Len() == 0 {
continue
}
filtered += g.removeNotMatchingChunks(req, o)
}
return
}

func (g *Gateway) removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, res v1.Output) (filtered int) {

// binary search index of fingerprint
idx := sort.Search(len(req.Refs), func(i int) bool {
return req.Refs[i].Fingerprint >= uint64(res.Fp)
})

// fingerprint not found
if idx >= len(req.Refs) {
level.Error(logger).Log("msg", "index out of range", "idx", idx, "len", len(req.Refs), "fp", uint64(res.Fp))
level.Error(g.logger).Log("msg", "index out of range", "idx", idx, "len", len(req.Refs), "fp", uint64(res.Fp))
return
}

// if all chunks of a fingerprint are removed
// then remove the whole group from the response
if len(req.Refs[idx].Refs) == res.Removals.Len() {
filtered += len(req.Refs[idx].Refs)

req.Refs[idx] = nil // avoid leaking pointer
req.Refs = append(req.Refs[:idx], req.Refs[idx+1:]...)
return
@@ -465,10 +454,13 @@ func removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, res v1.Output,
toRemove := res.Removals[i]
for j := 0; j < len(req.Refs[idx].Refs); j++ {
if toRemove.Checksum == req.Refs[idx].Refs[j].Checksum {
filtered += 1

req.Refs[idx].Refs[j] = nil // avoid leaking pointer
req.Refs[idx].Refs = append(req.Refs[idx].Refs[:j], req.Refs[idx].Refs[j+1:]...)
j-- // since we removed the current item at index, we have to redo the same index
}
}
}
return
}
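Side note for readers of this diff: removeNotMatchingChunks deletes entries from req.Refs[idx].Refs while iterating over them, stepping the index back after each removal so the element that shifts into the freed slot is not skipped. A minimal standalone sketch of that idiom (toy uint32 checksums instead of the PR's ChunkRef types; illustration only, not part of this change):

package main

import "fmt"

// removeMatching deletes every element of refs that appears in removals,
// in place. After removing index j the next element shifts into position j,
// so j is decremented to re-check that slot on the next iteration.
func removeMatching(refs []uint32, removals []uint32) []uint32 {
	for _, toRemove := range removals {
		for j := 0; j < len(refs); j++ {
			if refs[j] == toRemove {
				refs = append(refs[:j], refs[j+1:]...)
				j-- // re-check the element that moved into index j
			}
		}
	}
	return refs
}

func main() {
	refs := []uint32{0x1a, 0x2b, 0x3c, 0x4d}
	fmt.Println(removeMatching(refs, []uint32{0x2b, 0x4d})) // prints [26 60]
}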
9 changes: 7 additions & 2 deletions pkg/bloomgateway/bloomgateway_test.go
@@ -423,6 +423,9 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
}

func TestBloomGateway_RemoveNotMatchingChunks(t *testing.T) {
g := &Gateway{
logger: log.NewNopLogger(),
}
t.Run("removing chunks partially", func(t *testing.T) {
req := &logproto.FilterChunkRefRequest{
Refs: []*logproto.GroupedChunkRefs{
@@ -450,7 +453,8 @@ func TestBloomGateway_RemoveNotMatchingChunks(t *testing.T) {
}},
},
}
removeNotMatchingChunks(req, res, log.NewNopLogger())
n := g.removeNotMatchingChunks(req, res)
require.Equal(t, 2, n)
require.Equal(t, expected, req)
})

@@ -474,7 +478,8 @@ func TestBloomGateway_RemoveNotMatchingChunks(t *testing.T) {
expected := &logproto.FilterChunkRefRequest{
Refs: []*logproto.GroupedChunkRefs{},
}
removeNotMatchingChunks(req, res, log.NewNopLogger())
n := g.removeNotMatchingChunks(req, res)
require.Equal(t, 3, n)
require.Equal(t, expected, req)
})

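For completeness, a hypothetical way to exercise the new chunk_removals_total metric in a unit test (not part of this PR; the "loki"/"bloom_gateway" namespace and subsystem values and the use of prometheus/testutil are assumptions):

// Hypothetical sketch, in package bloomgateway: increment the CounterVec the
// way consumeTask does and assert the per-state counts.
// Requires "github.com/prometheus/client_golang/prometheus" and
// "github.com/prometheus/client_golang/prometheus/testutil".
func TestChunkRemovalsMetric(t *testing.T) {
	reg := prometheus.NewRegistry()
	m := newMetrics(reg, "loki", "bloom_gateway") // namespace/subsystem assumed

	m.chunkRemovals.WithLabelValues("accepted").Add(3)
	m.chunkRemovals.WithLabelValues("dropped").Add(1)

	require.Equal(t, 3.0, testutil.ToFloat64(m.chunkRemovals.WithLabelValues("accepted")))
	require.Equal(t, 1.0, testutil.ToFloat64(m.chunkRemovals.WithLabelValues("dropped")))
}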
55 changes: 53 additions & 2 deletions pkg/bloomgateway/querier.go
@@ -5,17 +5,56 @@ import (
"sort"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
)

type querierMetrics struct {
chunksTotal prometheus.Counter
chunksFiltered prometheus.Counter
seriesTotal prometheus.Counter
seriesFiltered prometheus.Counter
}

func newQuerierMetrics(registerer prometheus.Registerer, namespace, subsystem string) *querierMetrics {
return &querierMetrics{
chunksTotal: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "chunks_total",
Help: "Total amount of chunks pre filtering. Does not count chunks in failed requests.",
}),
chunksFiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "chunks_filtered_total",
Help: "Total amount of chunks that have been filtered out. Does not count chunks in failed requests.",
}),
seriesTotal: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "series_total",
Help: "Total amount of series pre filtering. Does not count series in failed requests.",
}),
seriesFiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "series_filtered_total",
Help: "Total amount of series that have been filtered out. Does not count series in failed requests.",
}),
}
}

// BloomQuerier is a store-level abstraction on top of Client
// It is used by the index gateway to filter ChunkRefs based on a given line filter expression.
type BloomQuerier struct {
c Client
logger log.Logger
c Client
logger log.Logger
metrics *querierMetrics
}

func NewQuerier(c Client, logger log.Logger) *BloomQuerier {
@@ -37,6 +76,9 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from
defer groupedChunksRefPool.Put(grouped)
grouped = groupChunkRefs(chunkRefs, grouped)

preFilterChunks := len(chunkRefs)
preFilterSeries := len(grouped)

refs, err := bq.c.FilterChunks(ctx, tenant, from, through, grouped, filters...)
if err != nil {
return nil, err
@@ -55,6 +97,15 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from
})
}
}

postFilterChunks := len(result)
postFilterSeries := len(refs)

bq.metrics.chunksTotal.Add(float64(preFilterChunks))
bq.metrics.chunksFiltered.Add(float64(preFilterChunks - postFilterChunks))
bq.metrics.seriesTotal.Add(float64(preFilterSeries))
bq.metrics.seriesFiltered.Add(float64(preFilterSeries - postFilterSeries))

return result, nil
}

2 changes: 1 addition & 1 deletion pkg/bloomgateway/worker.go
@@ -163,7 +163,7 @@ func (w *worker) running(_ context.Context) error {
err = p.run(taskCtx, tasks)

if err != nil {
w.metrics.processDuration.WithLabelValues(w.id, labelSuccess).Observe(time.Since(start).Seconds())
w.metrics.processDuration.WithLabelValues(w.id, labelFailure).Observe(time.Since(start).Seconds())
w.metrics.tasksProcessed.WithLabelValues(w.id, labelFailure).Add(float64(len(tasks)))
level.Error(w.logger).Log("msg", "failed to process tasks", "err", err)
} else {