Skip to content

Commit

Permalink
chore: add comments to help explain durability guarantees of the cons…
Browse files Browse the repository at this point in the history
…umer (#15105)
  • Loading branch information
grobinson-grafana authored Nov 25, 2024
1 parent 23dc55d commit 0c388c3
Showing 1 changed file with 4 additions and 0 deletions.
4 changes: 4 additions & 0 deletions pkg/ingester/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ func (kc *kafkaConsumer) Start(ctx context.Context, recordsChan <-chan []partiti
for {
select {
case <-ctx.Done():
// It can happen that the context is canceled while there are unprocessed records
// in the channel. However, we do not need to process all remaining records,
// and can exit out instead, as partition offsets are not committed until
// the record has been handed over to the Pusher and committed in the WAL.
level.Info(kc.logger).Log("msg", "shutting down kafka consumer")
return
case records := <-recordsChan:
Expand Down

0 comments on commit 0c388c3

Please sign in to comment.