From 114359bfaaa8682a1f9f2436645764d3a4b3ebfe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Kurzeja?= Date: Fri, 3 Aug 2018 11:27:10 +0200 Subject: [PATCH] Updating offset correctly (#17) --- kafka/source.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/kafka/source.go b/kafka/source.go index e7269ee..66174b4 100644 --- a/kafka/source.go +++ b/kafka/source.go @@ -162,6 +162,8 @@ func (s *Source) markState(msg *sarama.ConsumerMessage) []marker { s.state[msg.Topic] = partitions } + partitions[msg.Partition] = msg.Offset + var markers []marker for topic, partitions := range s.state { for partition, offset := range partitions {