Skip to content

Commit

Permalink
Merge pull request #16 from msales/metadata-race
Browse files Browse the repository at this point in the history
Kafka metadata is now a struct instead of a map (caused a data race)
  • Loading branch information
michalkurzeja authored Aug 3, 2018
2 parents 27d4006 + d7743ce commit 3557fea
Showing 1 changed file with 17 additions and 8 deletions.
25 changes: 17 additions & 8 deletions kafka/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ type SourceConfig struct {
BufferSize int
}

type marker struct {
Topic string
Partition int32
Offset int64
}

// NewSourceConfig creates a new Kafka source configuration.
func NewSourceConfig() *SourceConfig {
c := &SourceConfig{
Expand Down Expand Up @@ -132,11 +138,9 @@ func (s *Source) Consume() (*streams.Message, error) {

// Commit marks the consumed records as processed.
func (s *Source) Commit(v interface{}) error {
state := v.(map[string]map[int32]int64)
for topic, partitions := range state {
for partition, offset := range partitions {
s.consumer.MarkPartitionOffset(topic, partition, offset, "")
}
state := v.([]marker)
for _, m := range state {
s.consumer.MarkPartitionOffset(m.Topic, m.Partition, m.Offset, "")
}

if err := s.consumer.CommitOffsets(); err != nil {
Expand All @@ -151,16 +155,21 @@ func (s *Source) Close() error {
return s.consumer.Close()
}

func (s *Source) markState(msg *sarama.ConsumerMessage) map[string]map[int32]int64 {
func (s *Source) markState(msg *sarama.ConsumerMessage) []marker {
partitions, ok := s.state[msg.Topic]
if !ok {
partitions = make(map[int32]int64)
s.state[msg.Topic] = partitions
}

partitions[msg.Partition] = msg.Offset
var markers []marker
for topic, partitions := range s.state {
for partition, offset := range partitions {
markers = append(markers, marker{Topic: topic, Partition: partition, Offset: offset})
}
}

return s.state
return markers
}

func (s *Source) readErrors() {
Expand Down

0 comments on commit 3557fea

Please sign in to comment.