Skip to content

Commit

Permalink
Refactor MQTT
Browse files Browse the repository at this point in the history
  • Loading branch information
mkuratczyk committed Dec 2, 2024
1 parent c8ebc8f commit c6b6c5b
Show file tree
Hide file tree
Showing 10 changed files with 240 additions and 135 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
github.com/google/btree v1.1.3 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/pprof v0.0.0-20241122213907-cbe949e5a41b // indirect
github.com/google/pprof v0.0.0-20241128161848-dc51965c6481 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/websocket v1.5.3 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik=
github.com/google/pprof v0.0.0-20241122213907-cbe949e5a41b h1:SXO0REt4iu865upYCk8aKBBJQ4BqoE0ReP23ClMu60s=
github.com/google/pprof v0.0.0-20241122213907-cbe949e5a41b/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144=
github.com/google/pprof v0.0.0-20241128161848-dc51965c6481 h1:yudKIrXagAOl99WQzrP1gbz5HLB9UjhcOFnPzdd6Qec=
github.com/google/pprof v0.0.0-20241128161848-dc51965c6481/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
Expand Down
47 changes: 27 additions & 20 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ var _ = Describe("OMQ CLI", func() {
args := []string{
"mqtt",
"--publishers=100",
"--publish-to=sensor/%d",
"--publish-to=time/%d",
"--rate=1",
"--consumers=100",
"--consume-from=/queues/sensors",
"--binding-key=sensor.#",
"--consume-from=/queues/time",
"--binding-key=time.#",
"--time", "5s",
}
session := omq(args)
Expand Down Expand Up @@ -68,11 +68,11 @@ var _ = Describe("OMQ CLI", func() {
args := []string{
"mqtt",
"--publishers=2",
"--publish-to=sensor/%d",
"--publish-to=can-stop-me-now/%d",
"--rate=1",
"--consumers=2",
"--consume-from=/queues/sensors",
"--binding-key=sensor.#",
"--consume-from=/queues/can-stop-me-now",
"--binding-key=can-stop-me-now.#",
}
session := omq(args)
// wait until metrics are printed (after consuemrs connected and publishers were spawned)
Expand Down Expand Up @@ -201,16 +201,16 @@ var _ = Describe("OMQ CLI", func() {
"--publishers=3",
"--consumers=1",
"--pmessages=5",
"--publish-to=sensor/%d",
"--consume-from=/queues/sensors",
"--binding-key=sensor.#",
"--publish-to=fan-in-mqtt-amqp",
"--consume-from=/queues/fan-in-mqtt-amqp",
"--binding-key=fan-in-mqtt-amqp",
"--queues=classic",
"--cleanup-queues=true",
"--time=2s",
"--time=5s",
}

session := omq(args)
Eventually(session).WithTimeout(4 * time.Second).Should(gexec.Exit(0))
Eventually(session).WithTimeout(6 * time.Second).Should(gexec.Exit(0))
Eventually(session.Err).Should(gbytes.Say(`TOTAL PUBLISHED messages=15`))
Eventually(session.Err).Should(gbytes.Say(`TOTAL CONSUMED messages=15`))
})
Expand Down Expand Up @@ -278,11 +278,10 @@ var _ = Describe("OMQ CLI", func() {
Expect(err).ShouldNot(HaveOccurred())
args := []string{
"mqtt",
"--time=6s",
"--publish-to=/topic/foo",
"--consume-from=/topic/foo",
"--consumer-id=omq-test-%r",
"--publisher-id=omq-test-%r",
"--publish-to=/topic/omq-version-test-topic-" + versionFlag,
"--consume-from=/topic/omq-version-test-topic-" + versionFlag,
"--consumer-id=omq-version-test-consumer-" + versionFlag,
"--publisher-id=omq-version-test-publisher-" + versionFlag,
"--rate", "1",
"--print-all-metrics",
}
Expand All @@ -291,16 +290,24 @@ var _ = Describe("OMQ CLI", func() {
args = append(args, "--mqtt-consumer-version", versionFlag)
}
session := omq(args)
Eventually(session.Err).WithTimeout(5 * time.Second).Should(gbytes.Say(`published=`))

Eventually(func() bool {
conns, err := rmqc.ListConnections()
return err == nil &&
if err == nil &&
len(conns) >= 2 &&
slices.ContainsFunc(conns, func(conn rabbithole.ConnectionInfo) bool {
return conn.Protocol == connectionVersion
})
return conn.Protocol == connectionVersion &&
strings.HasPrefix(conn.ClientProperties["client_id"].(string), "omq-version-test")
}) {
return true
} else {
GinkgoWriter.Printf("\n--- time: %v len: %v ---\n%+v\n---\n", time.Now(), len(conns), conns)
return false
}
}, 7*time.Second, 500*time.Millisecond).Should(BeTrue())
Eventually(session).WithTimeout(7 * time.Second).Should(gexec.Exit(0))
session.Signal(os.Signal(os.Interrupt))
Eventually(session).WithTimeout(5 * time.Second).Should(gexec.Exit(0))

output, _ := io.ReadAll(session.Out)
buf := bytes.NewReader(output)
Expand Down
120 changes: 15 additions & 105 deletions pkg/mqtt_client/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,8 @@ import (
"context"
"net/url"
"strings"
"time"

"github.com/eclipse/paho.golang/autopaho"
"github.com/eclipse/paho.golang/paho"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/rabbitmq/omq/pkg/config"
"github.com/rabbitmq/omq/pkg/log"
"github.com/rabbitmq/omq/pkg/utils"
)

Expand All @@ -23,114 +18,19 @@ type Publisher interface {
}

func NewConsumer(ctx context.Context, cfg config.Config, id int) Consumer {
topic := utils.InjectId(cfg.ConsumeFrom, id)
topic = strings.TrimPrefix(topic, "/exchange/amq.topic/")
topic = strings.TrimPrefix(topic, "/topic/")

if cfg.MqttConsumer.Version == 5 {
return &Mqtt5Consumer{
Id: id,
Connection: nil,
Topic: topic,
Config: cfg,
}
if cfg.MqttPublisher.Version == 5 {
return NewMqtt5Consumer(cfg, id)
} else {
return &MqttConsumer{
Id: id,
Connection: nil,
Topic: topic,
Config: cfg,
}
return NewMqttConsumer(cfg, id)
}
}

func NewPublisher(ctx context.Context, cfg config.Config, id int) Publisher {
topic := utils.InjectId(cfg.PublishTo, id)
// AMQP-1.0 and STOMP allow /exchange/amq.topic/ prefix
// since MQTT has no concept of exchanges, we need to remove it
// this should get more flexible in the future
topic = strings.TrimPrefix(topic, "/exchange/amq.topic/")
topic = strings.TrimPrefix(topic, "/topic/")

if cfg.MqttPublisher.Version == 5 {
connection := newMqtt5Connection(cfg, id)
return &Mqtt5Publisher{
Id: id,
Connection: connection,
Topic: topic,
Config: cfg,
}
return NewMqtt5Publisher(cfg, id)
} else {
connection := newMqtt34Connection(cfg, id)
return &MqttPublisher{
Id: id,
Connection: connection,
Topic: topic,
Config: cfg,
}
}
}

func newMqtt34Connection(cfg config.Config, id int) mqtt.Client {
var token mqtt.Token

opts := mqtt.NewClientOptions().
SetClientID(utils.InjectId(cfg.PublisherId, id)).
SetAutoReconnect(true).
SetCleanSession(cfg.MqttPublisher.CleanSession).
SetConnectionLostHandler(func(client mqtt.Client, reason error) {
log.Info("publisher connection lost", "id", id)
}).
SetProtocolVersion(uint(cfg.MqttPublisher.Version))

var j int
for i, n := range utils.WrappedSequence(len(cfg.PublisherUri), id-1) {
if cfg.SpreadConnections {
j = n
} else {
j = i
}
parsedUri := utils.ParseURI(cfg.PublisherUri[j], "mqtt", "1883")
opts.AddBroker(parsedUri.Broker).SetUsername(parsedUri.Username).SetPassword(parsedUri.Password)
}

connection := mqtt.NewClient(opts)
token = connection.Connect()
token.Wait()
if token.Error() != nil {
log.Error("publisher connection failed", "id", id, "error", token.Error())
return NewMqttPublisher(cfg, id)
}
return connection
}

func newMqtt5Connection(cfg config.Config, id int) *autopaho.ConnectionManager {
opts := autopaho.ClientConfig{
ServerUrls: stringsToUrls(cfg.PublisherUri),
CleanStartOnInitialConnection: cfg.MqttPublisher.CleanSession,
KeepAlive: 20,
ConnectRetryDelay: 1 * time.Second,
OnConnectionUp: func(*autopaho.ConnectionManager, *paho.Connack) {
log.Info("publisher connected", "id", id)
},
OnConnectError: func(err error) {
log.Info("publisher failed to connect ", "id", id, "error", err)
},
ClientConfig: paho.ClientConfig{
ClientID: utils.InjectId(cfg.PublisherId, id),
OnClientError: func(err error) {
log.Error("publisher error", "id", id, "error", err)
},
OnServerDisconnect: func(d *paho.Disconnect) {
log.Error("publisher disconnected", "id", id, "reasonCode", d.ReasonCode, "reasonString", d.Properties.ReasonString)
},
},
}

connection, err := autopaho.NewConnection(context.TODO(), opts)
if err != nil {
log.Error("publisher connection failed", "id", id, "error", err)
}
return connection
}

func stringsToUrls(connectionStrings []string) []*url.URL {
Expand All @@ -144,3 +44,13 @@ func stringsToUrls(connectionStrings []string) []*url.URL {
}
return serverUrls
}

func publisherTopic(topic string, id int) string {
topic = utils.InjectId(topic, id)
// AMQP-1.0 and STOMP allow /exchange/amq.topic/ prefix
// since MQTT has no concept of exchanges, we need to remove it
// this should get more flexible in the future
topic = strings.TrimPrefix(topic, "/exchange/amq.topic/")
topic = strings.TrimPrefix(topic, "/topic/")
return topic
}
10 changes: 10 additions & 0 deletions pkg/mqtt_client/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@ type MqttConsumer struct {
Config config.Config
}

func NewMqttConsumer(cfg config.Config, id int) MqttConsumer {
topic := publisherTopic(cfg.ConsumeFrom, id)
return MqttConsumer{
Id: id,
Connection: nil,
Topic: topic,
Config: cfg,
}
}

func (c MqttConsumer) Start(ctx context.Context, subscribed chan bool) {
msgsReceived := 0
previousMessageTimeSent := time.Unix(0, 0)
Expand Down
11 changes: 11 additions & 0 deletions pkg/mqtt_client/consumer_v5.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ type Mqtt5Consumer struct {
Config config.Config
}

func NewMqtt5Consumer(cfg config.Config, id int) Mqtt5Consumer {
topic := publisherTopic(cfg.ConsumeFrom, id)
return Mqtt5Consumer{
Id: id,
Connection: nil,
Topic: topic,
Config: cfg,
}
}

func (c Mqtt5Consumer) Start(ctx context.Context, subscribed chan bool) {
msgsReceived := 0
previousMessageTimeSent := time.Unix(0, 0)
Expand Down Expand Up @@ -81,6 +91,7 @@ func (c Mqtt5Consumer) Start(ctx context.Context, subscribed chan bool) {
if err != nil {
log.Error("consumer connection failed", "id", c.Id, "error", err)
}
c.Connection.AwaitConnection(ctx)

Check failure on line 94 in pkg/mqtt_client/consumer_v5.go

View workflow job for this annotation

GitHub Actions / Lint

Error return value of `c.Connection.AwaitConnection` is not checked (errcheck)
close(subscribed)

// TODO: currently we can consume more than ConsumerCount messages
Expand Down
13 changes: 13 additions & 0 deletions pkg/mqtt_client/mqtt_client_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package mqtt_client_test

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func TestMqttClient(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "MqttClient Suite")
}
42 changes: 42 additions & 0 deletions pkg/mqtt_client/mqtt_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package mqtt_client

import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/rabbitmq/omq/pkg/config"
)

var _ = Context("MQTT 3.1/3.1.1 client", func() {
Describe("uses the correct URI for the publisher", func() {
cfg := config.Config{
PublisherUri: []string{"mqtt://publisher:1883"},
PublisherId: "my-client-id-%d",
MqttPublisher: config.MqttOptions{
Version: 3,
CleanSession: true,
},
}
publisher := NewMqttPublisher(cfg, 1)
opts := publisher.connectionOptions()
Expect(opts.ClientID).To(Equal("my-client-id-1"))
Expect(opts.Servers[0].Host).To(Equal("publisher:1883"))
})
Describe("MQTT 5.0 client", func() {
Describe("uses the correct URI for the publisher", func() {
It("--publisher-uri", func() {
cfg := config.Config{
PublisherUri: []string{"mqtt://publisher:1883"},
PublisherId: "my-client-id-%d",
MqttPublisher: config.MqttOptions{
Version: 5,
CleanSession: true,
},
}
publisher := NewMqtt5Publisher(cfg, 1)
opts := publisher.connectionOptions()
Expect(opts.ClientID).To(Equal("my-client-id-1"))
Expect(opts.ServerUrls[0].Host).To(Equal("publisher:1883"))
})
})
})
})
Loading

0 comments on commit c6b6c5b

Please sign in to comment.