diff --git a/consumer.go b/consumer.go index be668a9..199f590 100644 --- a/consumer.go +++ b/consumer.go @@ -428,22 +428,26 @@ func (c *Consumer) subscribe(subs map[string][]int32) error { } // create consumers in parallel - errs := make(chan error, len(subs)) + var mu sync.Mutex + var wg sync.WaitGroup + for topic, partitions := range subs { for _, partition := range partitions { + wg.Add(1) + info := offsets[topic][partition] go func(t string, p int32) { - errs <- c.createConsumer(t, p, info) + if e := c.createConsumer(t, p, info); e != nil { + mu.Lock() + err = e + mu.Unlock() + } + wg.Done() }(topic, partition) } } + wg.Wait() - // consume errors - for i := 0; i < len(subs); i++ { - if e := <-errs; e != nil { - err = e - } - } if err != nil { _ = c.release() _ = c.leaveGroup() diff --git a/partitions.go b/partitions.go index d9a987d..9d97981 100644 --- a/partitions.go +++ b/partitions.go @@ -11,7 +11,7 @@ type partitionConsumer struct { pcm sarama.PartitionConsumer state partitionState - mutex sync.Mutex + mu sync.Mutex closed bool dying, dead chan none @@ -85,9 +85,9 @@ func (c *partitionConsumer) State() partitionState { return partitionState{} } - c.mutex.Lock() + c.mu.Lock() state := c.state - c.mutex.Unlock() + c.mu.Unlock() return state } @@ -97,11 +97,11 @@ func (c *partitionConsumer) MarkCommitted(offset int64) { return } - c.mutex.Lock() + c.mu.Lock() if offset == c.state.Info.Offset { c.state.Dirty = false } - c.mutex.Unlock() + c.mu.Unlock() } func (c *partitionConsumer) MarkOffset(offset int64, metadata string) { @@ -109,13 +109,13 @@ func (c *partitionConsumer) MarkOffset(offset int64, metadata string) { return } - c.mutex.Lock() + c.mu.Lock() if offset > c.state.Info.Offset { c.state.Info.Offset = offset c.state.Info.Metadata = metadata c.state.Dirty = true } - c.mutex.Unlock() + c.mu.Unlock() } // -------------------------------------------------------------------- @@ -128,8 +128,8 @@ type partitionState struct { // -------------------------------------------------------------------- type partitionMap struct { - data map[topicPartition]*partitionConsumer - mutex sync.RWMutex + data map[topicPartition]*partitionConsumer + mu sync.RWMutex } func newPartitionMap() *partitionMap { @@ -139,21 +139,21 @@ func newPartitionMap() *partitionMap { } func (m *partitionMap) Fetch(topic string, partition int32) *partitionConsumer { - m.mutex.RLock() + m.mu.RLock() pc, _ := m.data[topicPartition{topic, partition}] - m.mutex.RUnlock() + m.mu.RUnlock() return pc } func (m *partitionMap) Store(topic string, partition int32, pc *partitionConsumer) { - m.mutex.Lock() + m.mu.Lock() m.data[topicPartition{topic, partition}] = pc - m.mutex.Unlock() + m.mu.Unlock() } func (m *partitionMap) HasDirty() bool { - m.mutex.RLock() - defer m.mutex.RUnlock() + m.mu.RLock() + defer m.mu.RUnlock() for _, pc := range m.data { if state := pc.State(); state.Dirty { @@ -164,8 +164,8 @@ func (m *partitionMap) HasDirty() bool { } func (m *partitionMap) Snapshot() map[topicPartition]partitionState { - m.mutex.RLock() - defer m.mutex.RUnlock() + m.mu.RLock() + defer m.mu.RUnlock() snap := make(map[topicPartition]partitionState, len(m.data)) for tp, pc := range m.data { @@ -175,10 +175,10 @@ func (m *partitionMap) Snapshot() map[topicPartition]partitionState { } func (m *partitionMap) Stop() { - m.mutex.RLock() - defer m.mutex.RUnlock() + m.mu.RLock() + defer m.mu.RUnlock() - wg := new(sync.WaitGroup) + var wg sync.WaitGroup for tp := range m.data { wg.Add(1) go func(p *partitionConsumer) { @@ -190,20 +190,20 @@ func (m *partitionMap) Stop() { } func (m *partitionMap) Clear() { - m.mutex.Lock() + m.mu.Lock() for tp := range m.data { delete(m.data, tp) } - m.mutex.Unlock() + m.mu.Unlock() } func (m *partitionMap) Info() map[string][]int32 { info := make(map[string][]int32) - m.mutex.RLock() + m.mu.RLock() for tp := range m.data { info[tp.Topic] = append(info[tp.Topic], tp.Partition) } - m.mutex.RUnlock() + m.mu.RUnlock() for topic := range info { sort.Sort(int32Slice(info[topic]))