-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmain.go
133 lines (116 loc) · 2.63 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package main
import (
"github.com/gorilla/websocket"
"io/ioutil"
"log"
"net/http"
"sync"
)
// mqueue is an in-memory publish/subscribe registry: every topic name maps
// to the channels of the subscribers currently attached to that topic.
type mqueue struct {
	// mtx guards topics; the methods of mqueue take it around every access
	// to the map.
	mtx sync.RWMutex
	// topics holds one channel per subscriber, keyed by topic name.
	topics map[string][]chan []byte
}
// defaultMqueue is the package-level queue instance; it starts with an
// empty, ready-to-use topic map.
var defaultMqueue = &mqueue{topics: map[string][]chan []byte{}}
// sub registers a new subscriber on the specified topic and returns the
// receive side of that subscriber's channel. The channel is buffered for 100
// payloads. The error result is always nil in the current implementation.
func (m *mqueue) sub(topic string) (<-chan []byte, error) {
	inbox := make(chan []byte, 100)

	m.mtx.Lock()
	defer m.mtx.Unlock()

	listeners := m.topics[topic]
	listeners = append(listeners, inbox)
	m.topics[topic] = listeners

	return inbox, nil
}
// unsub detaches the subscriber channel sub from the specified topic.
//
// Unknown topics and channels that are not subscribed are ignored. When the
// last subscriber of a topic is removed, the topic's entry is deleted from
// the map so that abandoned topic names do not accumulate. The channel itself
// is not closed. The returned error is always nil.
func (m *mqueue) unsub(topic string, sub <-chan []byte) error {
	// Hold the write lock across the whole read-modify-write. Reading the
	// list under one lock and storing the filtered copy under another would
	// let a concurrent sub/unsub slip in between and have its change
	// overwritten.
	m.mtx.Lock()
	defer m.mtx.Unlock()

	subscribers, ok := m.topics[topic]
	if !ok {
		return nil
	}

	// Build a fresh slice instead of filtering in place: other code may
	// still be iterating over a previously read copy of the old slice
	// header, and that backing array must stay intact.
	remaining := make([]chan []byte, 0, len(subscribers))
	for _, subscriber := range subscribers {
		if subscriber != sub {
			remaining = append(remaining, subscriber)
		}
	}

	if len(remaining) == 0 {
		delete(m.topics, topic)
		return nil
	}
	m.topics[topic] = remaining
	return nil
}
// pub pushes payload to every subscriber of the specified topic.
//
// Delivery is synchronous and non-blocking: each subscriber channel gets one
// send attempt, and a subscriber whose buffer is full simply misses this
// payload. Because nothing here can block, no goroutine is spawned, which
// keeps successive publishes on a topic in order for each subscriber. The
// same payload slice is handed to every subscriber; it is not copied.
// Publishing to a topic without subscribers is a no-op. The returned error is
// always nil.
func (m *mqueue) pub(topic string, payload []byte) error {
	// The read lock is held for the whole fan-out; that is cheap because
	// every send below either succeeds immediately or is skipped.
	m.mtx.RLock()
	defer m.mtx.RUnlock()

	// Ranging over the nil slice of an unknown topic performs no iterations.
	for _, subscriber := range m.topics[topic] {
		select {
		case subscriber <- payload:
		default:
			// Subscriber is not keeping up; drop rather than stall publishers.
		}
	}
	return nil
}
// sub is the HTTP handler that subscribes a client to a specific topic.
//
// It upgrades the request to a websocket connection, registers a subscriber
// on the topic named by the "topic" query parameter, and forwards every
// payload received for that topic to the client as a binary websocket
// message. The handler returns, unsubscribing and closing the connection,
// when the peer goes away or a write fails.
func sub(w http.ResponseWriter, r *http.Request) {
	connection, err := websocket.Upgrade(w, r, w.Header(), 1024, 1024)
	if err != nil {
		log.Println("Websocket connection failed:", err)
		http.Error(w, "Could not open websocket connection",
			http.StatusBadRequest)
		return
	}
	// After a successful upgrade the connection belongs to this handler, so
	// it must be closed here.
	defer connection.Close()

	topic := r.URL.Query().Get("topic")
	channel, err := defaultMqueue.sub(topic)
	if err != nil {
		log.Printf("Could not retrieve %s.", topic)
		// The HTTP response writer can no longer be used once the connection
		// has been upgraded, so report the failure as a websocket close
		// frame. This is best effort: the connection is closed right after.
		_ = connection.WriteMessage(websocket.CloseMessage,
			websocket.FormatCloseMessage(websocket.CloseInternalServerErr,
				"Could not retrieve events"))
		return
	}
	defer defaultMqueue.unsub(topic, channel)

	// Read and discard whatever the peer sends. Reading is what lets the
	// websocket library process close, ping and pong frames, and a read error
	// is how a disconnect is noticed while no payloads are being published.
	// The goroutine ends when the read fails, at the latest once the deferred
	// Close above has run.
	peerGone := make(chan struct{})
	go func() {
		defer close(peerGone)
		for {
			if _, _, err := connection.NextReader(); err != nil {
				return
			}
		}
	}()

	for {
		select {
		case <-peerGone:
			return
		case e := <-channel:
			if err := connection.WriteMessage(websocket.BinaryMessage, e); err != nil {
				log.Printf("Error sending event: %v",
					err.Error())
				return
			}
		}
	}
}
// pub is the HTTP handler for publishing. It reads the complete request body
// and publishes it on the default queue under the topic named by the "topic"
// query parameter. If reading the body or publishing fails it answers with a
// 500 "Pub Error"; otherwise it writes nothing to the response.
func pub(w http.ResponseWriter, r *http.Request) {
	payload, readErr := ioutil.ReadAll(r.Body)
	if readErr != nil {
		http.Error(w, "Pub Error", http.StatusInternalServerError)
		return
	}
	r.Body.Close()

	topic := r.URL.Query().Get("topic")
	if pubErr := defaultMqueue.pub(topic, payload); pubErr != nil {
		http.Error(w, "Pub Error", http.StatusInternalServerError)
	}
}
// main is the entry point: it registers the /pub and /sub handlers on the
// default mux and serves HTTP on port 8081 until the server stops.
func main() {
	http.HandleFunc("/pub", pub)
	http.HandleFunc("/sub", sub)
	log.Println("Mqueue listening on :8081")
	// ListenAndServe only returns on failure (for example when the port is
	// already in use); log the reason and exit non-zero instead of ending
	// silently.
	log.Fatal(http.ListenAndServe(":8081", nil))
}