From c6b6c5b622c2383e973c2b91ccaf3566ceba8d60 Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Sat, 30 Nov 2024 18:31:03 +0100 Subject: [PATCH] Refactor MQTT --- go.mod | 2 +- go.sum | 4 +- main_test.go | 47 +++++---- pkg/mqtt_client/common.go | 120 +++------------------- pkg/mqtt_client/consumer.go | 10 ++ pkg/mqtt_client/consumer_v5.go | 11 ++ pkg/mqtt_client/mqtt_client_suite_test.go | 13 +++ pkg/mqtt_client/mqtt_client_test.go | 42 ++++++++ pkg/mqtt_client/publisher.go | 67 +++++++++++- pkg/mqtt_client/publisher_v5.go | 59 ++++++++++- 10 files changed, 240 insertions(+), 135 deletions(-) create mode 100644 pkg/mqtt_client/mqtt_client_suite_test.go create mode 100644 pkg/mqtt_client/mqtt_client_test.go diff --git a/go.mod b/go.mod index ee6025d..6b33b26 100644 --- a/go.mod +++ b/go.mod @@ -30,7 +30,7 @@ require ( github.com/go-task/slim-sprig/v3 v3.0.0 // indirect github.com/google/btree v1.1.3 // indirect github.com/google/go-cmp v0.6.0 // indirect - github.com/google/pprof v0.0.0-20241122213907-cbe949e5a41b // indirect + github.com/google/pprof v0.0.0-20241128161848-dc51965c6481 // indirect github.com/google/uuid v1.6.0 // indirect github.com/gorilla/websocket v1.5.3 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect diff --git a/go.sum b/go.sum index a28e09c..b6e7ed8 100644 --- a/go.sum +++ b/go.sum @@ -84,8 +84,8 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik= -github.com/google/pprof v0.0.0-20241122213907-cbe949e5a41b h1:SXO0REt4iu865upYCk8aKBBJQ4BqoE0ReP23ClMu60s= -github.com/google/pprof v0.0.0-20241122213907-cbe949e5a41b/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= +github.com/google/pprof v0.0.0-20241128161848-dc51965c6481 h1:yudKIrXagAOl99WQzrP1gbz5HLB9UjhcOFnPzdd6Qec= +github.com/google/pprof v0.0.0-20241128161848-dc51965c6481/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= diff --git a/main_test.go b/main_test.go index 1761922..c76c01b 100644 --- a/main_test.go +++ b/main_test.go @@ -30,11 +30,11 @@ var _ = Describe("OMQ CLI", func() { args := []string{ "mqtt", "--publishers=100", - "--publish-to=sensor/%d", + "--publish-to=time/%d", "--rate=1", "--consumers=100", - "--consume-from=/queues/sensors", - "--binding-key=sensor.#", + "--consume-from=/queues/time", + "--binding-key=time.#", "--time", "5s", } session := omq(args) @@ -68,11 +68,11 @@ var _ = Describe("OMQ CLI", func() { args := []string{ "mqtt", "--publishers=2", - "--publish-to=sensor/%d", + "--publish-to=can-stop-me-now/%d", "--rate=1", "--consumers=2", - "--consume-from=/queues/sensors", - "--binding-key=sensor.#", + "--consume-from=/queues/can-stop-me-now", + "--binding-key=can-stop-me-now.#", } session := omq(args) // wait until metrics are printed (after consuemrs connected and publishers were spawned) @@ -201,16 +201,16 @@ var _ = Describe("OMQ CLI", func() { "--publishers=3", "--consumers=1", "--pmessages=5", - "--publish-to=sensor/%d", - "--consume-from=/queues/sensors", - "--binding-key=sensor.#", + "--publish-to=fan-in-mqtt-amqp", + "--consume-from=/queues/fan-in-mqtt-amqp", + "--binding-key=fan-in-mqtt-amqp", "--queues=classic", "--cleanup-queues=true", - "--time=2s", + "--time=5s", } session := omq(args) - Eventually(session).WithTimeout(4 * time.Second).Should(gexec.Exit(0)) + Eventually(session).WithTimeout(6 * time.Second).Should(gexec.Exit(0)) Eventually(session.Err).Should(gbytes.Say(`TOTAL PUBLISHED messages=15`)) Eventually(session.Err).Should(gbytes.Say(`TOTAL CONSUMED messages=15`)) }) @@ -278,11 +278,10 @@ var _ = Describe("OMQ CLI", func() { Expect(err).ShouldNot(HaveOccurred()) args := []string{ "mqtt", - "--time=6s", - "--publish-to=/topic/foo", - "--consume-from=/topic/foo", - "--consumer-id=omq-test-%r", - "--publisher-id=omq-test-%r", + "--publish-to=/topic/omq-version-test-topic-" + versionFlag, + "--consume-from=/topic/omq-version-test-topic-" + versionFlag, + "--consumer-id=omq-version-test-consumer-" + versionFlag, + "--publisher-id=omq-version-test-publisher-" + versionFlag, "--rate", "1", "--print-all-metrics", } @@ -291,16 +290,24 @@ var _ = Describe("OMQ CLI", func() { args = append(args, "--mqtt-consumer-version", versionFlag) } session := omq(args) + Eventually(session.Err).WithTimeout(5 * time.Second).Should(gbytes.Say(`published=`)) Eventually(func() bool { conns, err := rmqc.ListConnections() - return err == nil && + if err == nil && len(conns) >= 2 && slices.ContainsFunc(conns, func(conn rabbithole.ConnectionInfo) bool { - return conn.Protocol == connectionVersion - }) + return conn.Protocol == connectionVersion && + strings.HasPrefix(conn.ClientProperties["client_id"].(string), "omq-version-test") + }) { + return true + } else { + GinkgoWriter.Printf("\n--- time: %v len: %v ---\n%+v\n---\n", time.Now(), len(conns), conns) + return false + } }, 7*time.Second, 500*time.Millisecond).Should(BeTrue()) - Eventually(session).WithTimeout(7 * time.Second).Should(gexec.Exit(0)) + session.Signal(os.Signal(os.Interrupt)) + Eventually(session).WithTimeout(5 * time.Second).Should(gexec.Exit(0)) output, _ := io.ReadAll(session.Out) buf := bytes.NewReader(output) diff --git a/pkg/mqtt_client/common.go b/pkg/mqtt_client/common.go index da018f7..08f352b 100644 --- a/pkg/mqtt_client/common.go +++ b/pkg/mqtt_client/common.go @@ -4,13 +4,8 @@ import ( "context" "net/url" "strings" - "time" - "github.com/eclipse/paho.golang/autopaho" - "github.com/eclipse/paho.golang/paho" - mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/rabbitmq/omq/pkg/config" - "github.com/rabbitmq/omq/pkg/log" "github.com/rabbitmq/omq/pkg/utils" ) @@ -23,114 +18,19 @@ type Publisher interface { } func NewConsumer(ctx context.Context, cfg config.Config, id int) Consumer { - topic := utils.InjectId(cfg.ConsumeFrom, id) - topic = strings.TrimPrefix(topic, "/exchange/amq.topic/") - topic = strings.TrimPrefix(topic, "/topic/") - - if cfg.MqttConsumer.Version == 5 { - return &Mqtt5Consumer{ - Id: id, - Connection: nil, - Topic: topic, - Config: cfg, - } + if cfg.MqttPublisher.Version == 5 { + return NewMqtt5Consumer(cfg, id) } else { - return &MqttConsumer{ - Id: id, - Connection: nil, - Topic: topic, - Config: cfg, - } + return NewMqttConsumer(cfg, id) } } func NewPublisher(ctx context.Context, cfg config.Config, id int) Publisher { - topic := utils.InjectId(cfg.PublishTo, id) - // AMQP-1.0 and STOMP allow /exchange/amq.topic/ prefix - // since MQTT has no concept of exchanges, we need to remove it - // this should get more flexible in the future - topic = strings.TrimPrefix(topic, "/exchange/amq.topic/") - topic = strings.TrimPrefix(topic, "/topic/") - if cfg.MqttPublisher.Version == 5 { - connection := newMqtt5Connection(cfg, id) - return &Mqtt5Publisher{ - Id: id, - Connection: connection, - Topic: topic, - Config: cfg, - } + return NewMqtt5Publisher(cfg, id) } else { - connection := newMqtt34Connection(cfg, id) - return &MqttPublisher{ - Id: id, - Connection: connection, - Topic: topic, - Config: cfg, - } - } -} - -func newMqtt34Connection(cfg config.Config, id int) mqtt.Client { - var token mqtt.Token - - opts := mqtt.NewClientOptions(). - SetClientID(utils.InjectId(cfg.PublisherId, id)). - SetAutoReconnect(true). - SetCleanSession(cfg.MqttPublisher.CleanSession). - SetConnectionLostHandler(func(client mqtt.Client, reason error) { - log.Info("publisher connection lost", "id", id) - }). - SetProtocolVersion(uint(cfg.MqttPublisher.Version)) - - var j int - for i, n := range utils.WrappedSequence(len(cfg.PublisherUri), id-1) { - if cfg.SpreadConnections { - j = n - } else { - j = i - } - parsedUri := utils.ParseURI(cfg.PublisherUri[j], "mqtt", "1883") - opts.AddBroker(parsedUri.Broker).SetUsername(parsedUri.Username).SetPassword(parsedUri.Password) - } - - connection := mqtt.NewClient(opts) - token = connection.Connect() - token.Wait() - if token.Error() != nil { - log.Error("publisher connection failed", "id", id, "error", token.Error()) + return NewMqttPublisher(cfg, id) } - return connection -} - -func newMqtt5Connection(cfg config.Config, id int) *autopaho.ConnectionManager { - opts := autopaho.ClientConfig{ - ServerUrls: stringsToUrls(cfg.PublisherUri), - CleanStartOnInitialConnection: cfg.MqttPublisher.CleanSession, - KeepAlive: 20, - ConnectRetryDelay: 1 * time.Second, - OnConnectionUp: func(*autopaho.ConnectionManager, *paho.Connack) { - log.Info("publisher connected", "id", id) - }, - OnConnectError: func(err error) { - log.Info("publisher failed to connect ", "id", id, "error", err) - }, - ClientConfig: paho.ClientConfig{ - ClientID: utils.InjectId(cfg.PublisherId, id), - OnClientError: func(err error) { - log.Error("publisher error", "id", id, "error", err) - }, - OnServerDisconnect: func(d *paho.Disconnect) { - log.Error("publisher disconnected", "id", id, "reasonCode", d.ReasonCode, "reasonString", d.Properties.ReasonString) - }, - }, - } - - connection, err := autopaho.NewConnection(context.TODO(), opts) - if err != nil { - log.Error("publisher connection failed", "id", id, "error", err) - } - return connection } func stringsToUrls(connectionStrings []string) []*url.URL { @@ -144,3 +44,13 @@ func stringsToUrls(connectionStrings []string) []*url.URL { } return serverUrls } + +func publisherTopic(topic string, id int) string { + topic = utils.InjectId(topic, id) + // AMQP-1.0 and STOMP allow /exchange/amq.topic/ prefix + // since MQTT has no concept of exchanges, we need to remove it + // this should get more flexible in the future + topic = strings.TrimPrefix(topic, "/exchange/amq.topic/") + topic = strings.TrimPrefix(topic, "/topic/") + return topic +} diff --git a/pkg/mqtt_client/consumer.go b/pkg/mqtt_client/consumer.go index cedc7bd..591895e 100644 --- a/pkg/mqtt_client/consumer.go +++ b/pkg/mqtt_client/consumer.go @@ -19,6 +19,16 @@ type MqttConsumer struct { Config config.Config } +func NewMqttConsumer(cfg config.Config, id int) MqttConsumer { + topic := publisherTopic(cfg.ConsumeFrom, id) + return MqttConsumer{ + Id: id, + Connection: nil, + Topic: topic, + Config: cfg, + } +} + func (c MqttConsumer) Start(ctx context.Context, subscribed chan bool) { msgsReceived := 0 previousMessageTimeSent := time.Unix(0, 0) diff --git a/pkg/mqtt_client/consumer_v5.go b/pkg/mqtt_client/consumer_v5.go index 680d0dc..eae6a08 100644 --- a/pkg/mqtt_client/consumer_v5.go +++ b/pkg/mqtt_client/consumer_v5.go @@ -20,6 +20,16 @@ type Mqtt5Consumer struct { Config config.Config } +func NewMqtt5Consumer(cfg config.Config, id int) Mqtt5Consumer { + topic := publisherTopic(cfg.ConsumeFrom, id) + return Mqtt5Consumer{ + Id: id, + Connection: nil, + Topic: topic, + Config: cfg, + } +} + func (c Mqtt5Consumer) Start(ctx context.Context, subscribed chan bool) { msgsReceived := 0 previousMessageTimeSent := time.Unix(0, 0) @@ -81,6 +91,7 @@ func (c Mqtt5Consumer) Start(ctx context.Context, subscribed chan bool) { if err != nil { log.Error("consumer connection failed", "id", c.Id, "error", err) } + c.Connection.AwaitConnection(ctx) close(subscribed) // TODO: currently we can consume more than ConsumerCount messages diff --git a/pkg/mqtt_client/mqtt_client_suite_test.go b/pkg/mqtt_client/mqtt_client_suite_test.go new file mode 100644 index 0000000..8676f29 --- /dev/null +++ b/pkg/mqtt_client/mqtt_client_suite_test.go @@ -0,0 +1,13 @@ +package mqtt_client_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestMqttClient(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "MqttClient Suite") +} diff --git a/pkg/mqtt_client/mqtt_client_test.go b/pkg/mqtt_client/mqtt_client_test.go new file mode 100644 index 0000000..ed7395e --- /dev/null +++ b/pkg/mqtt_client/mqtt_client_test.go @@ -0,0 +1,42 @@ +package mqtt_client + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/rabbitmq/omq/pkg/config" +) + +var _ = Context("MQTT 3.1/3.1.1 client", func() { + Describe("uses the correct URI for the publisher", func() { + cfg := config.Config{ + PublisherUri: []string{"mqtt://publisher:1883"}, + PublisherId: "my-client-id-%d", + MqttPublisher: config.MqttOptions{ + Version: 3, + CleanSession: true, + }, + } + publisher := NewMqttPublisher(cfg, 1) + opts := publisher.connectionOptions() + Expect(opts.ClientID).To(Equal("my-client-id-1")) + Expect(opts.Servers[0].Host).To(Equal("publisher:1883")) + }) + Describe("MQTT 5.0 client", func() { + Describe("uses the correct URI for the publisher", func() { + It("--publisher-uri", func() { + cfg := config.Config{ + PublisherUri: []string{"mqtt://publisher:1883"}, + PublisherId: "my-client-id-%d", + MqttPublisher: config.MqttOptions{ + Version: 5, + CleanSession: true, + }, + } + publisher := NewMqtt5Publisher(cfg, 1) + opts := publisher.connectionOptions() + Expect(opts.ClientID).To(Equal("my-client-id-1")) + Expect(opts.ServerUrls[0].Host).To(Equal("publisher:1883")) + }) + }) + }) +}) diff --git a/pkg/mqtt_client/publisher.go b/pkg/mqtt_client/publisher.go index 46e5fd1..e532be6 100644 --- a/pkg/mqtt_client/publisher.go +++ b/pkg/mqtt_client/publisher.go @@ -21,12 +21,28 @@ type MqttPublisher struct { msg []byte } +func NewMqttPublisher(cfg config.Config, id int) MqttPublisher { + topic := publisherTopic(cfg.PublishTo, id) + return MqttPublisher{ + Id: id, + Connection: nil, + Topic: topic, + Config: cfg, + } +} + func (p MqttPublisher) Start(ctx context.Context) { // sleep random interval to avoid all publishers publishing at the same time s := rand.Intn(1000) time.Sleep(time.Duration(s) * time.Millisecond) - defer p.Connection.Disconnect(250) + defer func() { + if p.Connection != nil { + p.Connection.Disconnect(250) + } + }() + + p.Connect(ctx) p.msg = utils.MessageBody(p.Config.Size) @@ -41,6 +57,51 @@ func (p MqttPublisher) Start(ctx context.Context) { log.Debug("publisher stopped", "id", p.Id) } +func (p *MqttPublisher) Connect(ctx context.Context) { + var token mqtt.Token + + opts := p.connectionOptions() + connection := mqtt.NewClient(opts) + for { + token = connection.Connect() + token.Wait() + if token.Error() == nil { + break + } + log.Error("publisher connection failed", "id", p.Id, "error", token.Error()) + select { + case <-ctx.Done(): + return + case <-time.After(1 * time.Second): + continue + } + } + p.Connection = connection +} + +func (p MqttPublisher) connectionOptions() *mqtt.ClientOptions { + opts := mqtt.NewClientOptions(). + SetClientID(utils.InjectId(p.Config.PublisherId, p.Id)). + SetAutoReconnect(true). + SetCleanSession(p.Config.MqttPublisher.CleanSession). + SetConnectionLostHandler(func(client mqtt.Client, reason error) { + log.Info("publisher connection lost", "id", p.Id) + }). + SetProtocolVersion(uint(p.Config.MqttPublisher.Version)) + + var j int + for i, n := range utils.WrappedSequence(len(p.Config.PublisherUri), p.Id-1) { + if p.Config.SpreadConnections { + j = n + } else { + j = i + } + parsedUri := utils.ParseURI(p.Config.PublisherUri[j], "mqtt", "1883") + opts.AddBroker(parsedUri.Broker).SetUsername(parsedUri.Username).SetPassword(parsedUri.Password) + } + return opts +} + func (p MqttPublisher) StartFullSpeed(ctx context.Context) { log.Info("publisher started", "id", p.Id, "rate", "unlimited", "destination", p.Topic) @@ -102,5 +163,7 @@ func (p MqttPublisher) Send() { func (p MqttPublisher) Stop(reason string) { log.Debug("closing connection", "id", p.Id, "reason", reason) - p.Connection.Disconnect(250) + if p.Connection != nil { + p.Connection.Disconnect(250) + } } diff --git a/pkg/mqtt_client/publisher_v5.go b/pkg/mqtt_client/publisher_v5.go index 7af3885..2600ca3 100644 --- a/pkg/mqtt_client/publisher_v5.go +++ b/pkg/mqtt_client/publisher_v5.go @@ -22,15 +22,29 @@ type Mqtt5Publisher struct { msg []byte } +func NewMqtt5Publisher(cfg config.Config, id int) Mqtt5Publisher { + topic := publisherTopic(cfg.PublishTo, id) + return Mqtt5Publisher{ + Id: id, + Connection: nil, + Topic: topic, + Config: cfg, + } +} + func (p Mqtt5Publisher) Start(ctx context.Context) { // sleep random interval to avoid all publishers publishing at the same time s := rand.Intn(1000) time.Sleep(time.Duration(s) * time.Millisecond) defer func() { - _ = p.Connection.Disconnect(context.TODO()) + if p.Connection != nil { + _ = p.Connection.Disconnect(context.TODO()) + } }() + p.Connect(ctx) + p.msg = utils.MessageBody(p.Config.Size) switch p.Config.Rate { @@ -41,9 +55,48 @@ func (p Mqtt5Publisher) Start(ctx context.Context) { default: p.StartRateLimited(ctx) } + // TODO it seems that sometimes if we stop quickly after sending + // a message, this message is not delivered, even though Publish + // is supposed to by synchronous; to be investigated + time.Sleep(500 * time.Millisecond) log.Debug("publisher stopped", "id", p.Id) } +func (p *Mqtt5Publisher) Connect(ctx context.Context) { + opts := p.connectionOptions() + connection, err := autopaho.NewConnection(context.TODO(), opts) + if err != nil { + log.Error("publisher connection failed", "id", p.Id, "error", err) + } + connection.AwaitConnection(ctx) + p.Connection = connection +} + +func (p Mqtt5Publisher) connectionOptions() autopaho.ClientConfig { + opts := autopaho.ClientConfig{ + ServerUrls: stringsToUrls(p.Config.PublisherUri), + CleanStartOnInitialConnection: p.Config.MqttPublisher.CleanSession, + KeepAlive: 20, + ConnectRetryDelay: 1 * time.Second, + OnConnectionUp: func(*autopaho.ConnectionManager, *paho.Connack) { + log.Info("publisher connected", "id", p.Id) + }, + OnConnectError: func(err error) { + log.Info("publisher failed to connect ", "id", p.Id, "error", err) + }, + ClientConfig: paho.ClientConfig{ + ClientID: utils.InjectId(p.Config.PublisherId, p.Id), + OnClientError: func(err error) { + log.Error("publisher error", "id", p.Id, "error", err) + }, + OnServerDisconnect: func(d *paho.Disconnect) { + log.Error("publisher disconnected", "id", p.Id, "reasonCode", d.ReasonCode, "reasonString", d.Properties.ReasonString) + }, + }, + } + return opts +} + func (p Mqtt5Publisher) StartFullSpeed(ctx context.Context) { log.Info("publisher started", "id", p.Id, "rate", "unlimited", "destination", p.Topic) @@ -85,10 +138,6 @@ func (p Mqtt5Publisher) StartRateLimited(ctx context.Context) { } func (p Mqtt5Publisher) Send() { - // if !p.Connection.IsConnected() { - // time.Sleep(1 * time.Second) - // return - // } utils.UpdatePayload(p.Config.UseMillis, &p.msg) startTime := time.Now() _, err := p.Connection.Publish(context.TODO(), &paho.Publish{