Add tests for async consumer error handling and batch processing
sonnes committed Nov 18, 2024
1 parent 0fd3b6d commit c9909a7
Showing 3 changed files with 513 additions and 0 deletions.
343 changes: 343 additions & 0 deletions xkafka/batch_consumer_test.go
@@ -34,6 +34,7 @@ func TestNewBatchConsumer(t *testing.T) {
ErrorHandler(NoopErrorHandler),
BatchSize(10),
BatchTimeout(testTimeout),
ManualCommit(true),
)
assert.NoError(t, err)
assert.NotNil(t, consumer)
@@ -60,6 +61,50 @@ func TestNewBatchConsumer(t *testing.T) {
assert.EqualValues(t, expectedConfig, consumer.config.configMap)
}

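// TestNewBatchConsumer_ConfigValidation verifies that NewBatchConsumer returns
// ErrRequiredOption when brokers, topics, or the error handler are missing, and
// that errors from the consumer factory are propagated.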
func TestNewBatchConsumer_ConfigValidation(t *testing.T) {
t.Parallel()

testcases := []struct {
name string
options []ConsumerOption
expect error
}{
{
name: "missing brokers",
options: []ConsumerOption{testTopics, errHandler},
expect: ErrRequiredOption,
},
{
name: "missing topics",
options: []ConsumerOption{testBrokers, errHandler},
expect: ErrRequiredOption,
},
{
name: "missing error handler",
options: []ConsumerOption{testBrokers, testTopics},
expect: ErrRequiredOption,
},
{
name: "consumer error",
options: []ConsumerOption{
testTopics, testBrokers, errHandler,
consumerFunc(func(configMap *kafka.ConfigMap) (consumerClient, error) {
return nil, assert.AnError
}),
},
expect: assert.AnError,
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
_, err := NewBatchConsumer("test-batch-consumer", noopBatchHandler(), tc.options...)
assert.Error(t, err)
assert.ErrorIs(t, err, tc.expect)
})
}
}

func TestBatchConsumer_Lifecycle(t *testing.T) {
t.Parallel()

@@ -75,6 +120,28 @@ func TestBatchConsumer_Lifecycle(t *testing.T) {
mockKafka.AssertExpectations(t)
})

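// Unsubscribe fails during shutdown; Run is expected to surface the error.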
t.Run("RunUnsubscribeError", func(t *testing.T) {
consumer, mockKafka := newTestBatchConsumer(t, defaultOpts...)

mockKafka.On("SubscribeTopics", []string(testTopics), mock.Anything).Return(nil)
mockKafka.On("ReadMessage", testTimeout).Return(newFakeKafkaMessage(), nil)
mockKafka.On("Commit").Return(nil, nil)
mockKafka.On("Close").Return(nil)

mockKafka.On("Unsubscribe").Return(assert.AnError)

ctx, cancel := context.WithCancel(context.Background())

go func() {
<-time.After(100 * time.Millisecond)
cancel()
}()

assert.Error(t, consumer.Run(ctx))

mockKafka.AssertExpectations(t)
})

t.Run("RunCloseError", func(t *testing.T) {
consumer, mockKafka := newTestBatchConsumer(t, defaultOpts...)

@@ -242,6 +309,116 @@ func TestBatchConsumer_Async(t *testing.T) {
mockKafka.AssertExpectations(t)
}

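// TestBatchConsumer_StopOffsetOnError runs a handler that acks the first two
// batches and fails the third, expecting offsets to be stored only for the
// successfully acked batches (StoreOffsets is expected exactly twice).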
func TestBatchConsumer_StopOffsetOnError(t *testing.T) {
t.Parallel()

opts := append(defaultOpts,
Concurrency(2),
BatchSize(3),
)
consumer, mockKafka := newTestBatchConsumer(t, opts...)

km := newFakeKafkaMessage()
ctx, cancel := context.WithCancel(context.Background())

count := atomic.Int32{}

handler := BatchHandlerFunc(func(ctx context.Context, b *Batch) error {
assert.NotNil(t, b)

n := count.Add(1)

if n > 2 {
err := assert.AnError
cancel()

return b.AckFail(err)
}

b.AckSuccess()

return nil
})

mockKafka.On("SubscribeTopics", []string(testTopics), mock.Anything).Return(nil)
mockKafka.On("Unsubscribe").Return(nil)
mockKafka.On("Commit").Return(nil, nil)
mockKafka.On("ReadMessage", testTimeout).Return(km, nil)
mockKafka.On("Close").Return(nil)

mockKafka.On("StoreOffsets", mock.Anything).
Return(nil, nil).
Times(2)

consumer.handler = handler
err := consumer.Run(ctx)
assert.NoError(t, err)

mockKafka.AssertExpectations(t)
}

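// TestBatchConsumer_BatchTimeout pairs a short BatchTimeout with a BatchSize
// that cannot be reached, so batches are flushed by the timeout alone; the
// handler asserts that each dispatched batch is non-empty.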
func TestBatchConsumer_BatchTimeout(t *testing.T) {
t.Parallel()

testcases := []struct {
name string
options []ConsumerOption
}{
{
name: "sequential",
options: []ConsumerOption{
BatchTimeout(10 * time.Millisecond),
BatchSize(100_000),
},
},
{
name: "async",
options: []ConsumerOption{
Concurrency(2),
BatchTimeout(10 * time.Millisecond),
BatchSize(100_000),
},
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
consumer, mockKafka := newTestBatchConsumer(t, append(defaultOpts, tc.options...)...)

km := newFakeKafkaMessage()
ctx, cancel := context.WithCancel(context.Background())

handler := BatchHandlerFunc(func(ctx context.Context, b *Batch) error {
b.AckSuccess()

assert.NotNil(t, b)
assert.True(t, len(b.Messages) > 0)

return nil
})

mockKafka.On("SubscribeTopics", []string(testTopics), mock.Anything).Return(nil)
mockKafka.On("Unsubscribe").Return(nil)
mockKafka.On("Commit").Return(nil, nil)
mockKafka.On("StoreOffsets", mock.Anything).Return(nil, nil)
mockKafka.On("ReadMessage", testTimeout).Return(km, nil)
mockKafka.On("Close").Return(nil)

consumer.handler = handler

go func() {
<-time.After(500 * time.Millisecond)
cancel()
}()

err := consumer.Run(ctx)
assert.NoError(t, err)

mockKafka.AssertExpectations(t)
})
}
}

func TestBatchConsumer_MiddlewareExecutionOrder(t *testing.T) {
t.Parallel()

@@ -317,6 +494,172 @@ func TestBatchConsumer_ManualCommit(t *testing.T) {
mockKafka.AssertExpectations(t)
}

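// TestBatchConsumer_ReadMessageTimeout injects a kafka.ErrTimedOut from
// ReadMessage between successful reads and expects Run to treat the timeout as
// non-fatal, exiting without error once the context is cancelled.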
func TestBatchConsumer_ReadMessageTimeout(t *testing.T) {
t.Parallel()

testcases := []struct {
name string
options []ConsumerOption
}{
{
name: "sequential",
options: []ConsumerOption{
BatchSize(2),
},
},
{
name: "async",
options: []ConsumerOption{
Concurrency(2),
BatchSize(2),
},
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
consumer, mockKafka := newTestBatchConsumer(t, append(defaultOpts, tc.options...)...)

ctx, cancel := context.WithCancel(context.Background())
expect := kafka.NewError(kafka.ErrTimedOut, "kafka: timed out", false)
km := newFakeKafkaMessage()

counter := atomic.Int32{}

handler := BatchHandlerFunc(func(ctx context.Context, b *Batch) error {
n := counter.Add(1)

if n == 1 {
cancel()
}

return nil
})

mockKafka.On("SubscribeTopics", []string(testTopics), mock.Anything).Return(nil)
mockKafka.On("Unsubscribe").Return(nil)
mockKafka.On("Commit").Return(nil, nil)
mockKafka.On("ReadMessage", testTimeout).Return(km, nil).Once()
mockKafka.On("ReadMessage", testTimeout).Return(nil, expect).Once()
mockKafka.On("ReadMessage", testTimeout).Return(km, nil)
mockKafka.On("Close").Return(nil)

consumer.handler = handler

err := consumer.Run(ctx)
assert.NoError(t, err)

mockKafka.AssertExpectations(t)
})
}
}

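// TestBatchConsumer_KafkaError injects a non-timeout Kafka error from
// ReadMessage and expects Run to stop and return that error.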
func TestBatchConsumer_KafkaError(t *testing.T) {
t.Parallel()

testcases := []struct {
name string
options []ConsumerOption
}{
{
name: "sequential",
options: []ConsumerOption{
BatchSize(2),
},
},
{
name: "async",
options: []ConsumerOption{
Concurrency(2),
BatchSize(2),
},
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
consumer, mockKafka := newTestBatchConsumer(t, append(defaultOpts, tc.options...)...)

ctx := context.Background()
expect := kafka.NewError(kafka.ErrUnknown, "kafka: unknown error", false)
km := newFakeKafkaMessage()

mockKafka.On("SubscribeTopics", []string(testTopics), mock.Anything).Return(nil)
mockKafka.On("Unsubscribe").Return(nil)
mockKafka.On("Commit").Return(nil, nil)
mockKafka.On("Close").Return(nil)

mockKafka.On("ReadMessage", testTimeout).
Return(km, nil).
Times(3)
mockKafka.On("ReadMessage", testTimeout).
Return(nil, expect).
Once()

err := consumer.Run(ctx)
assert.Error(t, err)
assert.ErrorIs(t, err, expect)

mockKafka.AssertExpectations(t)
})
}
}

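// TestBatchConsumer_CommitError enables ManualCommit and makes Commit fail,
// expecting Run to return the commit error in both sequential and async modes.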
func TestBatchConsumer_CommitError(t *testing.T) {
t.Parallel()

testcases := []struct {
name string
options []ConsumerOption
}{
{
name: "sequential",
options: []ConsumerOption{
ManualCommit(true),
},
},
{
name: "async",
options: []ConsumerOption{
ManualCommit(true),
Concurrency(2),
},
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
consumer, mockKafka := newTestBatchConsumer(t, append(defaultOpts, tc.options...)...)

km := newFakeKafkaMessage()
ctx := context.Background()
expect := errors.New("error in commit")

handler := BatchHandlerFunc(func(ctx context.Context, b *Batch) error {
b.AckSuccess()

return nil
})

mockKafka.On("SubscribeTopics", []string(testTopics), mock.Anything).Return(nil)
mockKafka.On("Unsubscribe").Return(nil)
mockKafka.On("StoreOffsets", mock.Anything).Return(nil, nil)
mockKafka.On("ReadMessage", testTimeout).Return(km, nil)
mockKafka.On("Close").Return(nil)

mockKafka.On("Commit").Return(nil, expect)

consumer.handler = handler

err := consumer.Run(ctx)
assert.Error(t, err)
assert.ErrorIs(t, err, expect)

mockKafka.AssertExpectations(t)
})
}
}

func noopBatchHandler() BatchHandler {
return BatchHandlerFunc(func(ctx context.Context, b *Batch) error {
return nil
	})
}
