diff --git a/cmd/root.go b/cmd/root.go index 53218d7..a8ca3c9 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -4,6 +4,8 @@ import ( "fmt" "math" "os" + "strings" + "sync" "github.com/rabbitmq/omq/pkg/amqp10_client" "github.com/rabbitmq/omq/pkg/config" @@ -14,10 +16,16 @@ import ( ) var ( - amqp = &cobra.Command{} - stomp = &cobra.Command{} - mqtt = &cobra.Command{} - rootCmd = &cobra.Command{} + amqp_amqp = &cobra.Command{} + amqp_stomp = &cobra.Command{} + amqp_mqtt = &cobra.Command{} + stomp_stomp = &cobra.Command{} + stomp_amqp = &cobra.Command{} + stomp_mqtt = &cobra.Command{} + mqtt_mqtt = &cobra.Command{} + mqtt_amqp = &cobra.Command{} + mqtt_stomp = &cobra.Command{} + rootCmd = &cobra.Command{} ) func Execute() { @@ -31,49 +39,83 @@ func Execute() { func RootCmd() *cobra.Command { var cfg config.Config - amqp = &cobra.Command{ + amqp_amqp = &cobra.Command{ Use: "amqp-amqp", Aliases: []string{"amqp"}, Run: func(cmd *cobra.Command, args []string) { - amqp10_client.Start(cfg) + start(cfg, amqp10_client.Publisher, amqp10_client.Consumer) }, - PreRun: func(cmd *cobra.Command, args []string) { - if cfg.Size < 12 { - _, _ = fmt.Fprintf(os.Stderr, "ERROR: size can't be less than 12 bytes\n") - os.Exit(1) - } + } + + amqp_stomp = &cobra.Command{ + Use: "amqp-stomp", + Run: func(cmd *cobra.Command, args []string) { + start(cfg, amqp10_client.Publisher, stomp_client.Consumer) + }, + } + + amqp_mqtt = &cobra.Command{ + Use: "amqp-mqtt", + Run: func(cmd *cobra.Command, args []string) { + start(cfg, amqp10_client.Publisher, mqtt_client.Consumer) }, } - stomp = &cobra.Command{ + stomp_stomp = &cobra.Command{ Use: "stomp-stomp", Aliases: []string{"stomp"}, Run: func(cmd *cobra.Command, args []string) { - stomp_client.Start(cfg) + start(cfg, stomp_client.Publisher, stomp_client.Consumer) }, - PreRun: func(cmd *cobra.Command, args []string) { - if cfg.Size < 12 { - _, _ = fmt.Fprintf(os.Stderr, "ERROR: size can't be less than 12 bytes\n") - os.Exit(1) - } + } + + stomp_amqp = &cobra.Command{ + Use: "stomp-amqp", + Run: func(cmd *cobra.Command, args []string) { + start(cfg, stomp_client.Publisher, amqp10_client.Consumer) }, } - mqtt = &cobra.Command{ + stomp_mqtt = &cobra.Command{ + Use: "stomp-mqtt", + Run: func(cmd *cobra.Command, args []string) { + start(cfg, stomp_client.Publisher, mqtt_client.Consumer) + }, + } + + mqtt_mqtt = &cobra.Command{ Use: "mqtt-mqtt", Aliases: []string{"mqtt"}, Run: func(cmd *cobra.Command, args []string) { - mqtt_client.Start(cfg) + start(cfg, mqtt_client.Publisher, mqtt_client.Consumer) + }, + } + + mqtt_amqp = &cobra.Command{ + Use: "mqtt-amqp", + Run: func(cmd *cobra.Command, args []string) { + start(cfg, mqtt_client.Publisher, amqp10_client.Consumer) + }, + } + + mqtt_stomp = &cobra.Command{ + Use: "mqtt-stomp", + Run: func(cmd *cobra.Command, args []string) { + start(cfg, mqtt_client.Publisher, stomp_client.Consumer) }, - PreRun: func(cmd *cobra.Command, args []string) { + } + + var rootCmd = &cobra.Command{Use: "omq", + PersistentPreRun: func(cmd *cobra.Command, args []string) { if cfg.Size < 12 { _, _ = fmt.Fprintf(os.Stderr, "ERROR: size can't be less than 12 bytes\n") os.Exit(1) } + setUris(&cfg, cmd.Use) }, } - - var rootCmd = &cobra.Command{Use: "omq"} + rootCmd.PersistentFlags().StringVarP(&cfg.PublisherUri, "publisher-uri", "", "", "URI for publishing") + rootCmd.PersistentFlags().StringVarP(&cfg.ConsumerUri, "consumer-uri", "", "", "URI for consuming") rootCmd.PersistentFlags().IntVarP(&cfg.Publishers, "publishers", "x", 1, "The number of publishers to start") rootCmd.PersistentFlags().IntVarP(&cfg.Consumers, "consumers", "y", 1, "The number of consumers to start") rootCmd.PersistentFlags().IntVarP(&cfg.PublishCount, "pmessages", "C", math.MaxInt, "The number of messages to send per publisher (default=MaxInt)") @@ -85,9 +127,71 @@ func RootCmd() *cobra.Command { rootCmd.PersistentFlags().DurationVarP(&cfg.Duration, "duration", "z", 0, "Duration (eg. 10s, 5m, 2h)") rootCmd.PersistentFlags().BoolVarP(&cfg.UseMillis, "use-millis", "m", false, "Use milliseconds for timestamps") - rootCmd.AddCommand(amqp) - rootCmd.AddCommand(stomp) - rootCmd.AddCommand(mqtt) + rootCmd.AddCommand(amqp_amqp) + rootCmd.AddCommand(amqp_stomp) + rootCmd.AddCommand(amqp_mqtt) + rootCmd.AddCommand(stomp_stomp) + rootCmd.AddCommand(stomp_amqp) + rootCmd.AddCommand(stomp_mqtt) + rootCmd.AddCommand(mqtt_mqtt) + rootCmd.AddCommand(mqtt_amqp) + rootCmd.AddCommand(mqtt_stomp) return rootCmd } + +func start(cfg config.Config, publisherFunc func(config.Config, int), consumerFunc func(config.Config, chan bool, int)) { + var wg sync.WaitGroup + + if cfg.Consumers > 0 { + for i := 1; i <= cfg.Consumers; i++ { + subscribed := make(chan bool) + n := i + wg.Add(1) + go func() { + defer wg.Done() + consumerFunc(cfg, subscribed, n) + }() + + // wait until we know the receiver has subscribed + <-subscribed + } + } + + if cfg.Publishers > 0 { + for i := 1; i <= cfg.Publishers; i++ { + n := i + wg.Add(1) + go func() { + defer wg.Done() + publisherFunc(cfg, n) + }() + } + } + + wg.Wait() +} + +func setUris(cfg *config.Config, command string) { + if cfg.PublisherUri == "" { + println("setting publisher uri to ", defaultUri(strings.Split(command, "-")[0])) + (*cfg).PublisherUri = defaultUri(strings.Split(command, "-")[0]) + } + if cfg.ConsumerUri == "" { + println("setting consumer uri to ", defaultUri(strings.Split(command, "-")[1])) + (*cfg).ConsumerUri = defaultUri(strings.Split(command, "-")[1]) + } +} + +func defaultUri(proto string) string { + var uri = "localhost" + switch proto { + case "amqp": + uri = "amqp://localhost" + case "stomp": + uri = "localhost:61613" + case "mqtt": + uri = "localhost:1883" + } + return uri +} diff --git a/pkg/amqp10_client/amqp10.go b/pkg/amqp10_client/amqp10.go index 2123f1d..72898c0 100644 --- a/pkg/amqp10_client/amqp10.go +++ b/pkg/amqp10_client/amqp10.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "math/rand" - "sync" "time" "github.com/rabbitmq/omq/pkg/config" @@ -17,45 +16,13 @@ import ( "github.com/prometheus/client_golang/prometheus" ) -func Start(cfg config.Config) { - var wg sync.WaitGroup - - if cfg.Consumers > 0 { - for i := 1; i <= cfg.Consumers; i++ { - subscribed := make(chan bool) - n := i - wg.Add(1) - go func() { - defer wg.Done() - Consumer(cfg, subscribed, n) - }() - - // wait until we know the receiver has subscribed - <-subscribed - } - } - - if cfg.Publishers > 0 { - for i := 1; i <= cfg.Publishers; i++ { - n := i - wg.Add(1) - go func() { - defer wg.Done() - Publisher(cfg, n) - }() - } - } - - wg.Wait() -} - func Publisher(cfg config.Config, n int) { // sleep random interval to avoid all publishers publishing at the same time s := rand.Intn(cfg.Publishers) time.Sleep(time.Duration(s) * time.Millisecond) // open connection - conn, err := amqp.Dial(context.TODO(), cfg.AmqpUrl, nil) + conn, err := amqp.Dial(context.TODO(), cfg.PublisherUri, nil) if err != nil { log.Error("publisher connection failed", "protocol", "amqp-1.0", "publisherId", n, "error", err.Error()) return @@ -92,6 +59,7 @@ func Publisher(cfg config.Config, n int) { return } metrics.MessagesPublished.With(prometheus.Labels{"protocol": "amqp-1.0"}).Inc() + log.Error("message sent", "protocol", "amqp-1.0", "publisherId", n) utils.WaitBetweenMessages(cfg.Rate) } @@ -100,7 +68,7 @@ func Publisher(cfg config.Config, n int) { func Consumer(cfg config.Config, subscribed chan bool, n int) { // open connection - conn, err := amqp.Dial(context.TODO(), cfg.AmqpUrl, nil) + conn, err := amqp.Dial(context.TODO(), cfg.ConsumerUri, nil) if err != nil { log.Error("consumer failed to connect", "protocol", "amqp-1.0", "consumerId", n, "error", err.Error()) return diff --git a/pkg/config/config.go b/pkg/config/config.go index c928291..f6952a2 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -3,9 +3,8 @@ package config import "time" type Config struct { - AmqpUrl string - StompUrl string - MqttUrl string + PublisherUri string + ConsumerUri string Publishers int Consumers int PublishCount int diff --git a/pkg/mqtt_client/mqtt.go b/pkg/mqtt_client/mqtt.go index 7286de4..d0fa7d0 100644 --- a/pkg/mqtt_client/mqtt.go +++ b/pkg/mqtt_client/mqtt.go @@ -3,7 +3,6 @@ package mqtt_client import ( "fmt" "math/rand" - "sync" "time" mqtt "github.com/eclipse/paho.mqtt.golang" @@ -16,38 +15,6 @@ import ( "github.com/prometheus/client_golang/prometheus" ) -func Start(cfg config.Config) { - var wg sync.WaitGroup - - if cfg.Consumers > 0 { - for i := 1; i <= cfg.Consumers; i++ { - subscribed := make(chan bool) - n := i - wg.Add(1) - go func() { - defer wg.Done() - Consumer(cfg, subscribed, n) - }() - - // wait until we know the receiver has subscribed - <-subscribed - } - } - - if cfg.Publishers > 0 { - for i := 1; i <= cfg.Publishers; i++ { - n := i - wg.Add(1) - go func() { - defer wg.Done() - Publisher(cfg, n) - }() - } - } - - wg.Wait() -} - func Publisher(cfg config.Config, n int) { var token mqtt.Token @@ -57,20 +24,20 @@ func Publisher(cfg config.Config, n int) { // open connection opts := mqtt.NewClientOptions(). - AddBroker(cfg.MqttUrl). + AddBroker(cfg.PublisherUri). SetUsername("guest"). SetPassword("guest"). SetClientID(fmt.Sprintf("omq-pub-%d", n)). SetAutoReconnect(true). SetConnectionLostHandler(func(client mqtt.Client, reason error) { - log.Info("connection lost", "protocol", "mqtt", "publisherId", n) + log.Info("connection lost", "protocol", "MQTT", "publisherId", n) }). SetProtocolVersion(4) c := mqtt.NewClient(opts) token = c.Connect() token.Wait() - log.Info("publisher started", "protocol", "mqtt", "publisherId", n) + log.Info("publisher started", "protocol", "MQTT", "publisherId", n) topic := fmt.Sprintf("%s-%d", cfg.QueueNamePrefix, ((n-1)%cfg.QueueCount)+1) @@ -85,14 +52,14 @@ func Publisher(cfg config.Config, n int) { token.Wait() timer.ObserveDuration() if token.Error() != nil { - log.Error("message sending failure", "protocol", "mqtt", "publisherId", n, "error", token.Error()) + log.Error("message sending failure", "protocol", "MQTT", "publisherId", n, "error", token.Error()) } - log.Debug("message sent", "protocol", "mqtt", "publisherId", n) + log.Debug("message sent", "protocol", "MQTT", "publisherId", n) metrics.MessagesPublished.With(prometheus.Labels{"protocol": "mqtt"}).Inc() utils.WaitBetweenMessages(cfg.Rate) } - log.Debug("publisher stopped", "protocol", "mqtt", "publisherId", n) + log.Debug("publisher stopped", "protocol", "MQTT", "publisherId", n) } func Consumer(cfg config.Config, subscribed chan bool, n int) { @@ -102,14 +69,14 @@ func Consumer(cfg config.Config, subscribed chan bool, n int) { // open connection opts := mqtt.NewClientOptions(). - AddBroker(cfg.MqttUrl). + AddBroker(cfg.ConsumerUri). SetUsername("guest"). SetPassword("guest"). SetClientID(fmt.Sprintf("omq-sub-%d", n)). SetAutoReconnect(true). SetCleanSession(false). SetConnectionLostHandler(func(client mqtt.Client, reason error) { - log.Info("connection lost", "protocol", "mqtt", "consumerId", n) + log.Info("connection lost", "protocol", "MQTT", "consumerId", n) }). SetProtocolVersion(4) @@ -117,7 +84,6 @@ func Consumer(cfg config.Config, subscribed chan bool, n int) { c := mqtt.NewClient(opts) token = c.Connect() token.Wait() - log.Info("consumer started", "protocol", "mqtt", "publisherId", n) topic := fmt.Sprintf("%s-%d", cfg.QueueNamePrefix, ((n-1)%cfg.QueueCount)+1) @@ -130,15 +96,16 @@ func Consumer(cfg config.Config, subscribed chan bool, n int) { payload := msg.Payload() m.Observe(utils.CalculateEndToEndLatency(&payload)) msgsReceived++ - log.Debug("message received", "protocol", "mqtt", "subscriberId", n, "terminus", topic, "size", len(payload)) + log.Debug("message received", "protocol", "MQTT", "subscriberId", n, "terminus", topic, "size", len(payload)) } close(subscribed) token = c.Subscribe(topic, 1, handler) token.Wait() if token.Error() != nil { - log.Error("failed to subscribe", "protocol", "mqtt", "publisherId", n, "error", token.Error()) + log.Error("failed to subscribe", "protocol", "MQTT", "publisherId", n, "error", token.Error()) } + log.Info("consumer started", "protocol", "MQTT", "publisherId", n, "topic", topic) for { time.Sleep(1 * time.Second) @@ -146,5 +113,5 @@ func Consumer(cfg config.Config, subscribed chan bool, n int) { break } } - log.Debug("consumer finished", "protocol", "mqtt", "publisherId", n) + log.Debug("consumer finished", "protocol", "MQTT", "publisherId", n) } diff --git a/pkg/stomp_client/stomp.go b/pkg/stomp_client/stomp.go index 2e90f86..a08110a 100644 --- a/pkg/stomp_client/stomp.go +++ b/pkg/stomp_client/stomp.go @@ -3,7 +3,6 @@ package stomp_client import ( "fmt" "math/rand" - "sync" "time" "github.com/rabbitmq/omq/pkg/config" @@ -21,45 +20,12 @@ var options []func(*stomp.Conn) error = []func(*stomp.Conn) error{ stomp.ConnOpt.Host("/"), } -func Start(cfg config.Config) { - - var wg sync.WaitGroup - - if cfg.Consumers > 0 { - for i := 1; i <= cfg.Consumers; i++ { - subscribed := make(chan bool) - n := i - wg.Add(1) - go func() { - defer wg.Done() - Consumer(cfg, subscribed, n) - }() - - // wait until we know the receiver has subscribed - <-subscribed - } - } - - if cfg.Publishers > 0 { - for i := 1; i <= cfg.Publishers; i++ { - n := i - wg.Add(1) - go func() { - defer wg.Done() - Publisher(cfg, n) - }() - } - } - - wg.Wait() -} - func Publisher(cfg config.Config, n int) { // sleep random interval to avoid all publishers publishing at exactly the same time s := rand.Intn(cfg.Publishers) time.Sleep(time.Duration(s) * time.Millisecond) - conn, err := stomp.Dial("tcp", cfg.StompUrl, options...) + conn, err := stomp.Dial("tcp", cfg.PublisherUri, options...) if err != nil { log.Error("publisher connection failed", "protocol", "STOMP", "publisherId", n, "error", err.Error()) return @@ -93,7 +59,7 @@ func Publisher(cfg config.Config, n int) { } func Consumer(cfg config.Config, subscribed chan bool, n int) { - conn, err := stomp.Dial("tcp", cfg.StompUrl, options...) + conn, err := stomp.Dial("tcp", cfg.ConsumerUri, options...) if err != nil { log.Error("consumer connection failed", "protocol", "STOMP", "consumerId", n, "error", err.Error())