Introduce separate histogram metric for bloom query latency
Signed-off-by: Christian Haudum <[email protected]>
chaudum committed Jan 16, 2024
1 parent a17cb9c commit 1e6aab6
Showing 2 changed files with 23 additions and 10 deletions.
13 changes: 7 additions & 6 deletions integration/loki_micro_services_test.go
@@ -1218,7 +1218,8 @@ func TestBloomFiltersEndToEnd(t *testing.T) {
 		metrics, err := cliBloomCompactor.Metrics()
 		require.NoError(t, err)
 		successfulRunCount := getMetricValue(t, "loki_bloomcompactor_runs_completed_total", metrics)
-		return successfulRunCount >= 1
+		t.Log("successful bloom compactor runs", successfulRunCount)
+		return successfulRunCount == 1
 	}, 30*time.Second, time.Second)

 	// use bloom gateway to perform needle in the haystack queries
@@ -1238,18 +1239,18 @@ func TestBloomFiltersEndToEnd(t *testing.T) {
 	bloomGwMetrics, err := cliBloomGateway.Metrics()
 	require.NoError(t, err)

-	mf, err := extractMetricFamily("loki_bloom_gateway_store_latency", bloomGwMetrics)
-	require.NoError(t, err)
-
 	unfilteredCount := getMetricValue(t, "loki_bloom_gateway_chunkrefs_pre_filtering", bloomGwMetrics)
 	require.Equal(t, float64(10), unfilteredCount)

 	filteredCount := getMetricValue(t, "loki_bloom_gateway_chunkrefs_post_filtering", bloomGwMetrics)
 	require.Equal(t, float64(1), filteredCount)

+	mf, err := extractMetricFamily("loki_bloom_gateway_bloom_query_latency", bloomGwMetrics)
+	require.NoError(t, err)
+
 	count := getValueFromMetricFamilyWithFunc(mf, &dto.LabelPair{
-		Name:  proto.String("operation"),
-		Value: proto.String("ForEach"),
+		Name:  proto.String("status"),
+		Value: proto.String("success"),
 	}, func(m *dto.Metric) uint64 {
 		return m.Histogram.GetSampleCount()
 	})
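The assertions above lean on three helpers from Loki's integration test package (getMetricValue, extractMetricFamily, getValueFromMetricFamilyWithFunc) whose definitions are not part of this diff; the test now reads the sample count of the new loki_bloom_gateway_bloom_query_latency histogram for status="success" instead of the operation="ForEach" series of the store latency metric. As a hedged sketch only (signatures inferred from the call sites, parsing done with the Prometheus text exposition parser; the real helpers may differ), they could look like this:

package integration

import (
	"fmt"
	"strings"
	"testing"

	dto "github.com/prometheus/client_model/go"
	"github.com/prometheus/common/expfmt"
	"github.com/stretchr/testify/require"
)

// extractMetricFamily parses a raw /metrics payload (text exposition format)
// and returns the named metric family.
func extractMetricFamily(name, metrics string) (*dto.MetricFamily, error) {
	var parser expfmt.TextParser
	families, err := parser.TextToMetricFamilies(strings.NewReader(metrics))
	if err != nil {
		return nil, err
	}
	mf, ok := families[name]
	if !ok {
		return nil, fmt.Errorf("metric family %q not found", name)
	}
	return mf, nil
}

// getMetricValue returns the value of the first sample of a counter or gauge
// family, e.g. loki_bloom_gateway_chunkrefs_pre_filtering.
func getMetricValue(t *testing.T, name, metrics string) float64 {
	t.Helper()
	mf, err := extractMetricFamily(name, metrics)
	require.NoError(t, err)
	m := mf.GetMetric()[0]
	if c := m.GetCounter(); c != nil {
		return c.GetValue()
	}
	return m.GetGauge().GetValue()
}

// getValueFromMetricFamilyWithFunc applies fn to the first metric that carries
// the given label pair, e.g. status="success" on the new latency histogram.
func getValueFromMetricFamilyWithFunc(mf *dto.MetricFamily, lp *dto.LabelPair, fn func(*dto.Metric) uint64) uint64 {
	for _, m := range mf.GetMetric() {
		for _, l := range m.GetLabel() {
			if l.GetName() == lp.GetName() && l.GetValue() == lp.GetValue() {
				return fn(m)
			}
		}
	}
	return 0
}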
20 changes: 16 additions & 4 deletions pkg/bloomgateway/worker.go
@@ -27,6 +27,7 @@ type workerMetrics struct {
 	dequeueErrors      *prometheus.CounterVec
 	dequeueWaitTime    *prometheus.SummaryVec
 	storeAccessLatency *prometheus.HistogramVec
+	bloomQueryLatency  *prometheus.HistogramVec
 }

 func newWorkerMetrics(registerer prometheus.Registerer, namespace, subsystem string) *workerMetrics {
@@ -50,6 +51,13 @@ func newWorkerMetrics(registerer prometheus.Registerer, namespace, subsystem str
 			Name:      "dequeue_wait_time",
 			Help:      "Time spent waiting for dequeuing tasks from queue",
 		}, labels),
+		bloomQueryLatency: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{
+			Namespace: namespace,
+			Subsystem: subsystem,
+			Name:      "bloom_query_latency",
+			Help:      "Latency in seconds of processing bloom blocks",
+		}, append(labels, "status")),
+		// TODO(chaudum): Move this metric into the bloomshipper
 		storeAccessLatency: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{
 			Namespace: namespace,
 			Subsystem: subsystem,
@@ -210,20 +218,18 @@ func (w *worker) stopping(err error) error {
 }

 func (w *worker) processBlocksWithCallback(taskCtx context.Context, tenant string, day time.Time, blockRefs []bloomshipper.BlockRef, boundedRefs []boundedTasks) error {
-	storeFetchStart := time.Now()
 	return w.store.ForEach(taskCtx, tenant, blockRefs, func(bq *v1.BlockQuerier, minFp, maxFp uint64) error {
-		w.metrics.storeAccessLatency.WithLabelValues(w.id, "ForEach").Observe(time.Since(storeFetchStart).Seconds())
 		for _, b := range boundedRefs {
 			if b.blockRef.MinFingerprint == minFp && b.blockRef.MaxFingerprint == maxFp {
-				processBlock(bq, day, b.tasks)
+				w.processBlock(bq, day, b.tasks)
 				return nil
 			}
 		}
 		return nil
 	})
 }

-func processBlock(blockQuerier *v1.BlockQuerier, day time.Time, tasks []Task) {
+func (w *worker) processBlock(blockQuerier *v1.BlockQuerier, day time.Time, tasks []Task) {
 	schema, err := blockQuerier.Schema()
 	if err != nil {
 		for _, t := range tasks {
@@ -234,10 +240,16 @@ func processBlock(blockQuerier *v1.BlockQuerier, day time.Time, tasks []Task) {
 	tokenizer := v1.NewNGramTokenizer(schema.NGramLen(), 0)
 	it := newTaskMergeIterator(day, tokenizer, tasks...)
 	fq := blockQuerier.Fuse([]v1.PeekingIterator[v1.Request]{it})
+
+	start := time.Now()
 	err = fq.Run()
+
+	label := "success"
 	if err != nil {
+		label = "failure"
 		for _, t := range tasks {
 			t.ErrCh <- errors.Wrap(err, "failed to run chunk check")
 		}
 	}
+	w.metrics.bloomQueryLatency.WithLabelValues(w.id, label).Observe(time.Since(start).Seconds())
 }
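The worker change itself follows a common Prometheus pattern: time the fused query run, derive a status label from the returned error, and observe the elapsed seconds exactly once, so the histogram's sample count equals the number of bloom query runs (which is what the integration test checks via GetSampleCount). A standalone, hedged sketch of that pattern, with illustrative names and the default registerer standing in for the worker's registerer and label set:

package main

import (
	"errors"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// bloomQueryLatency mirrors the metric added in worker.go: a histogram
// partitioned by worker id and outcome status. Registration on the default
// registerer and the exact label names are assumptions of this sketch.
var bloomQueryLatency = promauto.NewHistogramVec(prometheus.HistogramOpts{
	Namespace: "loki",
	Subsystem: "bloom_gateway",
	Name:      "bloom_query_latency",
	Help:      "Latency in seconds of processing bloom blocks",
}, []string{"worker", "status"})

// timedRun executes fn, labels the observation "success" or "failure"
// depending on the returned error, and records the latency exactly once.
func timedRun(workerID string, fn func() error) error {
	start := time.Now()
	err := fn()

	status := "success"
	if err != nil {
		status = "failure"
	}
	bloomQueryLatency.WithLabelValues(workerID, status).Observe(time.Since(start).Seconds())
	return err
}

func main() {
	_ = timedRun("worker-0", func() error { return nil })
	_ = timedRun("worker-0", func() error { return errors.New("bloom query failed") })
}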
