-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathstomp.go
152 lines (131 loc) · 3.6 KB
/
stomp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package shoveler
import (
"crypto/tls"
"net/url"
"strings"
"time"
stomp "github.com/go-stomp/stomp/v3"
)
// StartStomp connects to the STOMP broker described by config and then
// forwards every message dequeued from queue to the configured topic.
//
// The topic is prefixed with "/topic/" when it does not already start with
// it. The function runs an endless loop and does not return.
func StartStomp(config *Config, queue *ConfirmationQueue) {
	topic := config.StompTopic
	if !strings.HasPrefix(topic, "/topic/") {
		topic = "/topic/" + topic
	}

	session := GetNewStompConnection(
		config.StompUser,
		config.StompPassword,
		*config.StompURL,
		topic,
		config.StompCert,
		config.StompCertKey,
	)

	// Force a reconnect once an hour so that connections stay balanced
	// across the brokers.
	reconnectTicker := time.NewTicker(time.Hour)
	defer reconnectTicker.Stop()

	// A separate goroutine drains the confirmation queue and hands each
	// message over through this unbuffered channel.
	pending := make(chan []byte)
	go readMsgStomp(pending, queue)

	// Main loop: either reconnect on the hourly tick or publish the next
	// dequeued message.
	for {
		select {
		case <-reconnectTicker.C:
			session.handleReconnect()
		case payload := <-pending:
			session.publish(payload)
		}
	}
}
// GetNewStompConnection builds a StompSession for the given broker URL and
// topic. When both stompCert and stompCertKey are non-empty, the key pair is
// loaded from those files and handed to NewStompConnection; otherwise the
// session is created without a certificate.
func GetNewStompConnection(username string, password string,
	stompUrl url.URL, topic string, stompCert string, stompCertKey string) *StompSession {
	// Without a complete cert/key pair there is nothing to load.
	if stompCert == "" || stompCertKey == "" {
		return NewStompConnection(username, password, stompUrl, topic)
	}

	keyPair, err := tls.LoadX509KeyPair(stompCert, stompCertKey)
	if err != nil {
		// NOTE(review): the failure is only logged; the zero-value
		// certificate is still passed on below — confirm this is intended.
		log.Errorln("Failed to load certificate:", err)
	}
	return NewStompConnection(username, password, stompUrl, topic, keyPair)
}
// StompSession bundles the parameters used to connect to a STOMP broker
// together with the currently active connection.
type StompSession struct {
// username and password are passed to stomp.ConnOpt.Login when the
// session connects without certificates.
username string
password string
// stompUrl's String() form is used as the dial address.
stompUrl url.URL
// topic is the destination every published message is sent to.
topic string
// cert, when non-nil, makes GetStompConnection dial with TLS using
// these client certificates.
cert []tls.Certificate
// conn is the live connection; handleReconnect replaces it.
conn *stomp.Conn
}
// NewStompConnection creates a StompSession from the given parameters and
// establishes its first connection before returning it. Any certificates
// supplied through cert are stored on the session; when none are given the
// stored slice is nil.
func NewStompConnection(username string, password string,
	stompUrl url.URL, topic string, cert ...tls.Certificate) *StompSession {
	session := &StompSession{
		username: username,
		password: password,
		stompUrl: stompUrl,
		topic:    topic,
		cert:     cert,
	}
	// handleReconnect only returns once a connection has been obtained.
	session.handleReconnect()
	return session
}
// readMsgStomp loops forever, dequeuing messages from queue and sending each
// one on messagesQueue. A failed dequeue is logged and the loop moves on to
// the next attempt.
func readMsgStomp(messagesQueue chan<- []byte, queue *ConfirmationQueue) {
	for {
		if msg, err := queue.Dequeue(); err != nil {
			log.Errorln("Failed to read from queue:", err)
		} else {
			messagesQueue <- msg
		}
	}
}
// handleReconnect disconnects the current connection, if there is one, and
// then keeps trying to open a new one until an attempt succeeds, waiting
// reconnectDelay between failed attempts.
func (session *StompSession) handleReconnect() {
	// Tear down the existing connection; a failure here is only logged.
	if session.conn != nil {
		if err := session.conn.Disconnect(); err != nil {
			log.Errorln("Error handling the disconnection:", err.Error())
		}
	}

	// Retry until a fresh connection is available.
	for {
		conn, err := GetStompConnection(session)
		if err == nil {
			session.conn = conn
			return
		}
		log.Errorln("Failed to reconnect, retrying:", err.Error())
		time.Sleep(reconnectDelay)
	}
}
// GetStompConnection opens a new STOMP connection for session.
//
// When the session carries certificates, the broker is dialed over TLS with
// those client certificates and the STOMP handshake runs on that socket; no
// login credentials are passed on this path. Otherwise the broker is dialed
// over plain TCP and the session's username and password are used to log in.
//
// It returns the established connection, or a nil connection and the error
// from the dial or the STOMP handshake.
func GetStompConnection(session *StompSession) (*stomp.Conn, error) {
	if session.cert != nil {
		netConn, err := tls.Dial("tcp", session.stompUrl.String(), &tls.Config{Certificates: session.cert})
		if err != nil {
			log.Errorln("Failed to connect using TLS:", err.Error())
			// Do not hand a nil connection to stomp.Connect; let the
			// caller see the dial failure and retry.
			return nil, err
		}

		conn, err := stomp.Connect(netConn)
		if err != nil {
			// The handshake failed, so nobody else owns the socket:
			// close it here to avoid leaking one connection per retry.
			// The close error adds nothing to the handshake error.
			_ = netConn.Close()
			return nil, err
		}
		return conn, nil
	}

	cfg := stomp.ConnOpt.Login(session.username, session.password)
	return stomp.Dial("tcp", session.stompUrl.String(), cfg)
}
// publish sends msg to the session's topic as "text/plain", passing the
// stomp.SendOpt.Receipt option. Whenever a send fails, the error is logged,
// the session reconnects via handleReconnect, and the send is attempted
// again; the method returns only after a send succeeds.
func (session *StompSession) publish(msg []byte) {
	for {
		err := session.conn.Send(session.topic, "text/plain", msg, stomp.SendOpt.Receipt)
		if err == nil {
			return
		}
		log.Errorln("Failed to publish message:", err)
		session.handleReconnect()
	}
}