diff --git a/pkg/kafka/partition/reader.go b/pkg/kafka/partition/reader.go
index b2a98afa99ed4..74f18b02057f3 100644
--- a/pkg/kafka/partition/reader.go
+++ b/pkg/kafka/partition/reader.go
@@ -274,7 +274,7 @@ func (p *Reader) processNextFetchesUntilTargetOrMaxLagHonored(ctx context.Contex
 	attempts := []func() (time.Duration, error){
 		// First process fetches until at least the max lag is honored.
 		func() (time.Duration, error) {
-			return p.processNextFetchesUntilLagHonored(ctx, maxLag, logger, recordsChan)
+			return p.processNextFetchesUntilLagHonored(ctx, maxLag, logger, recordsChan, time.Since)
 		},
 
 		// If the target lag hasn't been reached with the first attempt (which stops once at least the max lag
@@ -287,13 +287,13 @@ func (p *Reader) processNextFetchesUntilTargetOrMaxLagHonored(ctx context.Contex
 			timedCtx, cancel := context.WithTimeoutCause(ctx, maxLag, errWaitTargetLagDeadlineExceeded)
 			defer cancel()
 
-			return p.processNextFetchesUntilLagHonored(timedCtx, targetLag, logger, recordsChan)
+			return p.processNextFetchesUntilLagHonored(timedCtx, targetLag, logger, recordsChan, time.Since)
 		},
 
 		// If the target lag hasn't been reached with the previous attempt then we'll move on. However,
 		// we still need to guarantee that in the meanwhile the lag didn't increase and max lag is still honored.
 		func() (time.Duration, error) {
-			return p.processNextFetchesUntilLagHonored(ctx, maxLag, logger, recordsChan)
+			return p.processNextFetchesUntilLagHonored(ctx, maxLag, logger, recordsChan, time.Since)
 		},
 	}
 
@@ -326,7 +326,7 @@ func (p *Reader) processNextFetchesUntilTargetOrMaxLagHonored(ctx context.Contex
 	return nil
 }
 
-func (p *Reader) processNextFetchesUntilLagHonored(ctx context.Context, maxLag time.Duration, logger log.Logger, recordsChan chan<- []Record) (time.Duration, error) {
+func (p *Reader) processNextFetchesUntilLagHonored(ctx context.Context, maxLag time.Duration, logger log.Logger, recordsChan chan<- []Record, timeSince func(time.Time) time.Duration) (time.Duration, error) {
	boff := backoff.New(ctx, backoff.Config{
		MinBackoff: 100 * time.Millisecond,
		MaxBackoff: time.Second,
@@ -382,7 +382,7 @@ func (p *Reader) processNextFetchesUntilLagHonored(ctx context.Context, maxLag t
 
 		// If it took less than the max desired lag to replay the partition
 		// then we can stop here, otherwise we'll have to redo it.
-		if currLag = time.Since(lastProducedOffsetRequestedAt); currLag <= maxLag {
+		if currLag = timeSince(lastProducedOffsetRequestedAt); currLag <= maxLag {
 			return currLag, nil
 		}
 	}
diff --git a/pkg/kafka/partition/reader_test.go b/pkg/kafka/partition/reader_test.go
index b68c9a0622704..8d548c8312411 100644
--- a/pkg/kafka/partition/reader_test.go
+++ b/pkg/kafka/partition/reader_test.go
@@ -2,6 +2,7 @@ package partition
 
 import (
 	"context"
+	"fmt"
 	"sync"
 	"testing"
 	"time"
@@ -13,6 +14,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
+	"github.com/twmb/franz-go/pkg/kgo"
 
 	"github.com/grafana/loki/v3/pkg/kafka"
 	"github.com/grafana/loki/v3/pkg/kafka/testkafka"
@@ -161,3 +163,71 @@ func TestPartitionReader_ProcessCatchUpAtStartup(t *testing.T) {
 	err = services.StopAndAwaitTerminated(context.Background(), partitionReader)
 	require.NoError(t, err)
 }
+
+func TestPartitionReader_ProcessCommits(t *testing.T) {
+	_, kafkaCfg := testkafka.CreateCluster(t, 1, "test-topic")
+	consumer := newMockConsumer()
+
+	consumerFactory := func(_ Committer) (Consumer, error) {
+		return consumer, nil
+	}
+
+	partitionID := int32(0)
+	partitionReader, err := NewReader(kafkaCfg, partitionID, "test-consumer-group", consumerFactory, log.NewNopLogger(), prometheus.NewRegistry())
+	require.NoError(t, err)
+	producer, err := kafka.NewWriterClient(kafkaCfg, 100, log.NewNopLogger(), prometheus.NewRegistry())
+	require.NoError(t, err)
+
+	// Init the client: This usually happens in "start" but we want to manage our own lifecycle for this test.
+	partitionReader.client, err = kafka.NewReaderClient(kafkaCfg, nil, log.NewNopLogger(),
+		kgo.ConsumePartitions(map[string]map[int32]kgo.Offset{
+			kafkaCfg.Topic: {partitionID: kgo.NewOffset().AtStart()},
+		}),
+	)
+	require.NoError(t, err)
+
+	stream := logproto.Stream{
+		Labels:  labels.FromStrings("foo", "bar").String(),
+		Entries: []logproto.Entry{{Timestamp: time.Now(), Line: "test"}},
+	}
+
+	records, err := kafka.Encode(partitionID, "test-tenant", stream, 10<<20)
+	require.NoError(t, err)
+	require.Len(t, records, 1)
+
+	ctx, cancel := context.WithDeadlineCause(context.Background(), time.Now().Add(10*time.Second), fmt.Errorf("test unexpectedly deadlocked"))
+	recordsChan := make(chan []Record)
+	wait := consumer.Start(ctx, recordsChan)
+
+	targetLag := time.Second
+
+	i := -1
+	iterations := 5
+	producer.ProduceSync(context.Background(), records...)
+	// timeSince acts as a hook for when we check if we've honoured the lag or not. We modify it to respond "no" initially, to force a re-loop, and then "yes" after `iterations`.
+	// We also inject a new kafka record each time so there is more to consume.
+	timeSince := func(time.Time) time.Duration {
+		i++
+		if i < iterations {
+			producer.ProduceSync(context.Background(), records...)
+			return targetLag + 1
+		}
+		return targetLag - 1
+	}
+
+	_, err = partitionReader.processNextFetchesUntilLagHonored(ctx, targetLag, log.NewNopLogger(), recordsChan, timeSince)
+	assert.NoError(t, err)
+
+	// Wait to process all the records
+	cancel()
+	wait()
+
+	close(recordsChan)
+	close(consumer.recordsChan)
+	recordsCount := 0
+	for receivedRecords := range consumer.recordsChan {
+		recordsCount += len(receivedRecords)
+	}
+	// We expect to have processed all the records, including initial + one per iteration.
+	assert.Equal(t, iterations+1, recordsCount)
+}
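For context, the substance of this change is dependency injection of the clock: processNextFetchesUntilLagHonored now receives a timeSince func(time.Time) time.Duration argument (production callers pass time.Since), so the new test can stub out the lag check and force a deterministic number of catch-up iterations. Below is a minimal, self-contained sketch of that pattern; waitUntilLagHonored and the fake clock are illustrative names only, not part of the Loki code.

package main

import (
	"fmt"
	"time"
)

// waitUntilLagHonored keeps "consuming" until the observed lag drops to or
// below maxLag. Production callers pass time.Since; tests pass a stub.
func waitUntilLagHonored(maxLag time.Duration, consume func(), timeSince func(time.Time) time.Duration) time.Duration {
	for {
		requestedAt := time.Now()
		consume()
		if lag := timeSince(requestedAt); lag <= maxLag {
			return lag
		}
	}
}

func main() {
	// Production-style call: real clock, returns as soon as the lag is honored.
	_ = waitUntilLagHonored(time.Second, func() {}, time.Since)

	// Test-style call: report "lag not honored" twice to force extra loops,
	// then succeed, mirroring the timeSince hook in TestPartitionReader_ProcessCommits.
	calls := 0
	fake := func(time.Time) time.Duration {
		calls++
		if calls <= 2 {
			return 2 * time.Second // pretend we are still behind
		}
		return 500 * time.Millisecond // now within the target lag
	}
	lag := waitUntilLagHonored(time.Second, func() {}, fake)
	fmt.Println("final observed lag:", lag, "after", calls, "checks")
}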