Skip to content

Commit

Permalink
Deterministic start with timestamp offset on rarely populated topic (#…
Browse files Browse the repository at this point in the history
…508)

Co-authored-by: Sotirios Mantziaris <[email protected]>
  • Loading branch information
Chaus Kostiantyn and mantzas authored May 24, 2022
1 parent d0c75af commit 8835b4a
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 1 deletion.
66 changes: 66 additions & 0 deletions component/async/kafka/simple/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
simpleTopic4 = "simpleTopic4"
simpleTopic5 = "simpleTopic5"
simpleTopic6 = "simpleTopic6"
simpleTopic7 = "simpleTopic7"
broker = "127.0.0.1:9093"
)

Expand Down Expand Up @@ -364,6 +365,71 @@ func TestSimpleConsume_WithNotificationOnceReachingLatestOffset_NoMessages(t *te
}
}

func TestSimpleConsume_WithNotificationOnceReachingLatestOffset_WithTimestampOffset_RarelyUpdatedTopic(t *testing.T) {
require.NoError(t, testkafka.CreateTopics(broker, simpleTopic7))
// Messages with old timestamps
now := time.Now()
times := []time.Time{
now.Add(-10 * time.Hour),
now.Add(-8 * time.Hour),
now.Add(-5 * time.Hour),
}
sent := createTimestampPayload(times...)

messages := make([]*sarama.ProducerMessage, 0)
for i, tm := range times {
val := sent[i]
msg := testkafka.CreateProducerMessage(simpleTopic7, val)
msg.Timestamp = tm
messages = append(messages, msg)
}

err := testkafka.SendMessages(broker, messages...)
require.NoError(t, err)

chErr := make(chan error)
chNotif := make(chan struct{})
go func() {
saramaCfg, err := kafkacmp.DefaultConsumerSaramaConfig("test-simple-consumer", true)
require.NoError(t, err)

factory, err := New("test7", simpleTopic7, []string{broker}, saramaCfg, kafka.DecoderJSON(), kafka.Version(sarama.V2_1_0_0.String()),
WithTimestampOffset(4*time.Hour), WithNotificationOnceReachingLatestOffset(chNotif))
if err != nil {
chErr <- err
return
}

consumer, err := factory.Create()
if err != nil {
chErr <- err
return
}
defer func() {
_ = consumer.Close()
}()

ctx, cnl := context.WithCancel(context.Background())
defer cnl()

_, _, err = consumer.Consume(ctx)
if err != nil {
chErr <- err
}
}()

// At this stage, we have received all the expected messages.
// We should also check that the notification channel is also eventually closed.
select {
case err := <-chErr:
require.NoError(t, err)
case <-time.After(2 * time.Second):
assert.FailNow(t, "notification channel not closed")
case _, open := <-chNotif:
assert.False(t, open)
}
}

func createTimestampPayload(timestamps ...time.Time) []string {
payloads := make([]string, len(timestamps))
for i, timestamp := range timestamps {
Expand Down
3 changes: 2 additions & 1 deletion component/async/kafka/simple/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,8 @@ func (c *consumer) Consume(ctx context.Context) (<-chan async.Message, <-chan er
}

func (c *consumer) ifStartingOffsetAfterLatestOffset(latestOffset int64, partition int) bool {
return c.startingOffsets[int32(partition)] >= latestOffset
startingOffset := c.startingOffsets[int32(partition)]
return startingOffset <= sarama.OffsetNewest || startingOffset >= latestOffset
}

func (c *consumer) partitionsFromOffset(_ context.Context) ([]sarama.PartitionConsumer, error) {
Expand Down

0 comments on commit 8835b4a

Please sign in to comment.