Skip to content

Commit

Permalink
Add cross-protocol commands
Browse files Browse the repository at this point in the history
You can now publish and consume through different protocols.
It's also possible to publish and conusme through the same protocol,
but with different URIs (eg. after setting up federation, publish
to the upstream but conusme downstream to exercise federation)
  • Loading branch information
mkuratczyk committed Sep 25, 2023
1 parent 8ce7d44 commit 139a1f0
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 145 deletions.
156 changes: 130 additions & 26 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"
"math"
"os"
"strings"
"sync"

"github.com/rabbitmq/omq/pkg/amqp10_client"
"github.com/rabbitmq/omq/pkg/config"
Expand All @@ -14,10 +16,16 @@ import (
)

var (
amqp = &cobra.Command{}
stomp = &cobra.Command{}
mqtt = &cobra.Command{}
rootCmd = &cobra.Command{}
amqp_amqp = &cobra.Command{}
amqp_stomp = &cobra.Command{}
amqp_mqtt = &cobra.Command{}
stomp_stomp = &cobra.Command{}
stomp_amqp = &cobra.Command{}
stomp_mqtt = &cobra.Command{}
mqtt_mqtt = &cobra.Command{}
mqtt_amqp = &cobra.Command{}
mqtt_stomp = &cobra.Command{}
rootCmd = &cobra.Command{}
)

