Skip to content

Commit

Permalink
fix(probe): fix muilt target error
Browse files Browse the repository at this point in the history
Signed-off-by: Rory Z <[email protected]>
  • Loading branch information
Rory-Z committed Oct 12, 2023
1 parent e1227e8 commit 52071dd
Showing 1 changed file with 29 additions and 2 deletions.
31 changes: 29 additions & 2 deletions prober/mqtt.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package prober

import (
"context"
"emqx-exporter/config"
"time"

Expand All @@ -14,7 +15,31 @@ type MQTTProbe struct {
MsgChan <-chan mqtt.Message
}

var mqttProbe *MQTTProbe
var mqttProbeMap map[string]*MQTTProbe

func init() {
mqttProbeMap = make(map[string]*MQTTProbe)
go func() {
for {
for target, probe := range mqttProbeMap {
if probe == nil {
delete(mqttProbeMap, target)
continue
}
if !probe.Client.IsConnected() {
delete(mqttProbeMap, target)
continue
}
}

select {
case <-context.Background().Done():
return
case <-time.After(5 * time.Second):
}
}
}()
}

func initMQTTProbe(probe config.Probe, logger log.Logger) (*MQTTProbe, error) {
opt := mqtt.NewClientOptions().AddBroker(probe.Scheme + "://" + probe.Target).SetClientID(probe.ClientID).SetUsername(probe.Username).SetPassword(probe.Password)
Expand Down Expand Up @@ -45,11 +70,13 @@ func initMQTTProbe(probe config.Probe, logger log.Logger) (*MQTTProbe, error) {
}

func ProbeMQTT(probe config.Probe, logger log.Logger) bool {
if mqttProbe == nil {
mqttProbe, ok := mqttProbeMap[probe.Target]
if !ok {
var err error
if mqttProbe, err = initMQTTProbe(probe, logger); err != nil {
return false
}
mqttProbeMap[probe.Target] = mqttProbe
}

if !mqttProbe.Client.IsConnected() {
Expand Down

0 comments on commit 52071dd

Please sign in to comment.