From b2df31d9ede46e84c497d4258a0a20086ccd6191 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 15 Nov 2024 10:30:10 +0100 Subject: [PATCH] chore: Fix kafka producer metrics names (#14936) --- pkg/distributor/distributor.go | 2 +- pkg/kafka/client/writer_client.go | 21 +++++++++------------ 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 44002f151f5fe..e4b92c8f585c4 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -233,7 +233,7 @@ func New( return nil, fmt.Errorf("failed to start kafka client: %w", err) } kafkaWriter = kafka_client.NewProducer(kafkaClient, cfg.KafkaConfig.ProducerMaxBufferedBytes, - prometheus.WrapRegistererWithPrefix("_kafka_", registerer)) + prometheus.WrapRegistererWithPrefix("loki_", registerer)) } d := &Distributor{ diff --git a/pkg/kafka/client/writer_client.go b/pkg/kafka/client/writer_client.go index 1493e17f51686..282233ec15df2 100644 --- a/pkg/kafka/client/writer_client.go +++ b/pkg/kafka/client/writer_client.go @@ -21,16 +21,13 @@ import ( "go.uber.org/atomic" "github.com/grafana/loki/v3/pkg/kafka" - "github.com/grafana/loki/v3/pkg/util/constants" ) -var ( - // writerRequestTimeoutOverhead is the overhead applied by the Writer to every Kafka timeout. - // You can think about this overhead as an extra time for requests sitting in the client's buffer - // before being sent on the wire and the actual time it takes to send it over the network and - // start being processed by Kafka. - writerRequestTimeoutOverhead = 2 * time.Second -) +// writerRequestTimeoutOverhead is the overhead applied by the Writer to every Kafka timeout. +// You can think about this overhead as an extra time for requests sitting in the client's buffer +// before being sent on the wire and the actual time it takes to send it over the network and +// start being processed by Kafka. +var writerRequestTimeoutOverhead = 2 * time.Second // NewWriterClient returns the kgo.Client that should be used by the Writer. // @@ -215,7 +212,7 @@ func NewProducer(client *kgo.Client, maxBufferedBytes int64, reg prometheus.Regi // Metrics. bufferedProduceBytes: promauto.With(reg).NewSummary( prometheus.SummaryOpts{ - Namespace: constants.Loki, + Namespace: "kafka", Name: "buffered_produce_bytes", Help: "The buffered produce records in bytes. Quantile buckets keep track of buffered records size over the last 60s.", Objectives: map[float64]float64{0.5: 0.05, 0.99: 0.001, 1: 0.001}, @@ -224,17 +221,17 @@ func NewProducer(client *kgo.Client, maxBufferedBytes int64, reg prometheus.Regi }), bufferedProduceBytesLimit: promauto.With(reg).NewGauge( prometheus.GaugeOpts{ - Namespace: constants.Loki, + Namespace: "kafka", Name: "buffered_produce_bytes_limit", Help: "The bytes limit on buffered produce records. Produce requests fail once this limit is reached.", }), produceRequestsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Namespace: constants.Loki, + Namespace: "kafka", Name: "produce_requests_total", Help: "Total number of produce requests issued to Kafka.", }), produceFailuresTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Namespace: constants.Loki, + Namespace: "kafka", Name: "produce_failures_total", Help: "Total number of failed produce requests issued to Kafka.", }, []string{"reason"}),