From 94fcb63a6c4225fd1631b70ff3463aabb734c992 Mon Sep 17 00:00:00 2001 From: Mick Staugaard Date: Thu, 9 Jan 2020 15:46:21 +1300 Subject: [PATCH] Add connection retry logic --- cmd/createTopic.go | 13 +++---------- cmd/deleteTopic.go | 13 ++++--------- cmd/root.go | 46 ++++++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 51 insertions(+), 21 deletions(-) diff --git a/cmd/createTopic.go b/cmd/createTopic.go index f516a8a..71d1a39 100644 --- a/cmd/createTopic.go +++ b/cmd/createTopic.go @@ -18,18 +18,11 @@ kafkaCLI createTopic --bootstrap-server kafka:9092 --partitions 4 --replication- `, Args: cobra.MinimumNArgs(1), Run: func(cmd *cobra.Command, args []string) { - client, err := kafkaClient() - if err != nil { - panic(err) - } - kafkaAdmin, err := sarama.NewClusterAdminFromClient(client) - if err != nil { - panic(err) - } + admin := kafkaAdmin() for _, topicName := range args { fmt.Println("Creating topic " + topicName) - err = kafkaAdmin.CreateTopic(topicName, topicDetail(), false) + err := admin.CreateTopic(topicName, topicDetail(), false) if err != nil { switch err.(type) { case *sarama.TopicError: @@ -51,7 +44,7 @@ kafkaCLI createTopic --bootstrap-server kafka:9092 --partitions 4 --replication- } } - _ = kafkaAdmin.Close() + _ = admin.Close() }, } diff --git a/cmd/deleteTopic.go b/cmd/deleteTopic.go index f723018..d83c62c 100644 --- a/cmd/deleteTopic.go +++ b/cmd/deleteTopic.go @@ -17,18 +17,11 @@ kafkaCLI deleteTopic --bootstrap-server kafka:9092 topic1 topic2 `, Args: cobra.MinimumNArgs(1), Run: func(cmd *cobra.Command, args []string) { - client, err := kafkaClient() - if err != nil { - panic(err) - } - kafkaAdmin, err := sarama.NewClusterAdminFromClient(client) - if err != nil { - panic(err) - } + admin := kafkaAdmin() for _, topicName := range args { fmt.Println("Deleting topic " + topicName) - err = kafkaAdmin.DeleteTopic(topicName) + err := admin.DeleteTopic(topicName) if err != nil { switch err { @@ -40,6 +33,8 @@ kafkaCLI deleteTopic --bootstrap-server kafka:9092 topic1 topic2 } } } + + _ = admin.Close() }, } diff --git a/cmd/root.go b/cmd/root.go index 7a08414..3e94013 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -3,6 +3,7 @@ package cmd import ( "fmt" "os" + "time" "github.com/Shopify/sarama" "github.com/spf13/cobra" @@ -30,9 +31,50 @@ func init() { _ = rootCmd.MarkPersistentFlagRequired("bootstrap-server") } -func kafkaClient() (sarama.Client, error) { +func kafkaClient() sarama.Client { kafkaConfig := sarama.NewConfig() kafkaConfig.Version = sarama.V1_0_0_0 addresses := []string{bootstrapServer} - return sarama.NewClient(addresses, kafkaConfig) + + var client sarama.Client + var err error + + for i := 0; i < 30; i++ { + client, err = sarama.NewClient(addresses, kafkaConfig) + if err == nil { + break + } + + fmt.Println("failed to connect to " + bootstrapServer + " Retrying in 1s") + time.Sleep(time.Second) + } + + if err != nil { + panic(err) + } + + return client +} + +func kafkaAdmin() sarama.ClusterAdmin { + client := kafkaClient() + + var admin sarama.ClusterAdmin + var err error + + for i := 0; i < 10; i++ { + admin, err = sarama.NewClusterAdminFromClient(client) + if err == nil { + break + } + + fmt.Println("failed to admin cluster at " + bootstrapServer + " Retrying in 1s") + time.Sleep(time.Second) + } + + if err != nil { + panic(err) + } + + return admin }