Skip to content

Commit

Permalink
chore: fix case error
Browse files Browse the repository at this point in the history
Signed-off-by: Rory Z <[email protected]>
  • Loading branch information
Rory-Z committed Nov 21, 2023
1 parent 6643941 commit c876518
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 5 deletions.
4 changes: 2 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,12 @@ func (sc *SafeConfig) ReloadConfig(confFile string) (err error) {
if probe.ClientID == "" {
hostname, _ := os.Hostname()
hostname = strings.Replace(hostname, ".", "-", -1)
probe.ClientID = "emqx-exporter-probe-" + hostname + fmt.Sprintf("%d", index)
probe.ClientID = fmt.Sprintf("emqx-exporter-probe-%s-%d", hostname, index)
}
if probe.Topic == "" {
hostname, _ := os.Hostname()
hostname = strings.Replace(hostname, ".", "-", -1)
probe.Topic = "emqx-exporter-probe/" + hostname + "/" + fmt.Sprintf("%d", index)
probe.Topic = fmt.Sprintf("emqx-exporter-probe/%s/%d", hostname, index)
}
if probe.KeepAlive == 0 {
probe.KeepAlive = 30
Expand Down
5 changes: 3 additions & 2 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ dashboard {
}
}
}
log.console.level = debug
listeners {
tcp.fake{
bind = 11883
Expand Down Expand Up @@ -590,7 +591,7 @@ rule_engine {

mqttxSubResp, err := cli.ContainerCreate(ctx, &container.Config{
Image: mqttxContainer.image,
Cmd: []string{"mqttx", "bench", "sub", "-t", "test", "-h", emqxInfo.NetworkSettings.IPAddress},
Cmd: []string{"mqttx", "sub", "-t", "test", "-h", emqxInfo.NetworkSettings.IPAddress},
}, nil, nil, nil, "mqttx-sub")
if err != nil {
panic(err)
Expand All @@ -602,7 +603,7 @@ rule_engine {

mqttxPubResp, err := cli.ContainerCreate(ctx, &container.Config{
Image: mqttxContainer.image,
Cmd: []string{"mqttx", "bench", "pub", "-c", "1", "-t", "test", "-h", emqxInfo.NetworkSettings.IPAddress},
Cmd: []string{"mqttx", "pub", "-c", "1", "-t", "test", "-h", emqxInfo.NetworkSettings.IPAddress},
}, nil, nil, nil, "mqttx-pub")
if err != nil {
panic(err)
Expand Down
13 changes: 12 additions & 1 deletion prober/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package prober
import (
"context"
"emqx-exporter/config"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -49,6 +50,7 @@ func init() {
}

func initMQTTProbe(probe config.Probe, logger log.Logger) (*MQTTProbe, error) {
var isReady = make(chan struct{})
var msgChan = make(chan mqtt.Message)

opt := mqtt.NewClientOptions().AddBroker(probe.Scheme + "://" + probe.Target)
Expand All @@ -60,7 +62,8 @@ func initMQTTProbe(probe config.Probe, logger log.Logger) (*MQTTProbe, error) {
opt.SetTLSConfig(probe.TLSClientConfig.ToTLSConfig())
}
opt.SetOnConnectHandler(func(c mqtt.Client) {
level.Info(logger).Log("msg", "Connected to MQTT broker", "target", probe.Target)
optReader := c.OptionsReader()
level.Info(logger).Log("msg", "Connected to MQTT broker", "target", probe.Target, "client_id", optReader.ClientID())
token := c.Subscribe(probe.Topic, probe.QoS, func(c mqtt.Client, m mqtt.Message) {
msgChan <- m
})
Expand All @@ -69,6 +72,7 @@ func initMQTTProbe(probe config.Probe, logger log.Logger) (*MQTTProbe, error) {
level.Error(logger).Log("msg", "Failed to subscribe to MQTT topic", "target", probe.Target, "topic", probe.Topic, "qos", probe.QoS, "err", token.Error())
return
}
isReady <- struct{}{}
level.Info(logger).Log("msg", "Subscribed to MQTT topic", "target", probe.Target, "topic", probe.Topic, "qos", probe.QoS)
})
opt.SetConnectionLostHandler(func(c mqtt.Client, err error) {
Expand All @@ -80,6 +84,12 @@ func initMQTTProbe(probe config.Probe, logger log.Logger) (*MQTTProbe, error) {
return nil, token.Error()
}

select {
case <-isReady:
case <-time.After(60 * time.Second):
return nil, fmt.Errorf("MQTT probe connect timeout")
}

return &MQTTProbe{
Client: c,
MsgChan: msgChan,
Expand All @@ -102,6 +112,7 @@ func ProbeMQTT(probe config.Probe, logger log.Logger) bool {
return false
}

level.Info(logger).Log("msg", "Publishing MQTT message", "target", probe.Target, "topic", probe.Topic, "qos", probe.QoS)
if token := mqttProbe.Client.Publish(probe.Topic, probe.QoS, false, "hello world"); token.Wait() && token.Error() != nil {
return false
}
Expand Down

0 comments on commit c876518

Please sign in to comment.