From ef311cf6887bea344bd4ca872be2056bfa36e547 Mon Sep 17 00:00:00 2001 From: Alex Bublichenko <46664526+abliqo@users.noreply.github.com> Date: Tue, 14 Nov 2023 21:21:46 +0100 Subject: [PATCH] [kafka-consumer] Add topic name as a tag to offset manager metrics (#4951) ## Which problem is this PR solving? Resolves #4950 ## Description of the changes Add tag with topic name to metrics emitted by Kafka consumer offset manager. ## How was this change tested? Unit tests Signed-off-by: Alex Bublichenko --- cmd/ingester/app/consumer/offset/manager.go | 16 +++++++++++++--- cmd/ingester/app/consumer/offset/manager_test.go | 8 ++++---- cmd/ingester/app/consumer/processor_factory.go | 2 +- 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/cmd/ingester/app/consumer/offset/manager.go b/cmd/ingester/app/consumer/offset/manager.go index a9e39be8910..b1e6c019135 100644 --- a/cmd/ingester/app/consumer/offset/manager.go +++ b/cmd/ingester/app/consumer/offset/manager.go @@ -51,12 +51,22 @@ type Manager struct { type MarkOffset func(offset int64) // NewManager creates a new Manager -func NewManager(minOffset int64, markOffset MarkOffset, partition int32, factory metrics.Factory) *Manager { +func NewManager( + minOffset int64, + markOffset MarkOffset, + topic string, + partition int32, + factory metrics.Factory, +) *Manager { + tags := map[string]string{ + "topic": topic, + "partition": strconv.Itoa(int(partition)), + } return &Manager{ markOffsetFunction: markOffset, close: make(chan struct{}), - offsetCommitCount: factory.Counter(metrics.Options{Name: "offset-commits-total", Tags: map[string]string{"partition": strconv.Itoa(int(partition))}}), - lastCommittedOffset: factory.Gauge(metrics.Options{Name: "last-committed-offset", Tags: map[string]string{"partition": strconv.Itoa(int(partition))}}), + offsetCommitCount: factory.Counter(metrics.Options{Name: "offset-commits-total", Tags: tags}), + lastCommittedOffset: factory.Gauge(metrics.Options{Name: "last-committed-offset", Tags: tags}), list: newConcurrentList(minOffset), minOffset: minOffset, } diff --git a/cmd/ingester/app/consumer/offset/manager_test.go b/cmd/ingester/app/consumer/offset/manager_test.go index 58a0a2020e9..74ec07e70da 100644 --- a/cmd/ingester/app/consumer/offset/manager_test.go +++ b/cmd/ingester/app/consumer/offset/manager_test.go @@ -38,7 +38,7 @@ func TestHandleReset(t *testing.T) { captureOffset = offset wg.Done() } - manager := NewManager(minOffset, fakeMarker, 1, m) + manager := NewManager(minOffset, fakeMarker, "test_topic", 1, m) manager.Start() manager.MarkOffset(offset) @@ -47,8 +47,8 @@ func TestHandleReset(t *testing.T) { assert.Equal(t, offset, captureOffset) cnt, g := m.Snapshot() - assert.Equal(t, int64(1), cnt["offset-commits-total|partition=1"]) - assert.Equal(t, int64(offset), g["last-committed-offset|partition=1"]) + assert.Equal(t, int64(1), cnt["offset-commits-total|partition=1|topic=test_topic"]) + assert.Equal(t, int64(offset), g["last-committed-offset|partition=1|topic=test_topic"]) } func TestCache(t *testing.T) { @@ -57,7 +57,7 @@ func TestCache(t *testing.T) { fakeMarker := func(offset int64) { assert.Fail(t, "Shouldn't mark cached offset") } - manager := NewManager(offset, fakeMarker, 1, metrics.NullFactory) + manager := NewManager(offset, fakeMarker, "test_topic", 1, metrics.NullFactory) manager.Start() time.Sleep(resetInterval + 50) manager.MarkOffset(offset) diff --git a/cmd/ingester/app/consumer/processor_factory.go b/cmd/ingester/app/consumer/processor_factory.go index ba025f9be7a..a4e09584679 100644 --- a/cmd/ingester/app/consumer/processor_factory.go +++ b/cmd/ingester/app/consumer/processor_factory.go @@ -65,7 +65,7 @@ func (c *ProcessorFactory) new(topic string, partition int32, minOffset int64) p c.consumer.MarkPartitionOffset(topic, partition, offset, "") } - om := offset.NewManager(minOffset, markOffset, partition, c.metricsFactory) + om := offset.NewManager(minOffset, markOffset, topic, partition, c.metricsFactory) retryProcessor := decorator.NewRetryingProcessor(c.metricsFactory, c.baseProcessor, c.retryOptions...) cp := NewCommittingProcessor(retryProcessor, om)