Skip to content

Commit

Permalink
feat: add mqtt probe
Browse files Browse the repository at this point in the history
Signed-off-by: Rory Z <[email protected]>
  • Loading branch information
Rory-Z committed Oct 10, 2023
1 parent 952c0ee commit dc42a0b
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 13 deletions.
8 changes: 2 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@ all::
ARCH = $(shell go env GOARCH)
OS = $(shell go env GOOS)

# Needs to be defined before including Makefile.common to auto-generate targets
DOCKER_ARCHS ?= amd64

DOCKER_IMAGE_NAME ?= emqx-exporter:latest

.PHONY: build LOCALBIN
build:
CGO_ENABLED=0 GOOS=${OS} GOARCH=${ARCH} go build -o .build/${OS}-${ARCH}/emqx-exporter
Expand All @@ -30,8 +25,9 @@ build:
test:
go test -race --cover -covermode=atomic -coverpkg=./... -coverprofile=cover.out ./...

DOCKER_IMAGE_NAME ?= emqx-exporter
.PHONY: docker-build
docker-build: build
docker-build:
docker build -t ${DOCKER_IMAGE_NAME} .

## Location to install dependencies to
Expand Down
12 changes: 10 additions & 2 deletions examples/docker-compose/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: '3.8'
services:
emqx:
image: emqx/emqx-enterprise:5.0.1
image: emqx/emqx-enterprise:5.3
container_name: emqx-demo
ports:
- 18083:18083
Expand All @@ -11,11 +11,19 @@ services:
- 8883:8883
environment:
EMQX_DASHBOARD__BOOTSTRAP_USERS_FILE: '"/opt/emqx/data/api_secret"'
EMQX_API_KEY__BOOTSTRAP_FILE: '"/opt/emqx/data/api_secret"'
volumes:
- ./api_secret:/opt/emqx/data/api_secret
healthcheck:
test: ["CMD", "emqx", "ctl", "status"]
interval: 30s
timeout: 10s
retries: 3

emqx-exporter:
image: emqx/emqx-exporter
depends_on:
- emqx
image: emqx-exporter
container_name: exporter-demo
ports:
- 8085:8085
Expand Down
12 changes: 11 additions & 1 deletion examples/docker-compose/prometheus-emqx5.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,14 @@ scrape_configs:
# label the cluster name of where the metrics data from
cluster: test
# fix value, don't modify
from: exporter
from: exporter
- job_name: 'mqtt-probe'
metrics_path: /probe
scrape_interval: 5s
static_configs:
- targets: [exporter-demo:8085]
labels:
# label the cluster name of where the metrics data from
cluster: test
# fix value, don't modify
from: probe
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/eclipse/paho.mqtt.golang v1.4.3 // indirect
github.com/go-logfmt/logfmt v0.6.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/klauspost/compress v1.16.4 // indirect
github.com/kr/text v0.2.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik=
github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE=
github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU=
github.com/go-kit/log v0.2.1/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0=
github.com/go-logfmt/logfmt v0.6.0 h1:wGYYu3uicYdqXVgoYbvnkrPVXkuLM1p1ifugDMEdRi4=
Expand All @@ -28,6 +30,8 @@ github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
Expand Down
19 changes: 15 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@ package main

import (
"emqx-exporter/client"
"emqx-exporter/prober"
"fmt"
"github.com/alecthomas/kingpin/v2"
promcollectors "github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/common/promlog"
"github.com/prometheus/common/promlog/flag"
stdlog "log"
"net/http"
_ "net/http/pprof"
Expand All @@ -28,7 +25,13 @@ import (
"runtime"
"sort"

"github.com/alecthomas/kingpin/v2"
promcollectors "github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/common/promlog"
"github.com/prometheus/common/promlog/flag"

"emqx-exporter/collector"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -143,6 +146,10 @@ func main() {

http.Handle("/metrics", newHandler(!*disableExporterMetrics, *maxRequests, logger))

http.HandleFunc("/probe", func(w http.ResponseWriter, r *http.Request) {
prober.Handler(w, r, logger)
})

landingConfig := web.LandingConfig{
Name: "EMQX Exporter",
Description: "EMQX Exporter",
Expand All @@ -152,6 +159,10 @@ func main() {
Address: "/metrics",
Text: "Metrics",
},
{
Address: "/probe",
Text: "Probe",
},
},
}
landingPage, err := web.NewLandingPage(landingConfig)
Expand Down
40 changes: 40 additions & 0 deletions prober/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package prober

import (
"net/http"
"time"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

func Handler(w http.ResponseWriter, r *http.Request, logger log.Logger) {
probeSuccessGauge := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "emqx",
Subsystem: "mqtt",
Name: "probe_success",
Help: "Displays whether or not the probe was a success",
})
probeDurationGauge := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "emqx",
Subsystem: "mqtt",
Name: "probe_duration_seconds",
Help: "Returns how long the probe took to complete in seconds",
})

registry := prometheus.NewRegistry()
registry.MustRegister(probeSuccessGauge)
registry.MustRegister(probeDurationGauge)

start := time.Now()
if ProbeMQTT(logger) {
probeSuccessGauge.Set(1)
} else {
probeSuccessGauge.Set(0)
}
probeDurationGauge.Set(time.Since(start).Seconds())

h := promhttp.HandlerFor(registry, promhttp.HandlerOpts{})
h.ServeHTTP(w, r)
}
69 changes: 69 additions & 0 deletions prober/mqtt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package prober

import (
"time"

mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
)

type MQTTProbe struct {
Client mqtt.Client
MsgChan <-chan mqtt.Message
}

var mqttProbe *MQTTProbe

func initMQTTProbe(logger log.Logger) *MQTTProbe {
opt := mqtt.NewClientOptions().AddBroker("tcp://broker.emqx.io:1883").SetClientID("emqx-exporter")
opt.SetOnConnectHandler(func(c mqtt.Client) {
level.Info(logger).Log("msg", "Connected to MQTT broker")
})
opt.SetConnectionLostHandler(func(c mqtt.Client, err error) {
level.Error(logger).Log("msg", "Lost connection to MQTT broker", "err", err)
})
c := mqtt.NewClient(opt)
if token := c.Connect(); token.Wait() && token.Error() != nil {
level.Error(logger).Log("msg", "Failed to connect to MQTT broker", "err", token.Error())
panic(token.Error())
}

var msgChan = make(chan mqtt.Message)
if token := c.Subscribe("emqx-exporter", 1, func(c mqtt.Client, m mqtt.Message) {
msgChan <- m
}); token.Wait() && token.Error() != nil {
level.Error(logger).Log("msg", "Failed to subscribe to MQTT topic", "err", token.Error())
panic(token.Error())
}

return &MQTTProbe{
Client: c,
MsgChan: msgChan,
}
}

func ProbeMQTT(logger log.Logger) bool {
if mqttProbe == nil {
mqttProbe = initMQTTProbe(logger)
}

if !mqttProbe.Client.IsConnected() {
return false
}

if token := mqttProbe.Client.Publish("emqx-exporter", 1, false, "hello world"); token.Wait() && token.Error() != nil {
return false
}

select {
case msg := <-mqttProbe.MsgChan:
if msg == nil {
return false
}
case <-time.After(5 * time.Second):
return false
}

return true
}

0 comments on commit dc42a0b

Please sign in to comment.