Skip to content

Commit

Permalink
chore: Fix kafka producer metrics names (#14936)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyriltovena authored Nov 15, 2024
1 parent 0abc47d commit b2df31d
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 13 deletions.
2 changes: 1 addition & 1 deletion pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func New(
return nil, fmt.Errorf("failed to start kafka client: %w", err)
}
kafkaWriter = kafka_client.NewProducer(kafkaClient, cfg.KafkaConfig.ProducerMaxBufferedBytes,
prometheus.WrapRegistererWithPrefix("_kafka_", registerer))
prometheus.WrapRegistererWithPrefix("loki_", registerer))
}

d := &Distributor{
Expand Down
21 changes: 9 additions & 12 deletions pkg/kafka/client/writer_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,13 @@ import (
"go.uber.org/atomic"

"github.com/grafana/loki/v3/pkg/kafka"
"github.com/grafana/loki/v3/pkg/util/constants"
)

var (
// writerRequestTimeoutOverhead is the overhead applied by the Writer to every Kafka timeout.
// You can think about this overhead as an extra time for requests sitting in the client's buffer
// before being sent on the wire and the actual time it takes to send it over the network and
// start being processed by Kafka.
writerRequestTimeoutOverhead = 2 * time.Second
)
// writerRequestTimeoutOverhead is the overhead applied by the Writer to every Kafka timeout.
// You can think about this overhead as an extra time for requests sitting in the client's buffer
// before being sent on the wire and the actual time it takes to send it over the network and
// start being processed by Kafka.
var writerRequestTimeoutOverhead = 2 * time.Second

// NewWriterClient returns the kgo.Client that should be used by the Writer.
//
Expand Down Expand Up @@ -215,7 +212,7 @@ func NewProducer(client *kgo.Client, maxBufferedBytes int64, reg prometheus.Regi
// Metrics.
bufferedProduceBytes: promauto.With(reg).NewSummary(
prometheus.SummaryOpts{
Namespace: constants.Loki,
Namespace: "kafka",
Name: "buffered_produce_bytes",
Help: "The buffered produce records in bytes. Quantile buckets keep track of buffered records size over the last 60s.",
Objectives: map[float64]float64{0.5: 0.05, 0.99: 0.001, 1: 0.001},
Expand All @@ -224,17 +221,17 @@ func NewProducer(client *kgo.Client, maxBufferedBytes int64, reg prometheus.Regi
}),
bufferedProduceBytesLimit: promauto.With(reg).NewGauge(
prometheus.GaugeOpts{
Namespace: constants.Loki,
Namespace: "kafka",
Name: "buffered_produce_bytes_limit",
Help: "The bytes limit on buffered produce records. Produce requests fail once this limit is reached.",
}),
produceRequestsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Namespace: constants.Loki,
Namespace: "kafka",
Name: "produce_requests_total",
Help: "Total number of produce requests issued to Kafka.",
}),
produceFailuresTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Namespace: "kafka",
Name: "produce_failures_total",
Help: "Total number of failed produce requests issued to Kafka.",
}, []string{"reason"}),
Expand Down

0 comments on commit b2df31d

Please sign in to comment.