diff --git a/component/async/kafka/simple/integration_test.go b/component/async/kafka/simple/integration_test.go index fae67ee59..e6ef74946 100644 --- a/component/async/kafka/simple/integration_test.go +++ b/component/async/kafka/simple/integration_test.go @@ -23,6 +23,7 @@ const ( simpleTopic4 = "simpleTopic4" simpleTopic5 = "simpleTopic5" simpleTopic6 = "simpleTopic6" + simpleTopic7 = "simpleTopic7" broker = "127.0.0.1:9093" ) @@ -364,6 +365,71 @@ func TestSimpleConsume_WithNotificationOnceReachingLatestOffset_NoMessages(t *te } } +func TestSimpleConsume_WithNotificationOnceReachingLatestOffset_WithTimestampOffset_RarelyUpdatedTopic(t *testing.T) { + require.NoError(t, testkafka.CreateTopics(broker, simpleTopic7)) + // Messages with old timestamps + now := time.Now() + times := []time.Time{ + now.Add(-10 * time.Hour), + now.Add(-8 * time.Hour), + now.Add(-5 * time.Hour), + } + sent := createTimestampPayload(times...) + + messages := make([]*sarama.ProducerMessage, 0) + for i, tm := range times { + val := sent[i] + msg := testkafka.CreateProducerMessage(simpleTopic7, val) + msg.Timestamp = tm + messages = append(messages, msg) + } + + err := testkafka.SendMessages(broker, messages...) + require.NoError(t, err) + + chErr := make(chan error) + chNotif := make(chan struct{}) + go func() { + saramaCfg, err := kafkacmp.DefaultConsumerSaramaConfig("test-simple-consumer", true) + require.NoError(t, err) + + factory, err := New("test7", simpleTopic7, []string{broker}, saramaCfg, kafka.DecoderJSON(), kafka.Version(sarama.V2_1_0_0.String()), + WithTimestampOffset(4*time.Hour), WithNotificationOnceReachingLatestOffset(chNotif)) + if err != nil { + chErr <- err + return + } + + consumer, err := factory.Create() + if err != nil { + chErr <- err + return + } + defer func() { + _ = consumer.Close() + }() + + ctx, cnl := context.WithCancel(context.Background()) + defer cnl() + + _, _, err = consumer.Consume(ctx) + if err != nil { + chErr <- err + } + }() + + // At this stage, we have received all the expected messages. + // We should also check that the notification channel is also eventually closed. + select { + case err := <-chErr: + require.NoError(t, err) + case <-time.After(2 * time.Second): + assert.FailNow(t, "notification channel not closed") + case _, open := <-chNotif: + assert.False(t, open) + } +} + func createTimestampPayload(timestamps ...time.Time) []string { payloads := make([]string, len(timestamps)) for i, timestamp := range timestamps { diff --git a/component/async/kafka/simple/simple.go b/component/async/kafka/simple/simple.go index 3172e98ee..dab8930de 100644 --- a/component/async/kafka/simple/simple.go +++ b/component/async/kafka/simple/simple.go @@ -242,7 +242,8 @@ func (c *consumer) Consume(ctx context.Context) (<-chan async.Message, <-chan er } func (c *consumer) ifStartingOffsetAfterLatestOffset(latestOffset int64, partition int) bool { - return c.startingOffsets[int32(partition)] >= latestOffset + startingOffset := c.startingOffsets[int32(partition)] + return startingOffset <= sarama.OffsetNewest || startingOffset >= latestOffset } func (c *consumer) partitionsFromOffset(_ context.Context) ([]sarama.PartitionConsumer, error) {