From 72c757169d4b5f257364a1e984f3f3e8a577d4c0 Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Wed, 27 Nov 2024 11:27:43 +0100 Subject: [PATCH] Refactor options, rename --amqp-binding-key to --binding-key --- README.md | 4 +- cmd/root.go | 184 ++++++++++++++++++++------------- main_test.go | 2 +- pkg/amqp10_client/publisher.go | 1 + pkg/config/config.go | 2 +- pkg/mgmt/mgmt.go | 4 +- 6 files changed, 120 insertions(+), 77 deletions(-) diff --git a/README.md b/README.md index 550cc69..b082116 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ declare a queue, use `--queues`; see below for more about topic/queue/routing ke A more complex example: ```shell $ omq mqtt-amqp --publishers 10 --publish-to 'sensor/%d' --rate 1 --size 100 \ - --consumers 1 --consume-from /queues/sensors --amqp-binding-key 'sensor.#' --queues classic + --consumers 1 --consume-from /queues/sensors --binding-key 'sensor.#' --queues classic ``` will start 10 MQTT publishers, each publishing 1 message a second, with 100 bytes of payload, to the `amq.topic` exchange (default for the MQTT plugin) with the topic/routing key of `sensor/%d`, where the `%d` is the ID of the publisher (from 1 to 10). It will also start a single AMQP 1.0 consumer that @@ -130,7 +130,7 @@ messages published with perf-test can be consumed by `omq` or vice versa, and th ``` --amqp-app-property stringArray AMQP application properties, eg. key1=val1,val2 --amqp-app-property-filter stringArray AMQP application property filters, eg. key1=$p:prefix - --amqp-binding-key string AMQP 1.0 consumer binding key + --binding-key string AMQP 1.0 consumer binding key --amqp-property-filter stringArray AMQP property filters, eg. subject=foo --amqp-reject-rate int Rate of messages to reject (0-100%) --amqp-release-rate int Rate of messages to release without accepting (0-100%) diff --git a/cmd/root.go b/cmd/root.go index 16628b4..7d958cf 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -64,14 +64,41 @@ func RootCmd() *cobra.Command { cfg = config.NewConfig() mqttConsumerFlags := pflag.NewFlagSet("mqtt-consumer", pflag.ContinueOnError) - mqttConsumerFlags.IntVar(&cfg.MqttConsumer.Version, "mqtt-consumer-version", 5, "MQTT consumer protocol version (3, 4 or 5; default=5)") - mqttConsumerFlags.IntVar(&cfg.MqttConsumer.QoS, "mqtt-consumer-qos", 0, "MQTT consumer QoS level (0, 1 or 2; default=0)") - mqttConsumerFlags.BoolVar(&cfg.MqttConsumer.CleanSession, "mqtt-consumer-clean-session", true, "MQTT consumer clean session") + + mqttConsumerFlags.IntVar(&cfg.MqttConsumer.Version, "mqtt-consumer-version", 5, + "MQTT consumer protocol version (3, 4 or 5; default=5)") + mqttConsumerFlags.IntVar(&cfg.MqttConsumer.QoS, "mqtt-consumer-qos", 0, + "MQTT consumer QoS level (0, 1 or 2; default=0)") + mqttConsumerFlags.BoolVar(&cfg.MqttConsumer.CleanSession, "mqtt-consumer-clean-session", true, + "MQTT consumer clean session") mqttPublisherFlags := pflag.NewFlagSet("mqtt-publisher", pflag.ContinueOnError) - mqttPublisherFlags.IntVar(&cfg.MqttPublisher.Version, "mqtt-publisher-version", 5, "MQTT consumer protocol version (3, 4 or 5; default=5)") - mqttPublisherFlags.IntVar(&cfg.MqttPublisher.QoS, "mqtt-publisher-qos", 0, "MQTT publisher QoS level (0, 1 or 2; default=0)") - mqttPublisherFlags.BoolVar(&cfg.MqttPublisher.CleanSession, "mqtt-publisher-clean-session", true, "MQTT publisher clean session") + mqttPublisherFlags.IntVar(&cfg.MqttPublisher.Version, "mqtt-publisher-version", 5, + "MQTT consumer protocol version (3, 4 or 5; default=5)") + mqttPublisherFlags.IntVar(&cfg.MqttPublisher.QoS, "mqtt-publisher-qos", 0, + "MQTT publisher QoS level (0, 1 or 2; default=0)") + mqttPublisherFlags.BoolVar(&cfg.MqttPublisher.CleanSession, "mqtt-publisher-clean-session", true, + "MQTT publisher clean session") + + amqpPublisherFlags := pflag.NewFlagSet("amqp-publisher", pflag.ContinueOnError) + + amqpPublisherFlags.StringArrayVar(&amqpAppProperties, "amqp-app-property", []string{}, + "AMQP application properties, eg. key1=val1,val2") + amqpPublisherFlags.StringSliceVar(&cfg.Amqp.Subjects, "amqp-subject", []string{}, + "AMQP 1.0 message subject(s)") + amqpPublisherFlags.BoolVar(&cfg.Amqp.SendSettled, "amqp-send-settled", false, + "Send settled messages (fire and forget)") + + amqpConsumerFlags := pflag.NewFlagSet("amqp-consumer", pflag.ContinueOnError) + + amqpConsumerFlags.IntVar(&cfg.Amqp.RejectRate, "amqp-reject-rate", 0, + "Rate of messages to reject (0-100%)") + amqpConsumerFlags.IntVar(&cfg.Amqp.ReleaseRate, "amqp-release-rate", 0, + "Rate of messages to release without accepting (0-100%)") + amqpConsumerFlags.StringArrayVar(&amqpAppPropertyFilters, "amqp-app-property-filter", []string{}, + "AMQP application property filters, eg. key1=$p:prefix") + amqpConsumerFlags.StringArrayVar(&amqpPropertyFilters, "amqp-property-filter", []string{}, + "AMQP property filters, eg. key1=$p:prefix") amqp_amqp = &cobra.Command{ Use: "amqp-amqp", @@ -82,6 +109,8 @@ func RootCmd() *cobra.Command { start(cfg) }, } + amqp_amqp.Flags().AddFlagSet(amqpPublisherFlags) + amqp_amqp.Flags().AddFlagSet(amqpConsumerFlags) amqp_stomp = &cobra.Command{ Use: "amqp-stomp", @@ -91,6 +120,7 @@ func RootCmd() *cobra.Command { start(cfg) }, } + amqp_stomp.Flags().AddFlagSet(amqpPublisherFlags) amqp_mqtt = &cobra.Command{ Use: "amqp-mqtt", @@ -100,6 +130,7 @@ func RootCmd() *cobra.Command { start(cfg) }, } + amqp_mqtt.Flags().AddFlagSet(amqpPublisherFlags) amqp_mqtt.Flags().AddFlagSet(mqttConsumerFlags) stomp_stomp = &cobra.Command{ @@ -120,6 +151,7 @@ func RootCmd() *cobra.Command { start(cfg) }, } + stomp_amqp.Flags().AddFlagSet(amqpConsumerFlags) stomp_mqtt = &cobra.Command{ Use: "stomp-mqtt", @@ -152,6 +184,7 @@ func RootCmd() *cobra.Command { }, } mqtt_amqp.Flags().AddFlagSet(mqttPublisherFlags) + mqtt_amqp.Flags().AddFlagSet(amqpConsumerFlags) mqtt_stomp = &cobra.Command{ Use: "mqtt-stomp", @@ -186,73 +219,81 @@ func RootCmd() *cobra.Command { } }, PersistentPostRun: func(cmd *cobra.Command, args []string) { - mgmt.DeleteDeclaredQueues() - metrics.GetMetricsServer().PrintSummary() - if cfg.PrintAllMetrics { - metrics.GetMetricsServer().PrintAll() - } + shutdown(cfg.CleanupQueues, cfg.PrintAllMetrics) }, } - rootCmd.PersistentFlags(). - VarP(enumflag.New(&log.Level, "log-level", log.Levels, enumflag.EnumCaseInsensitive), "log-level", "l", "Log level (debug, info, error)") - rootCmd.PersistentFlags(). - BoolVar(&cfg.PrintAllMetrics, "print-all-metrics", false, "Print all metrics before exiting") - rootCmd.PersistentFlags().StringSliceVarP(&cfg.Uri, "uri", "", nil, "URI for both publishers and consumers") - rootCmd.PersistentFlags().StringSliceVarP(&cfg.PublisherUri, "publisher-uri", "", nil, "URI for publishing") - rootCmd.PersistentFlags().StringSliceVarP(&cfg.ConsumerUri, "consumer-uri", "", nil, "URI for consuming") - rootCmd.PersistentFlags().IntVarP(&cfg.Publishers, "publishers", "x", 1, "The number of publishers to start") - rootCmd.PersistentFlags().IntVarP(&cfg.Consumers, "consumers", "y", 1, "The number of consumers to start") - rootCmd.PersistentFlags(). - IntVarP(&cfg.PublishCount, "pmessages", "C", math.MaxInt, "The number of messages to send per publisher") - rootCmd.PersistentFlags(). - IntVarP(&cfg.ConsumeCount, "cmessages", "D", math.MaxInt, "The number of messages to consume per consumer (default=MaxInt)") - rootCmd.PersistentFlags(). - StringVarP(&cfg.PublishTo, "publish-to", "t", "/queues/omq-%d", "The topic/terminus to publish to (%d will be replaced with the publisher's id)") - rootCmd.PersistentFlags(). - StringVarP(&cfg.ConsumeFrom, "consume-from", "T", "/queues/omq-%d", "The queue/topic/terminus to consume from (%d will be replaced with the consumer's id)") - rootCmd.PersistentFlags(). - VarP(enumflag.New(&cfg.Queues, "queues", config.QueueTypes, enumflag.EnumCaseInsensitive), "queues", "", "Type of queues to declare (or `predeclared` to use existing queues)") - rootCmd.PersistentFlags(). - StringVar(&cfg.ConsumerId, "consumer-id", "omq-consumer-%d", "Client ID for AMQP and MQTT consumers (%d => consumer's id, %r => random)") - rootCmd.PersistentFlags(). - StringVar(&cfg.PublisherId, "publisher-id", "omq-publisher-%d", "Client ID for AMQP and MQTT publishers (%d => consumer's id, %r => random)") - rootCmd.PersistentFlags(). - BoolVar(&cfg.CleanupQueues, "cleanup-queues", false, "Delete the queues at the end (only explicitly declared queues, not STOMP subscriptions)") + + rootCmd.PersistentFlags().StringSliceVarP(&cfg.Uri, "uri", "", nil, + "URI for both publishers and consumers") + rootCmd.PersistentFlags().StringSliceVarP(&cfg.PublisherUri, "publisher-uri", "", nil, + "URI for publishing") + rootCmd.PersistentFlags().StringSliceVarP(&cfg.ConsumerUri, "consumer-uri", "", nil, + "URI for consuming") + rootCmd.PersistentFlags().BoolVar(&cfg.SpreadConnections, "spread-connections", true, + "Spread connections across URIs") + + rootCmd.PersistentFlags().IntVarP(&cfg.Publishers, "publishers", "x", 1, + "The number of publishers to start") + rootCmd.PersistentFlags().IntVarP(&cfg.PublishCount, "pmessages", "C", math.MaxInt, + "The number of messages to send per publisher") + rootCmd.PersistentFlags().StringVarP(&cfg.PublishTo, "publish-to", "t", "/queues/omq-%d", + "The topic/terminus to publish to (%d will be replaced with the publisher's id)") + rootCmd.PersistentFlags().StringVar(&cfg.PublisherId, "publisher-id", "omq-publisher-%d", + "Client ID for AMQP and MQTT publishers (%d => consumer's id, %r => random)") + + rootCmd.PersistentFlags().IntVarP(&cfg.Consumers, "consumers", "y", 1, + "The number of consumers to start") + rootCmd.PersistentFlags().IntVarP(&cfg.ConsumeCount, "cmessages", "D", math.MaxInt, + "The number of messages to consume per consumer (default=MaxInt)") + rootCmd.PersistentFlags().StringVarP(&cfg.ConsumeFrom, "consume-from", "T", "/queues/omq-%d", + "The queue/topic/terminus to consume from (%d will be replaced with the consumer's id)") + rootCmd.PersistentFlags().StringVar(&cfg.ConsumerId, "consumer-id", "omq-consumer-%d", + "Client ID for AMQP and MQTT consumers (%d => consumer's id, %r => random)") + rootCmd.PersistentFlags().StringVar(&cfg.StreamOffset, "stream-offset", "", + "Stream consumer offset specification (default=next)") + rootCmd.PersistentFlags().Int32Var(&cfg.ConsumerPriority, "consumer-priority", 0, "Consumer priority") + rootCmd.PersistentFlags().IntVar(&cfg.ConsumerCredits, "consumer-credits", 1, + "AMQP-1.0 consumer credits / STOMP prefetch count") + rootCmd.PersistentFlags().DurationVarP(&cfg.ConsumerLatency, "consumer-latency", "L", 0*time.Second, + "consumer latency (time to accept message)") + rootCmd.PersistentFlags().BoolVar(&cfg.LogOutOfOrder, "log-out-of-order-messages", false, + "Print a log line when a message is received that is older than the previously received message") + rootCmd.PersistentFlags().DurationVar(&cfg.ConsumerStartupDelay, "consumer-startup-delay", 0, + "Delay consumer startup to allow a backlog of messages to build up (eg. 10s)") + + rootCmd.PersistentFlags().VarP(enumflag.New(&cfg.Queues, "queues", config.QueueTypes, enumflag.EnumCaseInsensitive), "queues", "", + "Type of queues to declare (or `predeclared` to use existing queues)") + rootCmd.PersistentFlags().BoolVar(&cfg.CleanupQueues, "cleanup-queues", false, + "Delete the queues at the end (omq only deletes the queues it explicitly declared)") + rootCmd.PersistentFlags().VarP(enumflag.New(&cfg.QueueDurability, "queue-durability", config.AmqpDurabilityModes, enumflag.EnumCaseInsensitive), "queue-durability", "", + "Queue durability (default: configuration - the queue definition is durable)") + rootCmd.PersistentFlags().StringVar(&cfg.BindingKey, "binding-key", "", + "Binding key for queue declarations") + + // messages rootCmd.PersistentFlags().IntVarP(&cfg.Size, "size", "s", 12, "Message payload size in bytes") rootCmd.PersistentFlags().Float32VarP(&cfg.Rate, "rate", "r", -1, "Messages per second (-1 = unlimited)") rootCmd.PersistentFlags().IntVarP(&cfg.MaxInFlight, "max-in-flight", "c", 1, "Maximum number of in-flight messages per publisher") - rootCmd.PersistentFlags().DurationVarP(&cfg.Duration, "time", "z", 0, "Run duration (eg. 10s, 5m, 2h)") - rootCmd.PersistentFlags(). - BoolVarP(&cfg.UseMillis, "use-millis", "m", false, "Use milliseconds for timestamps (automatically enabled when no publishers or no consumers)") - rootCmd.PersistentFlags(). - VarP(enumflag.New(&cfg.QueueDurability, "queue-durability", config.AmqpDurabilityModes, enumflag.EnumCaseInsensitive), "queue-durability", "", "Queue durability (default: configuration - the queue definition is durable)") - rootCmd.PersistentFlags().StringSliceVar(&cfg.Amqp.Subjects, "amqp-subject", []string{}, "AMQP 1.0 message subject(s)") - rootCmd.PersistentFlags().StringVar(&cfg.Amqp.BindingKey, "amqp-binding-key", "", "AMQP 1.0 consumer binding key") - rootCmd.PersistentFlags(). - BoolVar(&cfg.Amqp.SendSettled, "amqp-send-settled", false, "Send settled messages (fire and forget)") - rootCmd.PersistentFlags().IntVar(&cfg.Amqp.RejectRate, "amqp-reject-rate", 0, "Rate of messages to reject (0-100%)") - rootCmd.PersistentFlags().IntVar(&cfg.Amqp.ReleaseRate, "amqp-release-rate", 0, "Rate of messages to release without accepting (0-100%)") - rootCmd.PersistentFlags(). - BoolVarP(&cfg.MessageDurability, "message-durability", "d", true, "Mark messages as durable") + rootCmd.PersistentFlags().BoolVarP(&cfg.MessageDurability, "message-durability", "d", true, "Mark messages as durable") rootCmd.PersistentFlags().StringVar(&cfg.MessagePriority, "message-priority", "", "Message priority (0-255, default=unset)") - rootCmd.PersistentFlags().StringVar(&cfg.StreamOffset, "stream-offset", "", "Stream consumer offset specification (default=next)") rootCmd.PersistentFlags().DurationVar(&cfg.MessageTTL, "message-ttl", 0, "Message TTL (not set by default)") - rootCmd.PersistentFlags().Int32Var(&cfg.ConsumerPriority, "consumer-priority", 0, "Consumer priority") - rootCmd.PersistentFlags().StringVar(&cfg.StreamFilterValues, "stream-filter-values", "", "Stream consumer filter") - rootCmd.PersistentFlags().StringVar(&cfg.StreamFilterValueSet, "stream-filter-value-set", "", "Stream filter value for publisher") - rootCmd.PersistentFlags().IntVar(&cfg.ConsumerCredits, "consumer-credits", 1, "AMQP-1.0 consumer credits / STOMP prefetch count") - rootCmd.PersistentFlags().DurationVarP(&cfg.ConsumerLatency, "consumer-latency", "L", 0*time.Second, "consumer latency (time to accept message)") - rootCmd.PersistentFlags().StringSliceVar(&metricTags, "metric-tags", []string{}, "Prometheus label-value pairs, eg. l1=v1,l2=v2") - rootCmd.PersistentFlags(). - BoolVar(&cfg.LogOutOfOrder, "log-out-of-order-messages", false, "Print a log line when a message is received that is older than the previously received message") - rootCmd.PersistentFlags(). - BoolVar(&cfg.SpreadConnections, "spread-connections", true, "Spread connections across URIs") - rootCmd.PersistentFlags().DurationVar(&cfg.ConsumerStartupDelay, "consumer-startup-delay", 0, "Delay consumer startup to allow a backlog of messages to build up (eg. 10s)") - rootCmd.PersistentFlags().StringArrayVar(&amqpAppProperties, "amqp-app-property", []string{}, "AMQP application properties, eg. key1=val1,val2") - rootCmd.PersistentFlags().StringArrayVar(&amqpAppPropertyFilters, "amqp-app-property-filter", []string{}, "AMQP application property filters, eg. key1=$p:prefix") - rootCmd.PersistentFlags().StringArrayVar(&amqpPropertyFilters, "amqp-property-filter", []string{}, "AMQP property filters, eg. key1=$p:prefix") - rootCmd.PersistentFlags().IntVar(&cfg.ExpectedInstances, "expected-instances", 1, "The number of instances to synchronize") - rootCmd.PersistentFlags().StringVar(&cfg.SyncName, "expected-instances-endpoint", "", "The DNS name that will return members to synchronize with") + + rootCmd.PersistentFlags().StringSliceVar(&metricTags, "metric-tags", []string{}, + "Prometheus label-value pairs, eg. l1=v1,l2=v2") + rootCmd.PersistentFlags().VarP(enumflag.New(&log.Level, "log-level", log.Levels, enumflag.EnumCaseInsensitive), "log-level", "l", + "Log level (debug, info, error)") + rootCmd.PersistentFlags().BoolVar(&cfg.PrintAllMetrics, "print-all-metrics", false, + "Print all metrics before exiting") + rootCmd.PersistentFlags().BoolVarP(&cfg.UseMillis, "use-millis", "m", false, + "Use milliseconds for timestamps (automatically enabled when no publishers or no consumers)") + rootCmd.PersistentFlags().DurationVarP(&cfg.Duration, "time", "z", 0, + "Run duration (eg. 10s, 5m, 2h)") + + // instance synchronization + rootCmd.PersistentFlags().IntVar(&cfg.ExpectedInstances, "expected-instances", 1, + "The number of instances to synchronize") + rootCmd.PersistentFlags().StringVar(&cfg.SyncName, "expected-instances-endpoint", "", + "The DNS name that will return members to synchronize with") rootCmd.AddCommand(amqp_amqp) rootCmd.AddCommand(amqp_stomp) @@ -287,15 +328,16 @@ func start(cfg config.Config) { defer cancel() // handle ^C - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, syscall.SIGTERM) go func() { + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) select { case <-c: cancel() - println("Received SIGTERM, shutting down...") - time.Sleep(500 * time.Millisecond) - shutdown(cfg.CleanupQueues, cfg.PrintAllMetrics) + log.Print("Received SIGTERM, shutting down...") + // PersistentPostRun does all the cleanup + // this is just just a backup mechanism + time.Sleep(30 * time.Second) os.Exit(0) case <-ctx.Done(): return diff --git a/main_test.go b/main_test.go index 7438f10..29c3fbd 100644 --- a/main_test.go +++ b/main_test.go @@ -142,7 +142,7 @@ var _ = Describe("OMQ CLI", func() { "--pmessages=5", "--publish-to=sensor/%d", "--consume-from=/queues/sensors", - "--amqp-binding-key=sensor.#", + "--binding-key=sensor.#", "--queues=classic", "--cleanup-queues=true", "--time=2s", diff --git a/pkg/amqp10_client/publisher.go b/pkg/amqp10_client/publisher.go index 55fba33..d883305 100644 --- a/pkg/amqp10_client/publisher.go +++ b/pkg/amqp10_client/publisher.go @@ -103,6 +103,7 @@ func (p *Amqp10Publisher) Connect() { } func (p *Amqp10Publisher) CreateSender() { + // TODO do we need this? var durability amqp.Durability switch p.Config.QueueDurability { case config.None: diff --git a/pkg/config/config.go b/pkg/config/config.go index ff0d0fd..83a26c5 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -50,7 +50,6 @@ type AmqpOptions struct { SendSettled bool ReleaseRate int RejectRate int - BindingKey string PropertyFilters map[string]string AppProperties map[string][]string AppPropertyFilters map[string]string @@ -80,6 +79,7 @@ type Config struct { PublishTo string ConsumeFrom string Queues QueueType + BindingKey string CleanupQueues bool ConsumerCredits int ConsumerLatency time.Duration diff --git a/pkg/mgmt/mgmt.go b/pkg/mgmt/mgmt.go index 568e000..e1c1a34 100644 --- a/pkg/mgmt/mgmt.go +++ b/pkg/mgmt/mgmt.go @@ -121,8 +121,8 @@ func DeclareAndBind(cfg config.Config, queueName string, id int) rmq.IQueueInfo } // explicitly set routing key overrides everything else - if cfg.Amqp.BindingKey != "" { - routingKey = utils.InjectId(cfg.Amqp.BindingKey, id) + if cfg.BindingKey != "" { + routingKey = utils.InjectId(cfg.BindingKey, id) } if exchangeName != "amq.default" {