Update metrics of bloomgateway worker
```
dequeueDuration   : Summary
processDuration   : Summary
metasFetched      : Summary
blocksFetched     : Summary
tasksDequeued     : Counter
tasksProcessed    : Counter
blockQueryLatency : Histogram
```

Signed-off-by: Christian Haudum <[email protected]>
chaudum committed Feb 9, 2024
1 parent 60551da commit 88fc5a8
Showing 3 changed files with 88 additions and 31 deletions.
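The diff below threads these metrics through the processor and the worker. For orientation, here is a minimal, self-contained sketch of the recording pattern the commit introduces: every metric carries a `worker` label, and outcome-sensitive metrics add a `status` label set to `success` or `failure`. Metric and label names are taken from the diff; the registry setup, the namespace/subsystem values, and the `recordRun` helper are illustrative assumptions, not code from the change.

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

const (
	labelSuccess = "success"
	labelFailure = "failure"
)

// workerMetrics is a cut-down stand-in for the struct added in worker.go;
// only two of the new metrics are shown here.
type workerMetrics struct {
	tasksProcessed  *prometheus.CounterVec
	processDuration *prometheus.SummaryVec
}

func newWorkerMetrics(reg prometheus.Registerer, namespace, subsystem string) *workerMetrics {
	labels := []string{"worker"}
	r := promauto.With(reg)
	return &workerMetrics{
		tasksProcessed: r.NewCounterVec(prometheus.CounterOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "tasks_processed_total",
			Help:      "Total amount of tasks that the worker processed",
		}, append(labels, "status")),
		processDuration: r.NewSummaryVec(prometheus.SummaryOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "process_duration_seconds",
			Help:      "Time spent processing tasks in seconds",
		}, append(labels, "status")),
	}
}

// recordRun is a hypothetical helper illustrating the success/failure
// observation pattern used in worker.running: time the work, then record
// duration and task count under the matching status label.
func recordRun(m *workerMetrics, workerID string, tasks int, run func() error) error {
	start := time.Now()
	err := run()
	status := labelSuccess
	if err != nil {
		status = labelFailure
	}
	m.processDuration.WithLabelValues(workerID, status).Observe(time.Since(start).Seconds())
	m.tasksProcessed.WithLabelValues(workerID, status).Add(float64(tasks))
	return err
}

func main() {
	// Namespace and subsystem values here are assumptions for the example.
	m := newWorkerMetrics(prometheus.NewPedanticRegistry(), "loki", "bloom_gateway")
	err := recordRun(m, "worker-1", 4, func() error { return nil })
	fmt.Println("recorded run, err =", err)
}
```

Folding the former `dequeued_tasks_total`/`dequeue_errors_total` pair into a single `tasks_dequeued_total` counter with a `status` label keeps the success and failure series under one metric name, which is what the worker changes below implement.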
29 changes: 23 additions & 6 deletions pkg/bloomgateway/processor.go
@@ -4,6 +4,7 @@ import (
"context"
"math"
"sort"
"time"

"github.com/go-kit/log"
"github.com/prometheus/common/model"
@@ -17,16 +18,20 @@ type tasksForBlock struct {
tasks []Task
}

func newProcessor(store bloomshipper.Store, logger log.Logger) *processor {
func newProcessor(id string, store bloomshipper.Store, logger log.Logger, metrics *workerMetrics) *processor {
return &processor{
store: store,
logger: logger,
id: id,
store: store,
logger: logger,
metrics: metrics,
}
}

type processor struct {
store bloomshipper.Store
logger log.Logger
id string
store bloomshipper.Store
logger log.Logger
metrics *workerMetrics
}

func (p *processor) run(ctx context.Context, tasks []Task) error {
@@ -58,6 +63,8 @@ func (p *processor) processTasks(ctx context.Context, tenant string, interval bl
if err != nil {
return err
}
p.metrics.metasFetched.WithLabelValues(p.id).Observe(float64(len(metas)))

blocksRefs := bloomshipper.BlocksForMetas(metas, interval, keyspaces)
return p.processBlocks(ctx, partition(tasks, blocksRefs))
}
@@ -72,6 +79,7 @@ func (p *processor) processBlocks(ctx context.Context, data []tasksForBlock) err
if err != nil {
return err
}
p.metrics.blocksFetched.WithLabelValues(p.id).Observe(float64(len(bqs)))

blockIter := v1.NewSliceIter(bqs)

@@ -109,7 +117,16 @@ func (p *processor) processBlock(_ context.Context, blockQuerier *v1.BlockQuerie
}

fq := blockQuerier.Fuse(iters)
return fq.Run()

start := time.Now()
err = fq.Run()
if err != nil {
p.metrics.blockQueryLatency.WithLabelValues(p.id, labelFailure).Observe(time.Since(start).Seconds())
} else {
p.metrics.blockQueryLatency.WithLabelValues(p.id, labelSuccess).Observe(time.Since(start).Seconds())
}

return err
}

// getFirstLast returns the first and last item of a fingerprint slice
15 changes: 7 additions & 8 deletions pkg/bloomgateway/processor_test.go
@@ -9,12 +9,14 @@ import (

"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"

"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/pkg/util/constants"
)

var _ bloomshipper.Store = &dummyStore{}
@@ -92,13 +94,13 @@ func TestProcessor(t *testing.T) {
ctx := context.Background()
tenant := "fake"
now := mktime("2024-01-27 12:00")
metrics := newWorkerMetrics(prometheus.NewPedanticRegistry(), constants.Loki, "bloom_gateway")

t.Run("success case", func(t *testing.T) {
_, metas, queriers, data := createBlocks(t, tenant, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)
p := &processor{
store: newMockBloomStore(queriers, metas),
logger: log.NewNopLogger(),
}

mockStore := newMockBloomStore(queriers, metas)
p := newProcessor("worker", mockStore, log.NewNopLogger(), metrics)

chunkRefs := createQueryInputFromBlockData(t, tenant, data, 10)
swb := seriesWithBounds{
@@ -142,10 +144,7 @@
mockStore := newMockBloomStore(queriers, metas)
mockStore.err = errors.New("store failed")

p := &processor{
store: mockStore,
logger: log.NewNopLogger(),
}
p := newProcessor("worker", mockStore, log.NewNopLogger(), metrics)

chunkRefs := createQueryInputFromBlockData(t, tenant, data, 10)
swb := seriesWithBounds{
75 changes: 58 additions & 17 deletions pkg/bloomgateway/worker.go
@@ -15,37 +15,71 @@ import (
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
)

const (
labelSuccess = "success"
labelFailure = "failure"
)

type workerConfig struct {
maxItems int
}

type workerMetrics struct {
dequeuedTasks *prometheus.CounterVec
dequeueErrors *prometheus.CounterVec
dequeueWaitTime *prometheus.SummaryVec
dequeueDuration *prometheus.SummaryVec
processDuration *prometheus.SummaryVec
metasFetched *prometheus.SummaryVec
blocksFetched *prometheus.SummaryVec
tasksDequeued *prometheus.CounterVec
tasksProcessed *prometheus.CounterVec
blockQueryLatency *prometheus.HistogramVec
}

func newWorkerMetrics(registerer prometheus.Registerer, namespace, subsystem string) *workerMetrics {
labels := []string{"worker"}
r := promauto.With(registerer)
return &workerMetrics{
dequeuedTasks: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
dequeueDuration: r.NewSummaryVec(prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "dequeued_tasks_total",
Help: "Total amount of tasks that the worker dequeued from the bloom query queue",
Name: "dequeue_duration_seconds",
Help: "Time spent dequeuing tasks from queue in seconds",
}, labels),
dequeueErrors: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
processDuration: r.NewSummaryVec(prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "process_duration_seconds",
Help: "Time spent processing tasks in seconds",
}, append(labels, "status")),
metasFetched: r.NewSummaryVec(prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "dequeue_errors_total",
Help: "Total amount of failed dequeue operations",
Name: "metas_fetched",
Help: "Amount of metas fetched",
}, labels),
dequeueWaitTime: promauto.With(registerer).NewSummaryVec(prometheus.SummaryOpts{
blocksFetched: r.NewSummaryVec(prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "dequeue_wait_time",
Help: "Time spent waiting for dequeuing tasks from queue",
Name: "blocks_fetched",
Help: "Amount of blocks fetched",
}, labels),
tasksDequeued: r.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "tasks_dequeued_total",
Help: "Total amount of tasks that the worker dequeued from the queue",
}, append(labels, "status")),
tasksProcessed: r.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "tasks_processed_total",
Help: "Total amount of tasks that the worker processed",
}, append(labels, "status")),
blockQueryLatency: r.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "block_query_latency",
Help: "Time spent running searches against a bloom block",
}, append(labels, "status")),
}
}

@@ -89,19 +123,19 @@ func (w *worker) starting(_ context.Context) error {
func (w *worker) running(_ context.Context) error {
idx := queue.StartIndexWithLocalQueue

p := processor{store: w.store, logger: w.logger}
p := newProcessor(w.id, w.store, w.logger, w.metrics)

for st := w.State(); st == services.Running || st == services.Stopping; {
taskCtx := context.Background()
dequeueStart := time.Now()
start := time.Now()
items, newIdx, err := w.queue.DequeueMany(taskCtx, idx, w.id, w.cfg.maxItems)
w.metrics.dequeueWaitTime.WithLabelValues(w.id).Observe(time.Since(dequeueStart).Seconds())
w.metrics.dequeueDuration.WithLabelValues(w.id).Observe(time.Since(start).Seconds())
if err != nil {
// We only return an error if the queue is stopped and dequeuing did not yield any items
if err == queue.ErrStopped && len(items) == 0 {
return err
}
w.metrics.dequeueErrors.WithLabelValues(w.id).Inc()
w.metrics.tasksDequeued.WithLabelValues(w.id, labelFailure).Inc()
level.Error(w.logger).Log("msg", "failed to dequeue tasks", "err", err, "items", len(items))
}
idx = newIdx
@@ -110,7 +144,7 @@ func (w *worker) running(_ context.Context) error {
w.queue.ReleaseRequests(items)
continue
}
w.metrics.dequeuedTasks.WithLabelValues(w.id).Add(float64(len(items)))
w.metrics.tasksDequeued.WithLabelValues(w.id, labelSuccess).Add(float64(len(items)))

tasks := make([]Task, 0, len(items))
for _, item := range items {
@@ -125,9 +159,16 @@ func (w *worker) running(_ context.Context) error {
tasks = append(tasks, task)
}

start = time.Now()
err = p.run(taskCtx, tasks)

if err != nil {
w.metrics.processDuration.WithLabelValues(w.id, labelFailure).Observe(time.Since(start).Seconds())
w.metrics.tasksProcessed.WithLabelValues(w.id, labelFailure).Add(float64(len(tasks)))
level.Error(w.logger).Log("msg", "failed to process tasks", "err", err)
} else {
w.metrics.processDuration.WithLabelValues(w.id, labelSuccess).Observe(time.Since(start).Seconds())
w.metrics.tasksProcessed.WithLabelValues(w.id, labelSuccess).Add(float64(len(tasks)))
}

// return dequeued items back to the pool
