diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 01dae3ee6e0cf..ac815cbe82cdb 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -279,11 +279,13 @@ func New( Help: "Total number of times the distributor has sharded streams", }), kafkaAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ - Name: "kafka_appends_total", - Help: "The total number of appends sent to kafka ingest path.", + Namespace: constants.Loki, + Name: "distributor_kafka_appends_total", + Help: "The total number of appends sent to kafka ingest path.", }, []string{"partition", "status"}), kafkaWriteLatency: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{ - Name: "kafka_latency_seconds", + Namespace: constants.Loki, + Name: "distributor_kafka_latency_seconds", Help: "Latency to write an incoming request to the ingest storage.", NativeHistogramBucketFactor: 1.1, NativeHistogramMinResetDuration: 1 * time.Hour, @@ -291,13 +293,15 @@ func New( Buckets: prometheus.DefBuckets, }), kafkaWriteBytesTotal: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ - Name: "kafka_sent_bytes_total", - Help: "Total number of bytes sent to the ingest storage.", + Namespace: constants.Loki, + Name: "distributor_kafka_sent_bytes_total", + Help: "Total number of bytes sent to the ingest storage.", }), kafkaRecordsPerRequest: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{ - Name: "kafka_records_per_write_request", - Help: "The number of records a single per-partition write request has been split into.", - Buckets: prometheus.ExponentialBuckets(1, 2, 8), + Namespace: constants.Loki, + Name: "distributor_kafka_records_per_write_request", + Help: "The number of records a single per-partition write request has been split into.", + Buckets: prometheus.ExponentialBuckets(1, 2, 8), }), writeFailuresManager: writefailures.NewManager(logger, registerer, cfg.WriteFailuresLogging, configs, "distributor"), kafkaWriter: kafkaWriter, diff --git a/pkg/kafka/writer_client.go b/pkg/kafka/writer_client.go index ddd12a646d692..59fefda31d19b 100644 --- a/pkg/kafka/writer_client.go +++ b/pkg/kafka/writer_client.go @@ -18,6 +18,8 @@ import ( "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/trace" "go.uber.org/atomic" + + "github.com/grafana/loki/v3/pkg/util/constants" ) // NewWriterClient returns the kgo.Client that should be used by the Writer. @@ -189,6 +191,7 @@ func NewProducer(client *kgo.Client, maxBufferedBytes int64, reg prometheus.Regi // Metrics. bufferedProduceBytes: promauto.With(reg).NewSummary( prometheus.SummaryOpts{ + Namespace: constants.Loki, Name: "buffered_produce_bytes", Help: "The buffered produce records in bytes. Quantile buckets keep track of buffered records size over the last 60s.", Objectives: map[float64]float64{0.5: 0.05, 0.99: 0.001, 1: 0.001}, @@ -197,16 +200,19 @@ func NewProducer(client *kgo.Client, maxBufferedBytes int64, reg prometheus.Regi }), bufferedProduceBytesLimit: promauto.With(reg).NewGauge( prometheus.GaugeOpts{ - Name: "buffered_produce_bytes_limit", - Help: "The bytes limit on buffered produce records. Produce requests fail once this limit is reached.", + Namespace: constants.Loki, + Name: "buffered_produce_bytes_limit", + Help: "The bytes limit on buffered produce records. Produce requests fail once this limit is reached.", }), produceRequestsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "produce_requests_total", - Help: "Total number of produce requests issued to Kafka.", + Namespace: constants.Loki, + Name: "produce_requests_total", + Help: "Total number of produce requests issued to Kafka.", }), produceFailuresTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "produce_failures_total", - Help: "Total number of failed produce requests issued to Kafka.", + Namespace: constants.Loki, + Name: "produce_failures_total", + Help: "Total number of failed produce requests issued to Kafka.", }, []string{"reason"}), }