Skip to content

Commit

Permalink
[kafka-consumer] Add topic name as a tag to offset manager metrics (#…
Browse files Browse the repository at this point in the history
…4951)

## Which problem is this PR solving?

Resolves #4950

## Description of the changes

Add tag with topic name to metrics emitted by Kafka consumer offset
manager.

## How was this change tested?
Unit tests

Signed-off-by: Alex Bublichenko <[email protected]>
  • Loading branch information
abliqo authored Nov 14, 2023
1 parent 0bb9590 commit ef311cf
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 8 deletions.
16 changes: 13 additions & 3 deletions cmd/ingester/app/consumer/offset/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,22 @@ type Manager struct {
type MarkOffset func(offset int64)

// NewManager creates a new Manager
func NewManager(minOffset int64, markOffset MarkOffset, partition int32, factory metrics.Factory) *Manager {
func NewManager(
minOffset int64,
markOffset MarkOffset,
topic string,
partition int32,
factory metrics.Factory,
) *Manager {
tags := map[string]string{
"topic": topic,
"partition": strconv.Itoa(int(partition)),
}
return &Manager{
markOffsetFunction: markOffset,
close: make(chan struct{}),
offsetCommitCount: factory.Counter(metrics.Options{Name: "offset-commits-total", Tags: map[string]string{"partition": strconv.Itoa(int(partition))}}),
lastCommittedOffset: factory.Gauge(metrics.Options{Name: "last-committed-offset", Tags: map[string]string{"partition": strconv.Itoa(int(partition))}}),
offsetCommitCount: factory.Counter(metrics.Options{Name: "offset-commits-total", Tags: tags}),
lastCommittedOffset: factory.Gauge(metrics.Options{Name: "last-committed-offset", Tags: tags}),
list: newConcurrentList(minOffset),
minOffset: minOffset,
}
Expand Down
8 changes: 4 additions & 4 deletions cmd/ingester/app/consumer/offset/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestHandleReset(t *testing.T) {
captureOffset = offset
wg.Done()
}
manager := NewManager(minOffset, fakeMarker, 1, m)
manager := NewManager(minOffset, fakeMarker, "test_topic", 1, m)
manager.Start()

manager.MarkOffset(offset)
Expand All @@ -47,8 +47,8 @@ func TestHandleReset(t *testing.T) {

assert.Equal(t, offset, captureOffset)
cnt, g := m.Snapshot()
assert.Equal(t, int64(1), cnt["offset-commits-total|partition=1"])
assert.Equal(t, int64(offset), g["last-committed-offset|partition=1"])
assert.Equal(t, int64(1), cnt["offset-commits-total|partition=1|topic=test_topic"])
assert.Equal(t, int64(offset), g["last-committed-offset|partition=1|topic=test_topic"])
}

func TestCache(t *testing.T) {
Expand All @@ -57,7 +57,7 @@ func TestCache(t *testing.T) {
fakeMarker := func(offset int64) {
assert.Fail(t, "Shouldn't mark cached offset")
}
manager := NewManager(offset, fakeMarker, 1, metrics.NullFactory)
manager := NewManager(offset, fakeMarker, "test_topic", 1, metrics.NullFactory)
manager.Start()
time.Sleep(resetInterval + 50)
manager.MarkOffset(offset)
Expand Down
2 changes: 1 addition & 1 deletion cmd/ingester/app/consumer/processor_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (c *ProcessorFactory) new(topic string, partition int32, minOffset int64) p
c.consumer.MarkPartitionOffset(topic, partition, offset, "")
}

om := offset.NewManager(minOffset, markOffset, partition, c.metricsFactory)
om := offset.NewManager(minOffset, markOffset, topic, partition, c.metricsFactory)

retryProcessor := decorator.NewRetryingProcessor(c.metricsFactory, c.baseProcessor, c.retryOptions...)
cp := NewCommittingProcessor(retryProcessor, om)
Expand Down

0 comments on commit ef311cf

Please sign in to comment.