test(kafka): Added additional test for kafka startup logic #14391

Merged 2 commits on Oct 4, 2024
10 changes: 5 additions & 5 deletions pkg/kafka/partition/reader.go
@@ -274,7 +274,7 @@ func (p *Reader) processNextFetchesUntilTargetOrMaxLagHonored(ctx context.Contex
attempts := []func() (time.Duration, error){
// First process fetches until at least the max lag is honored.
func() (time.Duration, error) {
- return p.processNextFetchesUntilLagHonored(ctx, maxLag, logger, recordsChan)
+ return p.processNextFetchesUntilLagHonored(ctx, maxLag, logger, recordsChan, time.Since)
},

// If the target lag hasn't been reached with the first attempt (which stops once at least the max lag
@@ -287,13 +287,13 @@ func (p *Reader) processNextFetchesUntilTargetOrMaxLagHonored(ctx context.Contex
timedCtx, cancel := context.WithTimeoutCause(ctx, maxLag, errWaitTargetLagDeadlineExceeded)
defer cancel()

- return p.processNextFetchesUntilLagHonored(timedCtx, targetLag, logger, recordsChan)
+ return p.processNextFetchesUntilLagHonored(timedCtx, targetLag, logger, recordsChan, time.Since)
},

// If the target lag hasn't been reached with the previous attempt then we'll move on. However,
// we still need to guarantee that in the meanwhile the lag didn't increase and max lag is still honored.
func() (time.Duration, error) {
- return p.processNextFetchesUntilLagHonored(ctx, maxLag, logger, recordsChan)
+ return p.processNextFetchesUntilLagHonored(ctx, maxLag, logger, recordsChan, time.Since)
},
}

@@ -326,7 +326,7 @@ func (p *Reader) processNextFetchesUntilTargetOrMaxLagHonored(ctx context.Contex
return nil
}

- func (p *Reader) processNextFetchesUntilLagHonored(ctx context.Context, maxLag time.Duration, logger log.Logger, recordsChan chan<- []Record) (time.Duration, error) {
+ func (p *Reader) processNextFetchesUntilLagHonored(ctx context.Context, maxLag time.Duration, logger log.Logger, recordsChan chan<- []Record, timeSince func(time.Time) time.Duration) (time.Duration, error) {
boff := backoff.New(ctx, backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: time.Second,
@@ -382,7 +382,7 @@ func (p *Reader) processNextFetchesUntilLagHonored(ctx context.Context, maxLag t

// If it took less than the max desired lag to replay the partition
// then we can stop here, otherwise we'll have to redo it.
- if currLag = time.Since(lastProducedOffsetRequestedAt); currLag <= maxLag {
+ if currLag = timeSince(lastProducedOffsetRequestedAt); currLag <= maxLag {
return currLag, nil
}
}
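
The change above threads a timeSince func(time.Time) time.Duration parameter through processNextFetchesUntilLagHonored: production callers keep passing time.Since, while tests can substitute a stub and decide when the lag counts as honored. The following is a minimal, self-contained sketch of that injection pattern, using hypothetical names rather than Loki's actual types:

package main

import (
	"fmt"
	"time"
)

// replayUntilLagHonored is a simplified stand-in for the reader's loop: it keeps
// "replaying" until the injected timeSince reports a lag at or below maxLag.
func replayUntilLagHonored(maxLag time.Duration, timeSince func(time.Time) time.Duration) time.Duration {
	for {
		requestedAt := time.Now()
		// ... fetching and processing records would happen here ...
		if lag := timeSince(requestedAt); lag <= maxLag {
			return lag
		}
	}
}

func main() {
	// Production code passes time.Since; a test passes a stub that scripts the answers,
	// which is the same shape as the timeSince closure in the new test below.
	calls := 0
	fakeSince := func(time.Time) time.Duration {
		calls++
		if calls <= 3 {
			return 2 * time.Second // pretend the replay is still lagging behind
		}
		return 500 * time.Millisecond // now within the allowed lag
	}
	fmt.Println(replayUntilLagHonored(time.Second, fakeSince))
}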
70 changes: 70 additions & 0 deletions pkg/kafka/partition/reader_test.go
@@ -2,6 +2,7 @@ package partition

import (
"context"
"fmt"
"sync"
"testing"
"time"
@@ -13,6 +14,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kgo"

"github.com/grafana/loki/v3/pkg/kafka"
"github.com/grafana/loki/v3/pkg/kafka/testkafka"
@@ -161,3 +163,71 @@ func TestPartitionReader_ProcessCatchUpAtStartup(t *testing.T) {
err = services.StopAndAwaitTerminated(context.Background(), partitionReader)
require.NoError(t, err)
}

func TestPartitionReader_ProcessCommits(t *testing.T) {
_, kafkaCfg := testkafka.CreateCluster(t, 1, "test-topic")
consumer := newMockConsumer()

consumerFactory := func(_ Committer) (Consumer, error) {
return consumer, nil
}

partitionID := int32(0)
partitionReader, err := NewReader(kafkaCfg, partitionID, "test-consumer-group", consumerFactory, log.NewNopLogger(), prometheus.NewRegistry())
require.NoError(t, err)
producer, err := kafka.NewWriterClient(kafkaCfg, 100, log.NewNopLogger(), prometheus.NewRegistry())
require.NoError(t, err)

// Init the client: This usually happens in "start" but we want to manage our own lifecycle for this test.
partitionReader.client, err = kafka.NewReaderClient(kafkaCfg, nil, log.NewNopLogger(),
kgo.ConsumePartitions(map[string]map[int32]kgo.Offset{
kafkaCfg.Topic: {partitionID: kgo.NewOffset().AtStart()},
}),
)
require.NoError(t, err)

stream := logproto.Stream{
Labels: labels.FromStrings("foo", "bar").String(),
Entries: []logproto.Entry{{Timestamp: time.Now(), Line: "test"}},
}

records, err := kafka.Encode(partitionID, "test-tenant", stream, 10<<20)
require.NoError(t, err)
require.Len(t, records, 1)

ctx, cancel := context.WithDeadlineCause(context.Background(), time.Now().Add(10*time.Second), fmt.Errorf("test unexpectedly deadlocked"))
recordsChan := make(chan []Record)
wait := consumer.Start(ctx, recordsChan)

targetLag := time.Second

i := -1
iterations := 5
producer.ProduceSync(context.Background(), records...)
// timeSince acts as a hook for the lag check: it reports the lag as not honored for the
// first `iterations` calls, forcing a re-loop, and then as honored once `iterations` is reached.
// We also inject a new kafka record on each "not honored" call so there is more to consume.
timeSince := func(time.Time) time.Duration {
i++
if i < iterations {
producer.ProduceSync(context.Background(), records...)
return targetLag + 1
}
return targetLag - 1
}

_, err = partitionReader.processNextFetchesUntilLagHonored(ctx, targetLag, log.NewNopLogger(), recordsChan, timeSince)
assert.NoError(t, err)

// Wait for all the records to be processed.
cancel()
wait()

close(recordsChan)
close(consumer.recordsChan)
recordsCount := 0
for receivedRecords := range consumer.recordsChan {
recordsCount += len(receivedRecords)
}
// We expect to have processed all the records: the initial one plus one per iteration.
assert.Equal(t, iterations+1, recordsCount)
}
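
The test relies on a newMockConsumer() helper that already exists in this test package and is not part of this diff. Purely as an assumption inferred from how the test uses it (Start(ctx, recordsChan) returns a wait function, and consumed batches are re-exposed on consumer.recordsChan, which the test later closes and drains), a sketch of such a helper, reusing the package's Record type and the test file's existing imports, could look like this:

// Hypothetical sketch only; the real mockConsumer lives elsewhere in reader_test.go.
type mockConsumer struct {
	recordsChan chan []Record
	wg          sync.WaitGroup
}

func newMockConsumer() *mockConsumer {
	// Buffered so the test can close the channel after the goroutine exits and still drain it.
	return &mockConsumer{recordsChan: make(chan []Record, 1000)}
}

// Start forwards every batch from the reader's records channel onto m.recordsChan
// until the context is cancelled, and returns a wait function for the goroutine.
func (m *mockConsumer) Start(ctx context.Context, recordsChan <-chan []Record) func() {
	m.wg.Add(1)
	go func() {
		defer m.wg.Done()
		for {
			select {
			case <-ctx.Done():
				return
			case records, ok := <-recordsChan:
				if !ok {
					return
				}
				m.recordsChan <- records
			}
		}
	}()
	return m.wg.Wait
}

In this sketch the buffered channel is what allows the test's pattern of cancel(), wait(), close, and then ranging over whatever was forwarded to count the consumed records.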