From 41dc611384c7b512dba3858b2e232c6647671c61 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 13 Dec 2024 13:29:51 +0100 Subject: [PATCH] chore: Introduce back phase for kafka delay metrics --- pkg/ingester/ingester.go | 2 +- pkg/kafka/partition/reader.go | 18 ++++++++++++++---- pkg/kafka/partition/reader_service.go | 3 ++- 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 5659c3b056a39..d0da49d359ddf 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -335,7 +335,7 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con i.replayController = newReplayController(metrics, cfg.WAL, &replayFlusher{i}) if cfg.WAL.Enabled { - if err := os.MkdirAll(cfg.WAL.Dir, 0750); err != nil { + if err := os.MkdirAll(cfg.WAL.Dir, 0o750); err != nil { // Best effort try to make path absolute for easier debugging. path, _ := filepath.Abs(cfg.WAL.Dir) if path == "" { diff --git a/pkg/kafka/partition/reader.go b/pkg/kafka/partition/reader.go index 58f9e77c4b05e..d4a83c7102572 100644 --- a/pkg/kafka/partition/reader.go +++ b/pkg/kafka/partition/reader.go @@ -40,6 +40,9 @@ type Reader interface { Poll(ctx context.Context, maxPollRecords int) ([]Record, error) // Set the target offset for consumption. reads will begin from here. SetOffsetForConsumption(offset int64) + // SetPhase sets the phase for the reader. This is used to differentiate between different phases of the reader. + // For example, we can use this to differentiate between the startup phase and the running phase. + SetPhase(phase string) } // ReaderMetrics contains metrics specific to Kafka reading operations @@ -48,7 +51,7 @@ type ReaderMetrics struct { fetchesErrors prometheus.Counter fetchesTotal prometheus.Counter fetchWaitDuration prometheus.Histogram - receiveDelay prometheus.Histogram + receiveDelay *prometheus.HistogramVec lastCommittedOffset prometheus.Gauge kprom *kprom.Metrics } @@ -73,7 +76,7 @@ func NewReaderMetrics(r prometheus.Registerer) *ReaderMetrics { Name: "loki_kafka_reader_fetches_total", Help: "Total number of Kafka fetches performed.", }), - receiveDelay: promauto.With(r).NewHistogram(prometheus.HistogramOpts{ + receiveDelay: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ Name: "loki_kafka_reader_receive_delay_seconds", Help: "Delay between producing a record and receiving it.", NativeHistogramZeroThreshold: math.Pow(2, -10), @@ -81,7 +84,7 @@ func NewReaderMetrics(r prometheus.Registerer) *ReaderMetrics { NativeHistogramMaxBucketNumber: 100, NativeHistogramMinResetDuration: 1 * time.Hour, Buckets: prometheus.ExponentialBuckets(0.125, 2, 18), - }), + }, []string{"phase"}), kprom: client.NewReaderClientMetrics("partition-reader", r), } } @@ -93,6 +96,7 @@ type KafkaReader struct { partitionID int32 consumerGroup string metrics *ReaderMetrics + phase string logger log.Logger } @@ -131,6 +135,12 @@ func (r *KafkaReader) Partition() int32 { return r.partitionID } +// SetPhase sets the phase for the reader. This is used to differentiate between different phases of the reader. +// For example, we can use this to differentiate between the startup phase and the running phase. +func (r *KafkaReader) SetPhase(phase string) { + r.phase = phase +} + // Poll retrieves the next batch of records from Kafka // Number of records fetched can be limited by configuring maxPollRecords to a non-zero value. func (r *KafkaReader) Poll(ctx context.Context, maxPollRecords int) ([]Record, error) { @@ -143,7 +153,7 @@ func (r *KafkaReader) Poll(ctx context.Context, maxPollRecords int) ([]Record, e var numRecords int fetches.EachRecord(func(record *kgo.Record) { numRecords++ - r.metrics.receiveDelay.Observe(time.Since(record.Timestamp).Seconds()) + r.metrics.receiveDelay.WithLabelValues(r.phase).Observe(time.Since(record.Timestamp).Seconds()) }) r.metrics.recordsPerFetch.Observe(float64(numRecords)) diff --git a/pkg/kafka/partition/reader_service.go b/pkg/kafka/partition/reader_service.go index fc0cb368d7722..9a1a9730f22b2 100644 --- a/pkg/kafka/partition/reader_service.go +++ b/pkg/kafka/partition/reader_service.go @@ -143,7 +143,7 @@ func (s *ReaderService) starting(ctx context.Context) error { s.metrics.reportStarting() logger := log.With(s.logger, "phase", phaseStarting) - + s.reader.SetPhase(phaseStarting) // Fetch the last committed offset to determine where to start reading lastCommittedOffset, err := s.offsetManager.FetchLastCommittedOffset(ctx, s.partitionID) if err != nil { @@ -174,6 +174,7 @@ func (s *ReaderService) starting(ctx context.Context) error { func (s *ReaderService) running(ctx context.Context) error { level.Info(s.logger).Log("msg", "reader service running") s.metrics.reportRunning() + s.reader.SetPhase(phaseRunning) consumer, err := s.consumerFactory(s.committer, log.With(s.logger, "phase", phaseRunning)) if err != nil {