diff --git a/extensions/kafka_consumer.go b/extensions/kafka_consumer.go index 38b75f1..8abaebc 100644 --- a/extensions/kafka_consumer.go +++ b/extensions/kafka_consumer.go @@ -53,8 +53,6 @@ type KafkaConsumer struct { stopChannel chan struct{} run bool HandleAllMessagesBeforeExiting bool - readyChan chan bool - readyOnce sync.Once } // NewKafkaConsumer for creating a new KafkaConsumer instance @@ -70,7 +68,6 @@ func NewKafkaConsumer( messagesReceived: 0, pendingMessagesWG: nil, stopChannel: *stopChannel, - readyChan: make(chan bool), } var client interfaces.KafkaConsumerClient if len(clientOrNil) == 1 { @@ -131,8 +128,7 @@ func (q *KafkaConsumer) configureConsumer(client interfaces.KafkaConsumerClient) "fetch.wait.max.ms": q.FetchWaitMaxMs, "enable.auto.commit": true, "default.topic.config": kafka.ConfigMap{ - "auto.offset.reset": q.OffsetResetStrategy, - "enable.auto.commit": true, + "auto.offset.reset": q.OffsetResetStrategy, }, "topics": q.Topics, }) @@ -229,10 +225,6 @@ func (q *KafkaConsumer) ConsumeLoop() error { l.Info("successfully subscribed to topics") - q.readyOnce.Do(func() { - close(q.readyChan) - }) - //nolint[:gosimple] for q.run { message, err := q.Consumer.ReadMessage(100) @@ -249,10 +241,6 @@ func (q *KafkaConsumer) ConsumeLoop() error { return nil } -func (q *KafkaConsumer) Ready() <-chan bool { - return q.readyChan -} - func (q *KafkaConsumer) receiveMessage(topicPartition kafka.TopicPartition, value []byte) { l := q.Logger.WithFields(logrus.Fields{ "method": "receiveMessage", diff --git a/extensions/kafka_consumer_integration_test.go b/extensions/kafka_consumer_integration_test.go deleted file mode 100644 index 266c9ec..0000000 --- a/extensions/kafka_consumer_integration_test.go +++ /dev/null @@ -1,72 +0,0 @@ -package extensions - -import ( - "github.com/confluentinc/confluent-kafka-go/v2/kafka" - "github.com/sirupsen/logrus" - "github.com/stretchr/testify/suite" - "github.com/topfreegames/pusher/util" - "os" - "testing" - "time" -) - -type KafkaExtensionIntegrationTestSuite struct { - suite.Suite - logger *logrus.Logger -} - -func TestKafkaExtensionIntegrationSuite(t *testing.T) { - suite.Run(t, new(KafkaExtensionIntegrationTestSuite)) -} - -func (suite *KafkaExtensionIntegrationTestSuite) SetupSuite() { - suite.logger = logrus.New() -} - -func (suite *KafkaExtensionIntegrationTestSuite) TestConsumeLoopShouldReceiveMessage() { - configFile := os.Getenv("CONFIG_FILE") - if configFile == "" { - configFile = "../config/test.yaml" - } - config, err := util.NewViperWithConfigFile(configFile) - suite.NoError(err) - - stopChannel := make(chan struct{}) - client, err := NewKafkaConsumer(config, suite.logger, &stopChannel) - suite.NoError(err) - suite.NotNil(client) - defer client.Cleanup() - - go client.ConsumeLoop() - - select { - case <-client.Ready(): - suite.logger.Info("client is ready") - case <-time.After(15 * time.Second): - suite.Fail("timeout waiting for client to be ready") - } - - time.Sleep(200 * time.Millisecond) - - p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": client.Brokers}) - suite.NoError(err) - topic := "push-mygame_apns-single" - err = p.Produce( - &kafka.Message{ - TopicPartition: kafka.TopicPartition{ - Topic: &topic, - Partition: kafka.PartitionAny, - }, - Value: []byte("Hello Go!")}, - nil, - ) - suite.NoError(err) - suite.logger.Info("message produced") - - select { - case msg := <-client.msgChan: - suite.Equal([]byte("Hello Go!"), msg.Value) - case <-time.After(2 * time.Minute): - suite.Fail("timeout waiting for message to be received") - } -} diff --git a/extensions/kafka_consumer_test.go b/extensions/kafka_consumer_test.go index b329944..851d440 100644 --- a/extensions/kafka_consumer_test.go +++ b/extensions/kafka_consumer_test.go @@ -1,198 +1,219 @@ -/* - * Copyright (c) 2016 TFG Co - * Author: TFG Co - * - * Permission is hereby granted, free of charge, to any person obtaining a copy of - * this software and associated documentation files (the "Software"), to deal in - * the Software without restriction, including without limitation the rights to - * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of - * the Software, and to permit persons to whom the Software is furnished to do so, - * subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS - * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR - * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER - * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN - * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - */ - package extensions import ( "fmt" - "github.com/onsi/gomega" - "github.com/sirupsen/logrus" "os" - "testing" "time" "github.com/confluentinc/confluent-kafka-go/v2/kafka" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/sirupsen/logrus" "github.com/sirupsen/logrus/hooks/test" "github.com/spf13/viper" - "github.com/stretchr/testify/suite" "github.com/topfreegames/pusher/interfaces" "github.com/topfreegames/pusher/mocks" "github.com/topfreegames/pusher/util" ) -type KafkaExtensionTestSuite struct { - suite.Suite - logger *logrus.Logger - hook *test.Hook - kafkaConsumerClient *mocks.KafkaConsumerClientMock - consumer *KafkaConsumer - config *viper.Viper -} - -func (suite *KafkaExtensionTestSuite) SetupTest() { - suite.logger, suite.hook = test.NewNullLogger() - suite.logger.Level = logrus.DebugLevel - suite.kafkaConsumerClient = mocks.NewKafkaConsumerClientMock() - suite.config = viper.New() - suite.config.Set("queue.topics", []string{"com.games.test"}) - suite.config.Set("queue.brokers", "localhost:9941") - suite.config.Set("queue.group", "testGroup") - suite.config.Set("queue.sessionTimeout", 6000) - suite.config.Set("queue.offsetResetStrategy", "latest") - suite.config.Set("queue.handleAllMessagesBeforeExiting", true) - - stopChannel := make(chan struct{}) - var err error - suite.consumer, err = NewKafkaConsumer( - suite.config, suite.logger, - &stopChannel, suite.kafkaConsumerClient, - ) - suite.NoError(err) -} - -func (suite *KafkaExtensionTestSuite) TestCreatingNewClient() { - suite.NotEmpty(suite.consumer.Brokers) - suite.Len(suite.consumer.Topics, 1) - suite.NotNil(suite.consumer.ConsumerGroup) - closedChanMatcher := gomega.BeClosed() - isClosed, err := closedChanMatcher.Match(suite.consumer.msgChan) - suite.NoError(err) - suite.False(isClosed) - suite.Equal(suite.kafkaConsumerClient, suite.consumer.Consumer) -} - -func (suite *KafkaExtensionTestSuite) TestStopConsuming() { - suite.consumer.run = true - suite.consumer.StopConsuming() - suite.False(suite.consumer.run) -} - -func (suite *KafkaExtensionTestSuite) TestConsumeLoop_SubscribeFails() { - suite.kafkaConsumerClient.Error = fmt.Errorf("could not subscribe") - err := suite.consumer.ConsumeLoop() - suite.Error(err) - suite.Contains(err.Error(), "could not subscribe") -} - -func (suite *KafkaExtensionTestSuite) TestConsumeLoop_SubscribeToTopic() { - go suite.consumer.ConsumeLoop() - defer suite.consumer.StopConsuming() - suite.Eventually(func() bool { - _, ok := suite.kafkaConsumerClient.SubscribedTopics["com.games.test"] - return ok - }, 5*time.Second, 1*time.Second) -} - -func (suite *KafkaExtensionTestSuite) TestConsumeLoop_ReceiveMessage() { - topic := "push-games_apns-single" - go suite.consumer.ConsumeLoop() - defer suite.consumer.StopConsuming() - time.Sleep(5 * time.Millisecond) - - part := kafka.TopicPartition{ - Topic: &topic, - Partition: 1, - } - val := []byte("test") - event := &kafka.Message{TopicPartition: part, Value: val} - suite.consumer.messagesReceived = 999 - suite.kafkaConsumerClient.MessagesChan <- event - - timeout := time.Millisecond * 30 - timer := time.NewTimer(timeout) - select { - case <-timer.C: - suite.Fail("timeout with no message received") - case msg := <-suite.consumer.msgChan: - suite.Equal(interfaces.KafkaMessage{ - Game: "games", - Topic: topic, - Value: val, - }, msg) - } - suite.Equal(int64(1000), suite.consumer.messagesReceived) -} - -func (suite *KafkaExtensionTestSuite) TestConfigurationDefaults() { - cnf := viper.New() - stopChannel := make(chan struct{}) - cons, err := NewKafkaConsumer(cnf, suite.logger, &stopChannel, suite.kafkaConsumerClient) - suite.NoError(err) - cons.loadConfigurationDefaults() - - suite.Equal([]string{"com.games.test"}, cnf.GetStringSlice("queue.topics")) - suite.Equal("localhost:9092", cnf.GetString("queue.brokers")) - suite.Equal("test", cnf.GetString("queue.group")) - suite.Equal(6000, cnf.GetInt("queue.sessionTimeout")) - suite.Equal("latest", cnf.GetString("queue.offsetResetStrategy")) - suite.True(cnf.GetBool("queue.handleAllMessagesBeforeExiting")) -} - -func (suite *KafkaExtensionTestSuite) TestPendingMessagesWaitingGroup() { - pmwg := suite.consumer.PendingMessagesWaitGroup() - suite.NotNil(pmwg) -} - -func (suite *KafkaExtensionTestSuite) TestCleanup_StopRunning() { - suite.consumer.run = true - err := suite.consumer.Cleanup() - suite.NoError(err) - suite.False(suite.consumer.run) -} - -func (suite *KafkaExtensionTestSuite) TestCleanup_CloseConnection() { - err := suite.consumer.Cleanup() - suite.NoError(err) - suite.True(suite.kafkaConsumerClient.Closed) -} - -func (suite *KafkaExtensionTestSuite) TestCleanup_CloseConnectionError() { - suite.kafkaConsumerClient.Error = fmt.Errorf("could not close connection") - err := suite.consumer.Cleanup() - suite.Error(err) - suite.Equal(suite.kafkaConsumerClient.Error.Error(), err.Error()) -} - -func (suite *KafkaExtensionTestSuite) TestCreatingNewClient_Integration() { - configFile := os.Getenv("CONFIG_FILE") - if configFile == "" { - configFile = "../config/test.yaml" - } - config, err := util.NewViperWithConfigFile(configFile) - suite.NoError(err) - - stopChannel := make(chan struct{}) - client, err := NewKafkaConsumer(config, suite.logger, &stopChannel) - suite.NoError(err) - - suite.NotEmpty(client.Brokers) - suite.Len(client.Topics, 1) - suite.NotNil(client.ConsumerGroup) - closedChanMatcher := gomega.BeClosed() - isClosed, err := closedChanMatcher.Match(client.msgChan) - suite.NoError(err) - suite.False(isClosed) -} - -func TestKafkaExtensionSuite(t *testing.T) { - suite.Run(t, new(KafkaExtensionTestSuite)) -} +var _ = Describe("Kafka Extension", func() { + logger, hook := test.NewNullLogger() + logger.Level = logrus.DebugLevel + + BeforeEach(func() { + hook.Reset() + }) + + Describe("[Unit]", func() { + var kafkaConsumerClientMock *mocks.KafkaConsumerClientMock + var consumer *KafkaConsumer + + startConsuming := func() { + go func() { + defer GinkgoRecover() + consumer.ConsumeLoop() + }() + time.Sleep(5 * time.Millisecond) + } + + publishEvent := func(ev *kafka.Message) { + consumer.Consumer.(*mocks.KafkaConsumerClientMock).MessagesChan <- ev + //This time.sleep is necessary to allow go's goroutines to perform work + //Please do not remove + time.Sleep(5 * time.Millisecond) + } + + BeforeEach(func() { + kafkaConsumerClientMock = mocks.NewKafkaConsumerClientMock() + config := viper.New() + config.Set("queue.topics", []string{"com.games.test"}) + config.Set("queue.brokers", "localhost:9941") + config.Set("queue.group", "testGroup") + config.Set("queue.sessionTimeout", 6000) + config.Set("queue.offsetResetStrategy", "latest") + config.Set("queue.handleAllMessagesBeforeExiting", true) + + var err error + stopChannel := make(chan struct{}) + consumer, err = NewKafkaConsumer( + config, logger, + &stopChannel, kafkaConsumerClientMock, + ) + Expect(err).NotTo(HaveOccurred()) + }) + + Describe("Creating new client", func() { + It("should return connected client", func() { + Expect(consumer.Brokers).NotTo(HaveLen(0)) + Expect(consumer.Topics).To(HaveLen(1)) + Expect(consumer.ConsumerGroup).NotTo(BeNil()) + Expect(consumer.msgChan).NotTo(BeClosed()) + Expect(consumer.Consumer).To(Equal(kafkaConsumerClientMock)) + }) + }) + + Describe("Stop consuming", func() { + It("should stop consuming", func() { + consumer.run = true + consumer.StopConsuming() + Expect(consumer.run).To(BeFalse()) + }) + }) + + Describe("Consume loop", func() { + It("should fail if subscribing to topic fails", func() { + kafkaConsumerClientMock.Error = fmt.Errorf("could not subscribe") + err := consumer.ConsumeLoop() + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("could not subscribe")) + }) + + It("should subscribe to topic", func() { + startConsuming() + defer consumer.StopConsuming() + Eventually(kafkaConsumerClientMock.SubscribedTopics, 5).Should(HaveKey("com.games.test")) + }) + + It("should receive message", func() { + topic := "push-games_apns-single" + startConsuming() + defer consumer.StopConsuming() + part := kafka.TopicPartition{ + Topic: &topic, + Partition: 1, + } + val := []byte("test") + event := &kafka.Message{TopicPartition: part, Value: val} + consumer.messagesReceived = 999 + + publishEvent(event) + Eventually(consumer.msgChan, 5).Should(Receive(&interfaces.KafkaMessage{ + Topic: topic, + Value: val, + })) + Expect(consumer.messagesReceived).To(BeEquivalentTo(1000)) + }) + }) + + Describe("Configuration Defaults", func() { + It("should configure defaults", func() { + cnf := viper.New() + stopChannel := make(chan struct{}) + cons, err := NewKafkaConsumer(cnf, logger, &stopChannel, kafkaConsumerClientMock) + Expect(err).NotTo(HaveOccurred()) + cons.loadConfigurationDefaults() + + Expect(cnf.GetStringSlice("queue.topics")).To(Equal([]string{"com.games.test"})) + Expect(cnf.GetString("queue.brokers")).To(Equal("localhost:9092")) + Expect(cnf.GetString("queue.group")).To(Equal("test")) + Expect(cnf.GetInt("queue.sessionTimeout")).To(Equal(6000)) + Expect(cnf.GetString("queue.offsetResetStrategy")).To(Equal("latest")) + Expect(cnf.GetBool("queue.handleAllMessagesBeforeExiting")).To(BeTrue()) + }) + }) + + Describe("Pending Messages Waiting Group", func() { + It("should return the waiting group", func() { + pmwg := consumer.PendingMessagesWaitGroup() + Expect(pmwg).NotTo(BeNil()) + }) + }) + + Describe("Cleanup", func() { + It("should stop running upon cleanup", func() { + consumer.run = true + err := consumer.Cleanup() + Expect(err).NotTo(HaveOccurred()) + Expect(consumer.run).To(BeFalse()) + }) + + It("should close connection to kafka upon cleanup", func() { + err := consumer.Cleanup() + Expect(err).NotTo(HaveOccurred()) + Expect(kafkaConsumerClientMock.Closed).To(BeTrue()) + }) + + It("should return error when closing connection to kafka upon cleanup", func() { + kafkaConsumerClientMock.Error = fmt.Errorf("could not close connection") + err := consumer.Cleanup() + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(Equal(kafkaConsumerClientMock.Error.Error())) + }) + }) + }) + + Describe("[Integration]", func() { + var config *viper.Viper + var err error + configFile := os.Getenv("CONFIG_FILE") + if configFile == "" { + configFile = "../config/test.yaml" + } + + BeforeEach(func() { + config, err = util.NewViperWithConfigFile(configFile) + Expect(err).NotTo(HaveOccurred()) + }) + + Describe("Creating new client", func() { + It("should return connected client", func() { + stopChannel := make(chan struct{}) + client, err := NewKafkaConsumer(config, logger, &stopChannel) + Expect(err).NotTo(HaveOccurred()) + + Expect(client.Brokers).NotTo(HaveLen(0)) + Expect(client.Topics).To(HaveLen(1)) + Expect(client.ConsumerGroup).NotTo(BeNil()) + Expect(client.msgChan).NotTo(BeClosed()) + }) + }) + + PDescribe("ConsumeLoop", func() { + It("should consume message and add it to msgChan", func() { + stopChannel := make(chan struct{}) + client, err := NewKafkaConsumer(config, logger, &stopChannel) + Expect(err).NotTo(HaveOccurred()) + Expect(client).NotTo(BeNil()) + defer client.StopConsuming() + go client.ConsumeLoop() + + // Required to assure the consumer to be ready before producing a message + time.Sleep(5 * time.Second) + + p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": client.Brokers}) + Expect(err).NotTo(HaveOccurred()) + err = p.Produce( + &kafka.Message{ + TopicPartition: kafka.TopicPartition{ + Topic: &client.Topics[0], + Partition: kafka.PartitionAny, + }, + Value: []byte("Hello Go!")}, + nil, + ) + Expect(err).NotTo(HaveOccurred()) + Eventually(client.msgChan, 10*time.Second).Should(Receive(Equal([]byte("Hello Go!")))) + }) + }) + }) +}) diff --git a/feedback/kafka_consumer.go b/feedback/kafka_consumer.go index 37c2177..9902baa 100644 --- a/feedback/kafka_consumer.go +++ b/feedback/kafka_consumer.go @@ -23,7 +23,6 @@ package feedback import ( - "fmt" "sync" "github.com/confluentinc/confluent-kafka-go/v2/kafka" @@ -54,8 +53,6 @@ type KafkaConsumer struct { stopChannel chan struct{} run bool HandleAllMessagesBeforeExiting bool - readyChan chan bool - readyOnce sync.Once } // NewKafkaConsumer for creating a new KafkaConsumer instance @@ -71,7 +68,6 @@ func NewKafkaConsumer( messagesReceived: 0, pendingMessagesWG: nil, stopChannel: *stopChannel, - readyChan: make(chan bool), } var client interfaces.KafkaConsumerClient @@ -193,16 +189,7 @@ func (q *KafkaConsumer) ConsumeLoop() error { "topics": q.Topics, }) - err := q.Consumer.SubscribeTopics(q.Topics, func(c *kafka.Consumer, e kafka.Event) error { - if _, ok := e.(kafka.AssignedPartitions); ok { - fmt.Println("SONIA: ready") - q.readyOnce.Do(func() { - close(q.readyChan) - }) - } - fmt.Println("SONIA: event arrived") - return nil - }) + err := q.Consumer.SubscribeTopics(q.Topics, nil) if err != nil { l.WithError(err).Error("error subscribing to topics") return err @@ -226,10 +213,6 @@ func (q *KafkaConsumer) ConsumeLoop() error { return nil } -func (q *KafkaConsumer) Ready() <-chan bool { - return q.readyChan -} - func (q *KafkaConsumer) receiveMessage(topicPartition kafka.TopicPartition, value []byte) { l := q.Logger.WithFields(logrus.Fields{ "method": "receiveMessage", diff --git a/feedback/kafka_consumer_test.go b/feedback/kafka_consumer_test.go index 19fafde..ec7cfb1 100644 --- a/feedback/kafka_consumer_test.go +++ b/feedback/kafka_consumer_test.go @@ -228,7 +228,9 @@ var _ = Describe("Kafka Consumer", func() { Expect(client).NotTo(BeNil()) defer client.StopConsuming() go client.ConsumeLoop() - Eventually(client.Ready(), 30*time.Second).Should(BeClosed()) + + // Required to assure the consumer to be ready before producing a message + time.Sleep(5 * time.Second) p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": client.Brokers}) Expect(err).NotTo(HaveOccurred()) diff --git a/interfaces/kafka.go b/interfaces/kafka.go index 912b431..e9599d7 100644 --- a/interfaces/kafka.go +++ b/interfaces/kafka.go @@ -42,5 +42,4 @@ type KafkaConsumerClient interface { Pause([]kafka.TopicPartition) error Resume([]kafka.TopicPartition) error Assignment() ([]kafka.TopicPartition, error) - Assign(partitions []kafka.TopicPartition) (err error) }