-
Notifications
You must be signed in to change notification settings - Fork 0
/
mqtt.go
134 lines (115 loc) · 3.84 KB
/
mqtt.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// Package mqtt provides easy-to-use MQTT connection for projects.
package mqtt
import (
"crypto/tls"
"crypto/x509"
"errors"
"io/ioutil"
"time"
)
type (
	// handler is the callback signature for a received message: it is
	// given the message topic and its raw payload.
	handler func(topic string, payload []byte)

	// responseHandler is the callback signature for a received request: it
	// is given the response topic, the raw payload and an id.
	responseHandler func(responseTopic string, payload, id []byte)
)
// MQTT is the client interface shared by the mqttv3 and mqttv5 structs.
type MQTT interface {
	// Handle sets the handler for new messages on subscribed topics.
	Handle(h handler)

	// Publish sends message to the broker on the given topic.
	Publish(topic string, message interface{}) error

	// Request sends a message to the broker and waits for the response.
	// NOTE(review): the time.Duration is presumably the maximum wait and the
	// handler presumably receives the response — confirm against the
	// implementations.
	Request(string, interface{}, time.Duration, handler) error

	// RequestWith sends a message to the broker with a specific response
	// topic and waits for the response.
	// NOTE(review): which of the two string arguments is the response topic
	// is not visible from this declaration — confirm against the
	// implementations.
	RequestWith(string, string, interface{}, time.Duration, handler) error

	// SubscribeResponse creates a new subscription for the response topic.
	SubscribeResponse(responseTopic string) error

	// Respond sends message to the response topic together with the
	// correlation id (intended for use inside a HandleRequest handler).
	Respond(responseTopic string, message interface{}, correlationID []byte) error

	// HandleRequest sets the handler for incoming requests.
	HandleRequest(h responseHandler)

	// GetConnectionStatus returns the connection status: Connected or
	// Disconnected.
	GetConnectionStatus() ConnectionState

	// Disconnect closes the connection to the broker.
	Disconnect()
}
// Version identifies the MQTT protocol version of the client.
type Version int

// MQTT protocol versions. The values are written out explicitly; V3 is the
// zero value of Version.
const (
	// V3 is MQTT Version 3.
	V3 Version = 0
	// V5 is MQTT Version 5.
	V5 Version = 1
)
// ConnectionState describes whether the client is connected to the broker.
type ConnectionState int

// Connection states. The values are written out explicitly; Disconnected is
// the zero value of ConnectionState.
const (
	// Disconnected means there is no connection to the broker.
	Disconnected ConnectionState = 0
	// Connected means a connection to the broker is established.
	Connected ConnectionState = 1
)
// Config holds the configurable options for connecting to the broker(s).
type Config struct {
	// Brokers lists the MQTT broker addresses, each formatted as
	// scheme://host:port.
	Brokers []string
	// ClientID is the client ID.
	ClientID string
	// Username is the username used to connect to the broker(s).
	Username string
	// Password is the password used to connect to the broker(s).
	Password string
	// Topics are the topics for subscription.
	Topics []string
	// QoS is the quality-of-service level; CreateConnection accepts only
	// 0, 1 or 2.
	QoS int
	// Retained is the retain-message setting.
	Retained bool
	// AutoReconnect enables reconnecting when the connection is lost.
	AutoReconnect bool
	// MaxReconnectInterval is the maximum time waited between reconnection
	// attempts.
	MaxReconnectInterval time.Duration
	// PersistentSession sets the persistent session option (clean start
	// for v5).
	PersistentSession bool
	// KeepAlive is the keep-alive time in seconds.
	KeepAlive uint16
	// TLSCA is the CA file path.
	TLSCA string
	// TLSCert is the certificate file path.
	TLSCert string
	// TLSKey is the key file path.
	TLSKey string
	// Version is the MQTT version of the client.
	Version Version
}
// CreateConnection validates the configuration and builds the client for the
// configured MQTT version by delegating to newMQTTv3 or newMQTTv5.
//
// It returns an error when Brokers is empty, when QoS is outside 0..2, when
// Version is neither V3 nor V5, or when the version-specific constructor
// fails. On error the returned MQTT is always nil.
func (m *Config) CreateConnection() (MQTT, error) {
	if len(m.Brokers) == 0 {
		return nil, errors.New("no broker address to connect")
	}
	if m.QoS < 0 || m.QoS > 2 {
		return nil, errors.New("value of qos must be 0, 1, 2")
	}

	switch m.Version {
	case V3:
		client, err := newMQTTv3(m)
		if err != nil {
			// Return a literal nil so the interface value is really nil.
			return nil, err
		}
		return client, nil
	case V5:
		client, err := newMQTTv5(m)
		if err != nil {
			// Return a literal nil so the interface value is really nil.
			return nil, err
		}
		return client, nil
	default:
		// An unknown version must be reported: returning (nil, nil) would
		// hand the caller a nil client together with a nil error.
		return nil, errors.New("unsupported mqtt version")
	}
}
// tlsConfig builds a *tls.Config from the TLS file paths in the Config.
//
// When TLSCA is set, the file is read and its PEM certificates become the
// RootCAs pool. When both TLSCert and TLSKey are set, the key pair is loaded
// as the single entry of Certificates. Options that are not set leave the
// corresponding tls.Config field at its zero value.
//
// It returns an error if the CA file cannot be read, if no certificate from
// it could be added to the pool, or if the key pair cannot be loaded.
func (m *Config) tlsConfig() (*tls.Config, error) {
	cfg := &tls.Config{}

	if m.TLSCA != "" {
		caPEM, err := ioutil.ReadFile(m.TLSCA)
		if err != nil {
			return nil, err
		}
		pool := x509.NewCertPool()
		if !pool.AppendCertsFromPEM(caPEM) {
			return nil, errors.New("certificate can not added to pool")
		}
		cfg.RootCAs = pool
	}

	// The key pair is loaded only when both paths are given; a lone TLSCert
	// or TLSKey is ignored.
	if m.TLSCert != "" && m.TLSKey != "" {
		cert, err := tls.LoadX509KeyPair(m.TLSCert, m.TLSKey)
		if err != nil {
			return nil, err
		}
		// BuildNameToCertificate is deliberately not called: it is
		// deprecated, and leaving NameToCertificate nil lets crypto/tls
		// pick the first compatible chain from Certificates.
		cfg.Certificates = []tls.Certificate{cert}
	}

	return cfg, nil
}