diff --git a/kafka/source.go b/kafka/source.go index e02769c..e7269ee 100644 --- a/kafka/source.go +++ b/kafka/source.go @@ -25,6 +25,12 @@ type SourceConfig struct { BufferSize int } +type marker struct { + Topic string + Partition int32 + Offset int64 +} + // NewSourceConfig creates a new Kafka source configuration. func NewSourceConfig() *SourceConfig { c := &SourceConfig{ @@ -132,11 +138,9 @@ func (s *Source) Consume() (*streams.Message, error) { // Commit marks the consumed records as processed. func (s *Source) Commit(v interface{}) error { - state := v.(map[string]map[int32]int64) - for topic, partitions := range state { - for partition, offset := range partitions { - s.consumer.MarkPartitionOffset(topic, partition, offset, "") - } + state := v.([]marker) + for _, m := range state { + s.consumer.MarkPartitionOffset(m.Topic, m.Partition, m.Offset, "") } if err := s.consumer.CommitOffsets(); err != nil { @@ -151,16 +155,21 @@ func (s *Source) Close() error { return s.consumer.Close() } -func (s *Source) markState(msg *sarama.ConsumerMessage) map[string]map[int32]int64 { +func (s *Source) markState(msg *sarama.ConsumerMessage) []marker { partitions, ok := s.state[msg.Topic] if !ok { partitions = make(map[int32]int64) s.state[msg.Topic] = partitions } - partitions[msg.Partition] = msg.Offset + var markers []marker + for topic, partitions := range s.state { + for partition, offset := range partitions { + markers = append(markers, marker{Topic: topic, Partition: partition, Offset: offset}) + } + } - return s.state + return markers } func (s *Source) readErrors() {