Skip to content

Commit

Permalink
Refactor options, rename --amqp-binding-key to --binding-key
Browse files Browse the repository at this point in the history
  • Loading branch information
mkuratczyk committed Nov 27, 2024
1 parent 56908c5 commit 72c7571
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 77 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ declare a queue, use `--queues`; see below for more about topic/queue/routing ke
A more complex example:
```shell
$ omq mqtt-amqp --publishers 10 --publish-to 'sensor/%d' --rate 1 --size 100 \
--consumers 1 --consume-from /queues/sensors --amqp-binding-key 'sensor.#' --queues classic
--consumers 1 --consume-from /queues/sensors --binding-key 'sensor.#' --queues classic
```
will start 10 MQTT publishers, each publishing 1 message a second, with 100 bytes of payload, to the `amq.topic` exchange (default for the MQTT plugin)
with the topic/routing key of `sensor/%d`, where the `%d` is the ID of the publisher (from 1 to 10). It will also start a single AMQP 1.0 consumer that
Expand Down Expand Up @@ -130,7 +130,7 @@ messages published with perf-test can be consumed by `omq` or vice versa, and th
```
--amqp-app-property stringArray AMQP application properties, eg. key1=val1,val2
--amqp-app-property-filter stringArray AMQP application property filters, eg. key1=$p:prefix
--amqp-binding-key string AMQP 1.0 consumer binding key
--binding-key string AMQP 1.0 consumer binding key
--amqp-property-filter stringArray AMQP property filters, eg. subject=foo
--amqp-reject-rate int Rate of messages to reject (0-100%)
--amqp-release-rate int Rate of messages to release without accepting (0-100%)
Expand Down
184 changes: 113 additions & 71 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,41 @@ func RootCmd() *cobra.Command {
cfg = config.NewConfig()

mqttConsumerFlags := pflag.NewFlagSet("mqtt-consumer", pflag.ContinueOnError)
mqttConsumerFlags.IntVar(&cfg.MqttConsumer.Version, "mqtt-consumer-version", 5, "MQTT consumer protocol version (3, 4 or 5; default=5)")
mqttConsumerFlags.IntVar(&cfg.MqttConsumer.QoS, "mqtt-consumer-qos", 0, "MQTT consumer QoS level (0, 1 or 2; default=0)")
mqttConsumerFlags.BoolVar(&cfg.MqttConsumer.CleanSession, "mqtt-consumer-clean-session", true, "MQTT consumer clean session")

mqttConsumerFlags.IntVar(&cfg.MqttConsumer.Version, "mqtt-consumer-version", 5,
"MQTT consumer protocol version (3, 4 or 5; default=5)")
mqttConsumerFlags.IntVar(&cfg.MqttConsumer.QoS, "mqtt-consumer-qos", 0,
"MQTT consumer QoS level (0, 1 or 2; default=0)")
mqttConsumerFlags.BoolVar(&cfg.MqttConsumer.CleanSession, "mqtt-consumer-clean-session", true,
"MQTT consumer clean session")

mqttPublisherFlags := pflag.NewFlagSet("mqtt-publisher", pflag.ContinueOnError)
mqttPublisherFlags.IntVar(&cfg.MqttPublisher.Version, "mqtt-publisher-version", 5, "MQTT consumer protocol version (3, 4 or 5; default=5)")
mqttPublisherFlags.IntVar(&cfg.MqttPublisher.QoS, "mqtt-publisher-qos", 0, "MQTT publisher QoS level (0, 1 or 2; default=0)")
mqttPublisherFlags.BoolVar(&cfg.MqttPublisher.CleanSession, "mqtt-publisher-clean-session", true, "MQTT publisher clean session")
mqttPublisherFlags.IntVar(&cfg.MqttPublisher.Version, "mqtt-publisher-version", 5,
"MQTT consumer protocol version (3, 4 or 5; default=5)")
mqttPublisherFlags.IntVar(&cfg.MqttPublisher.QoS, "mqtt-publisher-qos", 0,
"MQTT publisher QoS level (0, 1 or 2; default=0)")
mqttPublisherFlags.BoolVar(&cfg.MqttPublisher.CleanSession, "mqtt-publisher-clean-session", true,
"MQTT publisher clean session")

amqpPublisherFlags := pflag.NewFlagSet("amqp-publisher", pflag.ContinueOnError)

amqpPublisherFlags.StringArrayVar(&amqpAppProperties, "amqp-app-property", []string{},
"AMQP application properties, eg. key1=val1,val2")
amqpPublisherFlags.StringSliceVar(&cfg.Amqp.Subjects, "amqp-subject", []string{},
"AMQP 1.0 message subject(s)")
amqpPublisherFlags.BoolVar(&cfg.Amqp.SendSettled, "amqp-send-settled", false,
"Send settled messages (fire and forget)")

amqpConsumerFlags := pflag.NewFlagSet("amqp-consumer", pflag.ContinueOnError)

amqpConsumerFlags.IntVar(&cfg.Amqp.RejectRate, "amqp-reject-rate", 0,
"Rate of messages to reject (0-100%)")
amqpConsumerFlags.IntVar(&cfg.Amqp.ReleaseRate, "amqp-release-rate", 0,
"Rate of messages to release without accepting (0-100%)")
amqpConsumerFlags.StringArrayVar(&amqpAppPropertyFilters, "amqp-app-property-filter", []string{},
"AMQP application property filters, eg. key1=$p:prefix")
amqpConsumerFlags.StringArrayVar(&amqpPropertyFilters, "amqp-property-filter", []string{},
"AMQP property filters, eg. key1=$p:prefix")

amqp_amqp = &cobra.Command{
Use: "amqp-amqp",
Expand All @@ -82,6 +109,8 @@ func RootCmd() *cobra.Command {
start(cfg)
},
}
amqp_amqp.Flags().AddFlagSet(amqpPublisherFlags)
amqp_amqp.Flags().AddFlagSet(amqpConsumerFlags)

amqp_stomp = &cobra.Command{
Use: "amqp-stomp",
Expand All @@ -91,6 +120,7 @@ func RootCmd() *cobra.Command {
start(cfg)
},
}
amqp_stomp.Flags().AddFlagSet(amqpPublisherFlags)

amqp_mqtt = &cobra.Command{
Use: "amqp-mqtt",
Expand All @@ -100,6 +130,7 @@ func RootCmd() *cobra.Command {
start(cfg)
},
}
amqp_mqtt.Flags().AddFlagSet(amqpPublisherFlags)
amqp_mqtt.Flags().AddFlagSet(mqttConsumerFlags)

stomp_stomp = &cobra.Command{
Expand All @@ -120,6 +151,7 @@ func RootCmd() *cobra.Command {
start(cfg)
},
}
stomp_amqp.Flags().AddFlagSet(amqpConsumerFlags)

stomp_mqtt = &cobra.Command{
Use: "stomp-mqtt",
Expand Down Expand Up @@ -152,6 +184,7 @@ func RootCmd() *cobra.Command {
},
}
mqtt_amqp.Flags().AddFlagSet(mqttPublisherFlags)
mqtt_amqp.Flags().AddFlagSet(amqpConsumerFlags)

mqtt_stomp = &cobra.Command{
Use: "mqtt-stomp",
Expand Down Expand Up @@ -186,73 +219,81 @@ func RootCmd() *cobra.Command {
}
},
PersistentPostRun: func(cmd *cobra.Command, args []string) {
mgmt.DeleteDeclaredQueues()
metrics.GetMetricsServer().PrintSummary()
if cfg.PrintAllMetrics {
metrics.GetMetricsServer().PrintAll()
}
shutdown(cfg.CleanupQueues, cfg.PrintAllMetrics)
},
}
rootCmd.PersistentFlags().
VarP(enumflag.New(&log.Level, "log-level", log.Levels, enumflag.EnumCaseInsensitive), "log-level", "l", "Log level (debug, info, error)")
rootCmd.PersistentFlags().
BoolVar(&cfg.PrintAllMetrics, "print-all-metrics", false, "Print all metrics before exiting")
rootCmd.PersistentFlags().StringSliceVarP(&cfg.Uri, "uri", "", nil, "URI for both publishers and consumers")
rootCmd.PersistentFlags().StringSliceVarP(&cfg.PublisherUri, "publisher-uri", "", nil, "URI for publishing")
rootCmd.PersistentFlags().StringSliceVarP(&cfg.ConsumerUri, "consumer-uri", "", nil, "URI for consuming")
rootCmd.PersistentFlags().IntVarP(&cfg.Publishers, "publishers", "x", 1, "The number of publishers to start")
rootCmd.PersistentFlags().IntVarP(&cfg.Consumers, "consumers", "y", 1, "The number of consumers to start")
rootCmd.PersistentFlags().
IntVarP(&cfg.PublishCount, "pmessages", "C", math.MaxInt, "The number of messages to send per publisher")
rootCmd.PersistentFlags().
IntVarP(&cfg.ConsumeCount, "cmessages", "D", math.MaxInt, "The number of messages to consume per consumer (default=MaxInt)")
rootCmd.PersistentFlags().
StringVarP(&cfg.PublishTo, "publish-to", "t", "/queues/omq-%d", "The topic/terminus to publish to (%d will be replaced with the publisher's id)")
rootCmd.PersistentFlags().
StringVarP(&cfg.ConsumeFrom, "consume-from", "T", "/queues/omq-%d", "The queue/topic/terminus to consume from (%d will be replaced with the consumer's id)")
rootCmd.PersistentFlags().
VarP(enumflag.New(&cfg.Queues, "queues", config.QueueTypes, enumflag.EnumCaseInsensitive), "queues", "", "Type of queues to declare (or `predeclared` to use existing queues)")
rootCmd.PersistentFlags().
StringVar(&cfg.ConsumerId, "consumer-id", "omq-consumer-%d", "Client ID for AMQP and MQTT consumers (%d => consumer's id, %r => random)")
rootCmd.PersistentFlags().
StringVar(&cfg.PublisherId, "publisher-id", "omq-publisher-%d", "Client ID for AMQP and MQTT publishers (%d => consumer's id, %r => random)")
rootCmd.PersistentFlags().
BoolVar(&cfg.CleanupQueues, "cleanup-queues", false, "Delete the queues at the end (only explicitly declared queues, not STOMP subscriptions)")

rootCmd.PersistentFlags().StringSliceVarP(&cfg.Uri, "uri", "", nil,
"URI for both publishers and consumers")
rootCmd.PersistentFlags().StringSliceVarP(&cfg.PublisherUri, "publisher-uri", "", nil,
"URI for publishing")
rootCmd.PersistentFlags().StringSliceVarP(&cfg.ConsumerUri, "consumer-uri", "", nil,
"URI for consuming")
rootCmd.PersistentFlags().BoolVar(&cfg.SpreadConnections, "spread-connections", true,
"Spread connections across URIs")

rootCmd.PersistentFlags().IntVarP(&cfg.Publishers, "publishers", "x", 1,
"The number of publishers to start")
rootCmd.PersistentFlags().IntVarP(&cfg.PublishCount, "pmessages", "C", math.MaxInt,
"The number of messages to send per publisher")
rootCmd.PersistentFlags().StringVarP(&cfg.PublishTo, "publish-to", "t", "/queues/omq-%d",
"The topic/terminus to publish to (%d will be replaced with the publisher's id)")
rootCmd.PersistentFlags().StringVar(&cfg.PublisherId, "publisher-id", "omq-publisher-%d",
"Client ID for AMQP and MQTT publishers (%d => consumer's id, %r => random)")

rootCmd.PersistentFlags().IntVarP(&cfg.Consumers, "consumers", "y", 1,
"The number of consumers to start")
rootCmd.PersistentFlags().IntVarP(&cfg.ConsumeCount, "cmessages", "D", math.MaxInt,
"The number of messages to consume per consumer (default=MaxInt)")
rootCmd.PersistentFlags().StringVarP(&cfg.ConsumeFrom, "consume-from", "T", "/queues/omq-%d",
"The queue/topic/terminus to consume from (%d will be replaced with the consumer's id)")
rootCmd.PersistentFlags().StringVar(&cfg.ConsumerId, "consumer-id", "omq-consumer-%d",
"Client ID for AMQP and MQTT consumers (%d => consumer's id, %r => random)")
rootCmd.PersistentFlags().StringVar(&cfg.StreamOffset, "stream-offset", "",
"Stream consumer offset specification (default=next)")
rootCmd.PersistentFlags().Int32Var(&cfg.ConsumerPriority, "consumer-priority", 0, "Consumer priority")
rootCmd.PersistentFlags().IntVar(&cfg.ConsumerCredits, "consumer-credits", 1,
"AMQP-1.0 consumer credits / STOMP prefetch count")
rootCmd.PersistentFlags().DurationVarP(&cfg.ConsumerLatency, "consumer-latency", "L", 0*time.Second,
"consumer latency (time to accept message)")
rootCmd.PersistentFlags().BoolVar(&cfg.LogOutOfOrder, "log-out-of-order-messages", false,
"Print a log line when a message is received that is older than the previously received message")
rootCmd.PersistentFlags().DurationVar(&cfg.ConsumerStartupDelay, "consumer-startup-delay", 0,
"Delay consumer startup to allow a backlog of messages to build up (eg. 10s)")

rootCmd.PersistentFlags().VarP(enumflag.New(&cfg.Queues, "queues", config.QueueTypes, enumflag.EnumCaseInsensitive), "queues", "",
"Type of queues to declare (or `predeclared` to use existing queues)")
rootCmd.PersistentFlags().BoolVar(&cfg.CleanupQueues, "cleanup-queues", false,
"Delete the queues at the end (omq only deletes the queues it explicitly declared)")
rootCmd.PersistentFlags().VarP(enumflag.New(&cfg.QueueDurability, "queue-durability", config.AmqpDurabilityModes, enumflag.EnumCaseInsensitive), "queue-durability", "",
"Queue durability (default: configuration - the queue definition is durable)")
rootCmd.PersistentFlags().StringVar(&cfg.BindingKey, "binding-key", "",
"Binding key for queue declarations")

// messages
rootCmd.PersistentFlags().IntVarP(&cfg.Size, "size", "s", 12, "Message payload size in bytes")
rootCmd.PersistentFlags().Float32VarP(&cfg.Rate, "rate", "r", -1, "Messages per second (-1 = unlimited)")
rootCmd.PersistentFlags().IntVarP(&cfg.MaxInFlight, "max-in-flight", "c", 1, "Maximum number of in-flight messages per publisher")
rootCmd.PersistentFlags().DurationVarP(&cfg.Duration, "time", "z", 0, "Run duration (eg. 10s, 5m, 2h)")
rootCmd.PersistentFlags().
BoolVarP(&cfg.UseMillis, "use-millis", "m", false, "Use milliseconds for timestamps (automatically enabled when no publishers or no consumers)")
rootCmd.PersistentFlags().
VarP(enumflag.New(&cfg.QueueDurability, "queue-durability", config.AmqpDurabilityModes, enumflag.EnumCaseInsensitive), "queue-durability", "", "Queue durability (default: configuration - the queue definition is durable)")
rootCmd.PersistentFlags().StringSliceVar(&cfg.Amqp.Subjects, "amqp-subject", []string{}, "AMQP 1.0 message subject(s)")
rootCmd.PersistentFlags().StringVar(&cfg.Amqp.BindingKey, "amqp-binding-key", "", "AMQP 1.0 consumer binding key")
rootCmd.PersistentFlags().
BoolVar(&cfg.Amqp.SendSettled, "amqp-send-settled", false, "Send settled messages (fire and forget)")
rootCmd.PersistentFlags().IntVar(&cfg.Amqp.RejectRate, "amqp-reject-rate", 0, "Rate of messages to reject (0-100%)")
rootCmd.PersistentFlags().IntVar(&cfg.Amqp.ReleaseRate, "amqp-release-rate", 0, "Rate of messages to release without accepting (0-100%)")
rootCmd.PersistentFlags().
BoolVarP(&cfg.MessageDurability, "message-durability", "d", true, "Mark messages as durable")
rootCmd.PersistentFlags().BoolVarP(&cfg.MessageDurability, "message-durability", "d", true, "Mark messages as durable")
rootCmd.PersistentFlags().StringVar(&cfg.MessagePriority, "message-priority", "", "Message priority (0-255, default=unset)")
rootCmd.PersistentFlags().StringVar(&cfg.StreamOffset, "stream-offset", "", "Stream consumer offset specification (default=next)")
rootCmd.PersistentFlags().DurationVar(&cfg.MessageTTL, "message-ttl", 0, "Message TTL (not set by default)")
rootCmd.PersistentFlags().Int32Var(&cfg.ConsumerPriority, "consumer-priority", 0, "Consumer priority")
rootCmd.PersistentFlags().StringVar(&cfg.StreamFilterValues, "stream-filter-values", "", "Stream consumer filter")
rootCmd.PersistentFlags().StringVar(&cfg.StreamFilterValueSet, "stream-filter-value-set", "", "Stream filter value for publisher")
rootCmd.PersistentFlags().IntVar(&cfg.ConsumerCredits, "consumer-credits", 1, "AMQP-1.0 consumer credits / STOMP prefetch count")
rootCmd.PersistentFlags().DurationVarP(&cfg.ConsumerLatency, "consumer-latency", "L", 0*time.Second, "consumer latency (time to accept message)")
rootCmd.PersistentFlags().StringSliceVar(&metricTags, "metric-tags", []string{}, "Prometheus label-value pairs, eg. l1=v1,l2=v2")
rootCmd.PersistentFlags().
BoolVar(&cfg.LogOutOfOrder, "log-out-of-order-messages", false, "Print a log line when a message is received that is older than the previously received message")
rootCmd.PersistentFlags().
BoolVar(&cfg.SpreadConnections, "spread-connections", true, "Spread connections across URIs")
rootCmd.PersistentFlags().DurationVar(&cfg.ConsumerStartupDelay, "consumer-startup-delay", 0, "Delay consumer startup to allow a backlog of messages to build up (eg. 10s)")
rootCmd.PersistentFlags().StringArrayVar(&amqpAppProperties, "amqp-app-property", []string{}, "AMQP application properties, eg. key1=val1,val2")
rootCmd.PersistentFlags().StringArrayVar(&amqpAppPropertyFilters, "amqp-app-property-filter", []string{}, "AMQP application property filters, eg. key1=$p:prefix")
rootCmd.PersistentFlags().StringArrayVar(&amqpPropertyFilters, "amqp-property-filter", []string{}, "AMQP property filters, eg. key1=$p:prefix")
rootCmd.PersistentFlags().IntVar(&cfg.ExpectedInstances, "expected-instances", 1, "The number of instances to synchronize")
rootCmd.PersistentFlags().StringVar(&cfg.SyncName, "expected-instances-endpoint", "", "The DNS name that will return members to synchronize with")

rootCmd.PersistentFlags().StringSliceVar(&metricTags, "metric-tags", []string{},
"Prometheus label-value pairs, eg. l1=v1,l2=v2")
rootCmd.PersistentFlags().VarP(enumflag.New(&log.Level, "log-level", log.Levels, enumflag.EnumCaseInsensitive), "log-level", "l",
"Log level (debug, info, error)")
rootCmd.PersistentFlags().BoolVar(&cfg.PrintAllMetrics, "print-all-metrics", false,
"Print all metrics before exiting")
rootCmd.PersistentFlags().BoolVarP(&cfg.UseMillis, "use-millis", "m", false,
"Use milliseconds for timestamps (automatically enabled when no publishers or no consumers)")
rootCmd.PersistentFlags().DurationVarP(&cfg.Duration, "time", "z", 0,
"Run duration (eg. 10s, 5m, 2h)")

// instance synchronization
rootCmd.PersistentFlags().IntVar(&cfg.ExpectedInstances, "expected-instances", 1,
"The number of instances to synchronize")
rootCmd.PersistentFlags().StringVar(&cfg.SyncName, "expected-instances-endpoint", "",
"The DNS name that will return members to synchronize with")

rootCmd.AddCommand(amqp_amqp)
rootCmd.AddCommand(amqp_stomp)
Expand Down Expand Up @@ -287,15 +328,16 @@ func start(cfg config.Config) {
defer cancel()

// handle ^C
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
select {
case <-c:
cancel()
println("Received SIGTERM, shutting down...")
time.Sleep(500 * time.Millisecond)
shutdown(cfg.CleanupQueues, cfg.PrintAllMetrics)
log.Print("Received SIGTERM, shutting down...")
// PersistentPostRun does all the cleanup
// this is just just a backup mechanism
time.Sleep(30 * time.Second)
os.Exit(0)
case <-ctx.Done():
return
Expand Down
2 changes: 1 addition & 1 deletion main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ var _ = Describe("OMQ CLI", func() {
"--pmessages=5",
"--publish-to=sensor/%d",
"--consume-from=/queues/sensors",
"--amqp-binding-key=sensor.#",
"--binding-key=sensor.#",
"--queues=classic",
"--cleanup-queues=true",
"--time=2s",
Expand Down
1 change: 1 addition & 0 deletions pkg/amqp10_client/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func (p *Amqp10Publisher) Connect() {
}

func (p *Amqp10Publisher) CreateSender() {
// TODO do we need this?
var durability amqp.Durability
switch p.Config.QueueDurability {
case config.None:
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ type AmqpOptions struct {
SendSettled bool
ReleaseRate int
RejectRate int
BindingKey string
PropertyFilters map[string]string
AppProperties map[string][]string
AppPropertyFilters map[string]string
Expand Down Expand Up @@ -80,6 +79,7 @@ type Config struct {
PublishTo string
ConsumeFrom string
Queues QueueType
BindingKey string
CleanupQueues bool
ConsumerCredits int
ConsumerLatency time.Duration
Expand Down
4 changes: 2 additions & 2 deletions pkg/mgmt/mgmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ func DeclareAndBind(cfg config.Config, queueName string, id int) rmq.IQueueInfo
}

// explicitly set routing key overrides everything else
if cfg.Amqp.BindingKey != "" {
routingKey = utils.InjectId(cfg.Amqp.BindingKey, id)
if cfg.BindingKey != "" {
routingKey = utils.InjectId(cfg.BindingKey, id)
}

if exchangeName != "amq.default" {
Expand Down

0 comments on commit 72c7571

Please sign in to comment.