Skip to content

Commit

Permalink
chore: Introduce back phase for kafka delay metrics (#15407)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyriltovena authored Dec 13, 2024
1 parent 60abd08 commit 6dfe90a
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 6 deletions.
2 changes: 1 addition & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con
i.replayController = newReplayController(metrics, cfg.WAL, &replayFlusher{i})

if cfg.WAL.Enabled {
if err := os.MkdirAll(cfg.WAL.Dir, 0750); err != nil {
if err := os.MkdirAll(cfg.WAL.Dir, 0o750); err != nil {
// Best effort try to make path absolute for easier debugging.
path, _ := filepath.Abs(cfg.WAL.Dir)
if path == "" {
Expand Down
18 changes: 14 additions & 4 deletions pkg/kafka/partition/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ type Reader interface {
Poll(ctx context.Context, maxPollRecords int) ([]Record, error)
// Set the target offset for consumption. reads will begin from here.
SetOffsetForConsumption(offset int64)
// SetPhase sets the phase for the reader. This is used to differentiate between different phases of the reader.
// For example, we can use this to differentiate between the startup phase and the running phase.
SetPhase(phase string)
}

// ReaderMetrics contains metrics specific to Kafka reading operations
Expand All @@ -48,7 +51,7 @@ type ReaderMetrics struct {
fetchesErrors prometheus.Counter
fetchesTotal prometheus.Counter
fetchWaitDuration prometheus.Histogram
receiveDelay prometheus.Histogram
receiveDelay *prometheus.HistogramVec
lastCommittedOffset prometheus.Gauge
kprom *kprom.Metrics
}
Expand All @@ -73,15 +76,15 @@ func NewReaderMetrics(r prometheus.Registerer) *ReaderMetrics {
Name: "loki_kafka_reader_fetches_total",
Help: "Total number of Kafka fetches performed.",
}),
receiveDelay: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
receiveDelay: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Name: "loki_kafka_reader_receive_delay_seconds",
Help: "Delay between producing a record and receiving it.",
NativeHistogramZeroThreshold: math.Pow(2, -10),
NativeHistogramBucketFactor: 1.2,
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: 1 * time.Hour,
Buckets: prometheus.ExponentialBuckets(0.125, 2, 18),
}),
}, []string{"phase"}),
kprom: client.NewReaderClientMetrics("partition-reader", r),
}
}
Expand All @@ -93,6 +96,7 @@ type KafkaReader struct {
partitionID int32
consumerGroup string
metrics *ReaderMetrics
phase string
logger log.Logger
}

Expand Down Expand Up @@ -131,6 +135,12 @@ func (r *KafkaReader) Partition() int32 {
return r.partitionID
}

// SetPhase sets the phase for the reader. This is used to differentiate between different phases of the reader.
// For example, we can use this to differentiate between the startup phase and the running phase.
// The phase is attached as the "phase" label on the receive-delay histogram observed in Poll.
// NOTE(review): r.phase is written here and read in Poll without any synchronization —
// this assumes SetPhase and Poll are driven from the same service goroutine (the
// ReaderService lifecycle calls it in starting/running); confirm no concurrent Poll.
func (r *KafkaReader) SetPhase(phase string) {
r.phase = phase
}

// Poll retrieves the next batch of records from Kafka
// Number of records fetched can be limited by configuring maxPollRecords to a non-zero value.
func (r *KafkaReader) Poll(ctx context.Context, maxPollRecords int) ([]Record, error) {
Expand All @@ -143,7 +153,7 @@ func (r *KafkaReader) Poll(ctx context.Context, maxPollRecords int) ([]Record, e
var numRecords int
fetches.EachRecord(func(record *kgo.Record) {
numRecords++
r.metrics.receiveDelay.Observe(time.Since(record.Timestamp).Seconds())
r.metrics.receiveDelay.WithLabelValues(r.phase).Observe(time.Since(record.Timestamp).Seconds())
})
r.metrics.recordsPerFetch.Observe(float64(numRecords))

Expand Down
3 changes: 2 additions & 1 deletion pkg/kafka/partition/reader_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (s *ReaderService) starting(ctx context.Context) error {
s.metrics.reportStarting()

logger := log.With(s.logger, "phase", phaseStarting)

s.reader.SetPhase(phaseStarting)
// Fetch the last committed offset to determine where to start reading
lastCommittedOffset, err := s.offsetManager.FetchLastCommittedOffset(ctx, s.partitionID)
if err != nil {
Expand Down Expand Up @@ -174,6 +174,7 @@ func (s *ReaderService) starting(ctx context.Context) error {
func (s *ReaderService) running(ctx context.Context) error {
level.Info(s.logger).Log("msg", "reader service running")
s.metrics.reportRunning()
s.reader.SetPhase(phaseRunning)

consumer, err := s.consumerFactory(s.committer, log.With(s.logger, "phase", phaseRunning))
if err != nil {
Expand Down

0 comments on commit 6dfe90a

Please sign in to comment.