Skip to content

Commit

Permalink
Add --amqp-subject option
Browse files Browse the repository at this point in the history
  • Loading branch information
mkuratczyk committed Oct 3, 2023
1 parent fb18dfe commit 28f0d27
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 16 deletions.
1 change: 1 addition & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func RootCmd() *cobra.Command {
rootCmd.PersistentFlags().DurationVarP(&cfg.Duration, "duration", "z", 0, "Duration (eg. 10s, 5m, 2h)")
rootCmd.PersistentFlags().BoolVarP(&cfg.UseMillis, "use-millis", "m", false, "Use milliseconds for timestamps")
rootCmd.PersistentFlags().VarP(enumflag.New(&cfg.QueueDurability, "queue-durability", config.AmqpDurabilityModes, enumflag.EnumCaseInsensitive), "queue-durability", "", "Queue durability (default: configuration - the queue definition is durable)")
rootCmd.PersistentFlags().StringVar(&cfg.Amqp.Subject, "amqp-subject", "", "AMQP 1.0 message subject")
rootCmd.PersistentFlags().BoolVarP(&cfg.MessageDurability, "message-durability", "d", true, "Mark messages as durable (default=true)")

rootCmd.AddCommand(amqp_amqp)
Expand Down
3 changes: 3 additions & 0 deletions pkg/amqp10_client/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ func (p Amqp10Publisher) StartRateLimited() {
func (p Amqp10Publisher) Send() {
utils.UpdatePayload(p.Config.UseMillis, &p.msg)
msg := amqp.NewMessage(p.msg)
if p.Config.Amqp.Subject != "" {
msg.Properties = &amqp.MessageProperties{Subject: &p.Config.Amqp.Subject}
}
msg.Header = &amqp.MessageHeader{
Durable: p.Config.MessageDurability}
timer := prometheus.NewTimer(metrics.PublishingLatency.With(prometheus.Labels{"protocol": "amqp-1.0"}))
Expand Down
33 changes: 17 additions & 16 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ var AmqpDurabilityModes = map[AmqpDurabilityMode][]string{

type AmqpOptions struct {
ConsumerCredits int
Subject string
}

type MqttOptions struct {
Expand All @@ -30,23 +31,23 @@ type MqttOptions struct {
}

type Config struct {
PublisherUri string
ConsumerUri string
Publishers int
Consumers int
PublishCount int
ConsumeCount int
PublishTo string
ConsumeFrom string
Size int
Rate int
Duration time.Duration
UseMillis bool
QueueDurability AmqpDurabilityMode
PublisherUri string
ConsumerUri string
Publishers int
Consumers int
PublishCount int
ConsumeCount int
PublishTo string
ConsumeFrom string
Size int
Rate int
Duration time.Duration
UseMillis bool
QueueDurability AmqpDurabilityMode
MessageDurability bool
Amqp AmqpOptions
MqttPublisher MqttOptions
MqttConsumer MqttOptions
Amqp AmqpOptions
MqttPublisher MqttOptions
MqttConsumer MqttOptions
}

func NewConfig() Config {
Expand Down

0 comments on commit 28f0d27

Please sign in to comment.