Skip to content

Commit

Permalink
MQTT Publisher
Browse files Browse the repository at this point in the history
  • Loading branch information
mkuratczyk committed Sep 25, 2023
1 parent 139a1f0 commit b1632e2
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 1 deletion.
4 changes: 3 additions & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ func RootCmd() *cobra.Command {
Use: "mqtt-mqtt",
Aliases: []string{"mqtt"},
Run: func(cmd *cobra.Command, args []string) {
start(cfg, mqtt_client.Publisher, mqtt_client.Consumer)
// start(cfg, mqtt_client.Publisher, mqtt_client.Consumer)
p := mqtt_client.NewPublisher(cfg, 1)
p.Start()
},
}

Expand Down
89 changes: 89 additions & 0 deletions pkg/mqtt_client/publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package mqtt_client

import (
"fmt"
"math/rand"
"time"

mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/rabbitmq/omq/pkg/config"
"github.com/rabbitmq/omq/pkg/log"
"github.com/rabbitmq/omq/pkg/utils"

"github.com/rabbitmq/omq/pkg/metrics"

"github.com/prometheus/client_golang/prometheus"
)

type MqttPublisher struct {
Id int
Connection mqtt.Client
Topic string
Config config.Config
}

func NewPublisher(cfg config.Config, n int) *MqttPublisher {
var token mqtt.Token

// sleep random interval to avoid all publishers publishing at the same time
s := rand.Intn(cfg.Publishers)
time.Sleep(time.Duration(s) * time.Millisecond)

// open connection
opts := mqtt.NewClientOptions().
AddBroker(cfg.PublisherUri).
SetUsername("guest").
SetPassword("guest").
SetClientID(fmt.Sprintf("omq-pub-%d", n)).
SetAutoReconnect(true).
SetConnectionLostHandler(func(client mqtt.Client, reason error) {
log.Info("connection lost", "protocol", "MQTT", "publisherId", n)
}).
SetProtocolVersion(4)

connection := mqtt.NewClient(opts)
token = connection.Connect()
token.Wait()

topic := fmt.Sprintf("%s-%d", cfg.QueueNamePrefix, ((n-1)%cfg.QueueCount)+1)

return &MqttPublisher{
Id: n,
Connection: connection,
Topic: topic,
Config: cfg,
}

}

func (p MqttPublisher) Start() {
if p.Config.Rate == -1 {
p.StartFullSpeed()
} else {
p.StartRateLimited()
}
}

func (p MqttPublisher) StartFullSpeed() {
msg := utils.MessageBody(p.Config)
log.Info("publisher started", "protocol", "MQTT", "publisherId", p.Id, "rate", "unlimited", "destination", p.Topic)
for i := 1; i <= p.Config.PublishCount; i++ {
utils.UpdatePayload(p.Config.UseMillis, &msg)
timer := prometheus.NewTimer(metrics.PublishingLatency.With(prometheus.Labels{"protocol": "mqtt"}))
token := p.Connection.Publish(p.Topic, 1, false, msg)
token.Wait()
timer.ObserveDuration()
if token.Error() != nil {
log.Error("message sending failure", "protocol", "MQTT", "publisherId", p.Id, "error", token.Error())
}
log.Debug("message sent", "protocol", "MQTT", "publisherId", p.Id)
metrics.MessagesPublished.With(prometheus.Labels{"protocol": "mqtt"}).Inc()
utils.WaitBetweenMessages(p.Config.Rate)
}

log.Debug("publisher stopped", "protocol", "MQTT", "publisherId", p.Id)
}

func (p MqttPublisher) StartRateLimited() {
log.Info("publisher started", "protocol", "MQTT", "publisherId", p.Id, "rate", p.Config.Rate, "destination", p.Topic)
}

0 comments on commit b1632e2

Please sign in to comment.