func Execute() {
Expand All @@ -31,49 +39,83 @@ func Execute() {
func RootCmd() *cobra.Command {
var cfg config.Config

amqp = &cobra.Command{
amqp_amqp = &cobra.Command{
Use: "amqp-amqp",
Aliases: []string{"amqp"},
Run: func(cmd *cobra.Command, args []string) {
amqp10_client.Start(cfg)
start(cfg, amqp10_client.Publisher, amqp10_client.Consumer)
},
PreRun: func(cmd *cobra.Command, args []string) {
if cfg.Size < 12 {
_, _ = fmt.Fprintf(os.Stderr, "ERROR: size can't be less than 12 bytes\n")
os.Exit(1)
}
}

amqp_stomp = &cobra.Command{
Use: "amqp-stomp",
Run: func(cmd *cobra.Command, args []string) {
start(cfg, amqp10_client.Publisher, stomp_client.Consumer)
},
}

amqp_mqtt = &cobra.Command{
Use: "amqp-mqtt",
Run: func(cmd *cobra.Command, args []string) {
start(cfg, amqp10_client.Publisher, mqtt_client.Consumer)
},
}

stomp = &cobra.Command{
stomp_stomp = &cobra.Command{
Use: "stomp-stomp",
Aliases: []string{"stomp"},
Run: func(cmd *cobra.Command, args []string) {
stomp_client.Start(cfg)
start(cfg, stomp_client.Publisher, stomp_client.Consumer)
},
PreRun: func(cmd *cobra.Command, args []string) {
if cfg.Size < 12 {
_, _ = fmt.Fprintf(os.Stderr, "ERROR: size can't be less than 12 bytes\n")
os.Exit(1)
}
}

stomp_amqp = &cobra.Command{
Use: "stomp-amqp",
Run: func(cmd *cobra.Command, args []string) {
start(cfg, stomp_client.Publisher, amqp10_client.Consumer)
},
}

mqtt = &cobra.Command{
stomp_mqtt = &cobra.Command{
Use: "stomp-mqtt",
Run: func(cmd *cobra.Command, args []string) {
start(cfg, stomp_client.Publisher, mqtt_client.Consumer)
},
}

mqtt_mqtt = &cobra.Command{
Use: "mqtt-mqtt",
Aliases: []string{"mqtt"},
Run: func(cmd *cobra.Command, args []string) {
mqtt_client.Start(cfg)
start(cfg, mqtt_client.Publisher, mqtt_client.Consumer)
},
}

mqtt_amqp = &cobra.Command{
Use: "mqtt-amqp",
Run: func(cmd *cobra.Command, args []string) {
start(cfg, mqtt_client.Publisher, amqp10_client.Consumer)
},
}

mqtt_stomp = &cobra.Command{
Use: "mqtt-stomp",
Run: func(cmd *cobra.Command, args []string) {
start(cfg, mqtt_client.Publisher, stomp_client.Consumer)
},
PreRun: func(cmd *cobra.Command, args []string) {
}

var rootCmd = &cobra.Command{Use: "omq",
PersistentPreRun: func(cmd *cobra.Command, args []string) {
if cfg.Size < 12 {
_, _ = fmt.Fprintf(os.Stderr, "ERROR: size can't be less than 12 bytes\n")
os.Exit(1)
}
setUris(&cfg, cmd.Use)
},
}

var rootCmd = &cobra.Command{Use: "omq"}
rootCmd.PersistentFlags().StringVarP(&cfg.PublisherUri, "publisher-uri", "", "", "URI for publishing")
rootCmd.PersistentFlags().StringVarP(&cfg.ConsumerUri, "consumer-uri", "", "", "URI for consuming")
rootCmd.PersistentFlags().IntVarP(&cfg.Publishers, "publishers", "x", 1, "The number of publishers to start")
rootCmd.PersistentFlags().IntVarP(&cfg.Consumers, "consumers", "y", 1, "The number of consumers to start")
rootCmd.PersistentFlags().IntVarP(&cfg.PublishCount, "pmessages", "C", math.MaxInt, "The number of messages to send per publisher (default=MaxInt)")
Expand All @@ -85,9 +127,71 @@ func RootCmd() *cobra.Command {
rootCmd.PersistentFlags().DurationVarP(&cfg.Duration, "duration", "z", 0, "Duration (eg. 10s, 5m, 2h)")
rootCmd.PersistentFlags().BoolVarP(&cfg.UseMillis, "use-millis", "m", false, "Use milliseconds for timestamps")

rootCmd.AddCommand(amqp)
rootCmd.AddCommand(stomp)
rootCmd.AddCommand(mqtt)
rootCmd.AddCommand(amqp_amqp)
rootCmd.AddCommand(amqp_stomp)
rootCmd.AddCommand(amqp_mqtt)
rootCmd.AddCommand(stomp_stomp)
rootCmd.AddCommand(stomp_amqp)
rootCmd.AddCommand(stomp_mqtt)
rootCmd.AddCommand(mqtt_mqtt)
rootCmd.AddCommand(mqtt_amqp)
rootCmd.AddCommand(mqtt_stomp)

return rootCmd
}

func start(cfg config.Config, publisherFunc func(config.Config, int), consumerFunc func(config.Config, chan bool, int)) {
var wg sync.WaitGroup

if cfg.Consumers > 0 {
for i := 1; i <= cfg.Consumers; i++ {
subscribed := make(chan bool)
n := i
wg.Add(1)
go func() {
defer wg.Done()
consumerFunc(cfg, subscribed, n)
}()

// wait until we know the receiver has subscribed
<-subscribed
}
}

if cfg.Publishers > 0 {
for i := 1; i <= cfg.Publishers; i++ {
n := i
wg.Add(1)
go func() {
defer wg.Done()
publisherFunc(cfg, n)
}()
}
}

wg.Wait()
}

func setUris(cfg *config.Config, command string) {
if cfg.PublisherUri == "" {
println("setting publisher uri to ", defaultUri(strings.Split(command, "-")[0]))
(*cfg).PublisherUri = defaultUri(strings.Split(command, "-")[0])
}
if cfg.ConsumerUri == "" {
println("setting consumer uri to ", defaultUri(strings.Split(command, "-")[1]))
(*cfg).ConsumerUri = defaultUri(strings.Split(command, "-")[1])
}
}

func defaultUri(proto string) string {
var uri = "localhost"
switch proto {
case "amqp":
uri = "amqp://localhost"
case "stomp":
uri = "localhost:61613"
case "mqtt":
uri = "localhost:1883"
}
return uri
}
38 changes: 3 additions & 35 deletions pkg/amqp10_client/amqp10.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"math/rand"
"sync"
"time"

"github.com/rabbitmq/omq/pkg/config"
Expand All @@ -17,45 +16,13 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

func Start(cfg config.Config) {
var wg sync.WaitGroup

if cfg.Consumers > 0 {
for i := 1; i <= cfg.Consumers; i++ {
subscribed := make(chan bool)
n := i
wg.Add(1)
go func() {
defer wg.Done()
Consumer(cfg, subscribed, n)
}()

// wait until we know the receiver has subscribed
<-subscribed
}
}

if cfg.Publishers > 0 {
for i := 1; i <= cfg.Publishers; i++ {
n := i
wg.Add(1)
go func() {
defer wg.Done()
Publisher(cfg, n)
}()
}
}

wg.Wait()
}

func Publisher(cfg config.Config, n int) {
// sleep random interval to avoid all publishers publishing at the same time
s := rand.Intn(cfg.Publishers)
time.Sleep(time.Duration(s) * time.Millisecond)

// open connection
conn, err := amqp.Dial(context.TODO(), cfg.AmqpUrl, nil)
conn, err := amqp.Dial(context.TODO(), cfg.PublisherUri, nil)
if err != nil {
log.Error("publisher connection failed", "protocol", "amqp-1.0", "publisherId", n, "error", err.Error())
return
Expand Down Expand Up @@ -92,6 +59,7 @@ func Publisher(cfg config.Config, n int) {
return
}
metrics.MessagesPublished.With(prometheus.Labels{"protocol": "amqp-1.0"}).Inc()
log.Error("message sent", "protocol", "amqp-1.0", "publisherId", n)
utils.WaitBetweenMessages(cfg.Rate)
}

Expand All @@ -100,7 +68,7 @@ func Publisher(cfg config.Config, n int) {

func Consumer(cfg config.Config, subscribed chan bool, n int) {
// open connection
conn, err := amqp.Dial(context.TODO(), cfg.AmqpUrl, nil)
conn, err := amqp.Dial(context.TODO(), cfg.ConsumerUri, nil)
if err != nil {
log.Error("consumer failed to connect", "protocol", "amqp-1.0", "consumerId", n, "error", err.Error())
return
Expand Down
5 changes: 2 additions & 3 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ package config
import "time"

type Config struct {
AmqpUrl string
StompUrl string
MqttUrl string
PublisherUri string
ConsumerUri string
Publishers int
Consumers int
PublishCount int
Expand Down
Loading

0 comments on commit 139a1f0

Please sign in to comment.