Skip to content

Commit

Permalink
Try fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelreiswildlife committed Mar 15, 2024
1 parent b3fceeb commit adb0c66
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 288 deletions.
14 changes: 1 addition & 13 deletions extensions/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ type KafkaConsumer struct {
stopChannel chan struct{}
run bool
HandleAllMessagesBeforeExiting bool
readyChan chan bool
readyOnce sync.Once
}

// NewKafkaConsumer for creating a new KafkaConsumer instance
Expand All @@ -70,7 +68,6 @@ func NewKafkaConsumer(
messagesReceived: 0,
pendingMessagesWG: nil,
stopChannel: *stopChannel,
readyChan: make(chan bool),
}
var client interfaces.KafkaConsumerClient
if len(clientOrNil) == 1 {
Expand Down Expand Up @@ -131,8 +128,7 @@ func (q *KafkaConsumer) configureConsumer(client interfaces.KafkaConsumerClient)
"fetch.wait.max.ms": q.FetchWaitMaxMs,
"enable.auto.commit": true,
"default.topic.config": kafka.ConfigMap{
"auto.offset.reset": q.OffsetResetStrategy,
"enable.auto.commit": true,
"auto.offset.reset": q.OffsetResetStrategy,
},
"topics": q.Topics,
})
Expand Down Expand Up @@ -229,10 +225,6 @@ func (q *KafkaConsumer) ConsumeLoop() error {

l.Info("successfully subscribed to topics")

q.readyOnce.Do(func() {
close(q.readyChan)
})

//nolint[:gosimple]
for q.run {
message, err := q.Consumer.ReadMessage(100)
Expand All @@ -249,10 +241,6 @@ func (q *KafkaConsumer) ConsumeLoop() error {
return nil
}

func (q *KafkaConsumer) Ready() <-chan bool {
return q.readyChan
}

func (q *KafkaConsumer) receiveMessage(topicPartition kafka.TopicPartition, value []byte) {
l := q.Logger.WithFields(logrus.Fields{
"method": "receiveMessage",
Expand Down
72 changes: 0 additions & 72 deletions extensions/kafka_consumer_integration_test.go

This file was deleted.

Loading

0 comments on commit adb0c66

Please sign in to comment.