diff --git a/pkg/ingester/kafka_consumer.go b/pkg/ingester/kafka_consumer.go index 0e6185fbe7968..54dced13d7670 100644 --- a/pkg/ingester/kafka_consumer.go +++ b/pkg/ingester/kafka_consumer.go @@ -73,6 +73,10 @@ func (kc *kafkaConsumer) Start(ctx context.Context, recordsChan <-chan []partiti for { select { case <-ctx.Done(): + // It can happen that the context is canceled while there are unprocessed records + // in the channel. However, we do not need to process all remaining records, + // and can exit out instead, as partition offsets are not committed until + // the record has been handed over to the Pusher and committed in the WAL. level.Info(kc.logger).Log("msg", "shutting down kafka consumer") return case records := <-recordsChan: