-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.go
337 lines (297 loc) · 8.38 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
package main
import (
"crypto/md5"
"encoding/hex"
"errors"
"flag"
"fmt"
"io/ioutil"
"log"
"net/http"
"sync"
"time"
"github.com/gomodule/redigo/redis"
"github.com/paulbellamy/ratecounter"
"github.com/tidwall/gjson"
"github.com/tidwall/sjson"
"github.com/tile38/msgkit"
)
const dist = 500
var (
pool *redis.Pool // The Tile38 connection pool
h msgkit.Handler // The websocket server handler
idmu sync.Mutex // guard maps
connClientM map[string]string // clientID -> connID map
clientConnM map[string]string // connID -> clientID map
)
var staticGeofenceObject = func() string {
data, err := ioutil.ReadFile("web/fences/" + "convention-center.geojson")
if err != nil {
panic(err)
}
return string(data)
}()
func main() {
var addr string
flag.StringVar(&addr, "tile38", ":9851", "Tile38 Address")
flag.BoolVar(&metrics, "metrics", false, "Show message metrics")
flag.Parse()
// Create a new pool of connections to Tile38
pool = &redis.Pool{
MaxIdle: 16,
IdleTimeout: 240 * time.Second,
Dial: func() (redis.Conn, error) {
return redis.Dial("tcp", addr)
}, TestOnBorrow: func(conn redis.Conn, _ time.Time) error {
if resp, _ := redis.String(conn.Do("PING")); resp != "PONG" {
return errors.New("expected PONG")
}
return nil
},
}
connClientM = make(map[string]string)
clientConnM = make(map[string]string)
// Initialize a new msgkit server
h.OnOpen = onOpen
h.OnClose = onClose
h.Handle("Feature", feature)
h.Handle("Viewport", viewport)
h.Handle("Message", message)
// Bind websockets to "/ws" and static site to "/"
http.Handle("/ws", &h)
http.Handle("/", http.FileServer(http.Dir("web")))
// Subscribe to geofence channels
go geofenceSubscribe()
// Start listening for websocket connections and messages
srv := &http.Server{Addr: ":8000"}
log.Printf("Listening at %s", srv.Addr)
log.Fatal(srv.ListenAndServe())
}
var metrics bool
var msgMu sync.Mutex
var msgCount int
var msgSize uint64
var msgCounter = ratecounter.NewRateCounter(time.Second)
func send(id, msg string) {
h.Send(id, msg)
if metrics {
msgMu.Lock()
msgCounter.Incr(1)
msgCount++
msgSize += uint64(len(msg))
rate := msgCounter.Rate()
fmt.Printf("\rmsg: %d (%d/sec), bytes: %d MB", msgCount, rate, msgSize/1024/1024)
msgMu.Unlock()
}
}
// geofenceSubscribe listens on geofence channels notifications, piping them out
// to all connected websocket clients who can see the changes
func geofenceSubscribe() {
fn := func() error {
// Ensure that the roaming geofence channel exists
if _, err := tile38Do(
"SETCHAN", "roam-chan",
"NEARBY", "people", "ROAM", "people", "*", dist,
); err != nil {
return err
}
if _, err := tile38Do(
"SETCHAN", "static-chan",
"WITHIN", "people", "DETECT", "enter,inside,exit", "OBJECT", staticGeofenceObject,
); err != nil {
return err
}
// Subscribe to the channel
psc := redis.PubSubConn{Conn: pool.Get()}
defer psc.Close()
if err := psc.Subscribe("roam-chan", "static-chan"); err != nil {
return err
}
// for each pub/sub message
for {
switch v := psc.Receive().(type) {
case redis.Message:
// Received a geofence notification
msg := string(v.Data)
clientID := gjson.Get(msg, "object.id").String()
idmu.Lock()
connID := clientConnM[clientID] // get the connection from the id
idmu.Unlock()
switch v.Channel {
case "static-chan":
var outMsg string
switch gjson.Get(msg, "detect").String() {
case "enter", "inside":
outMsg = `{"type":"Inside","feature":` +
secureFeature(gjson.Get(msg, "object").Raw) + `}`
case "exit":
outMsg = `{"type":"Outside","feature":` +
secureFeature(gjson.Get(msg, "object").Raw) + `}`
default:
continue
}
h.Range(func(id string) bool {
if id == connID {
send(id, outMsg[:len(outMsg)-1]+`,"me":true}`)
} else {
send(id, outMsg)
}
return true
})
case "roam-chan":
nearby := gjson.Get(msg, "nearby")
if nearby.Exists() {
// an object is nearby, notify the target connection
send(connID, `{"type":"Nearby",`+
`"feature":`+secureFeature(nearby.Get("object").Raw)+`}`)
continue
}
faraway := gjson.Get(msg, "faraway")
if faraway.Exists() {
// an object is faraway, notify the target connection
send(connID, `{"type":"Faraway",`+
`"feature":`+secureFeature(faraway.Get("object").Raw)+`}`)
continue
}
}
case error:
return v
}
}
}
for {
err := fn()
log.Printf("psubscribe: %v", err)
time.Sleep(time.Second)
}
}
var connected int32
func onOpen(connID string) {
// println("open", connID, atomic.AddInt32(&connected, 1))
}
// onClose deletes the clients point in the people collection on a disconnect
func onClose(connID string) {
// println("close", connID, atomic.AddInt32(&connected, -1))
idmu.Lock()
clientID, ok := connClientM[connID]
if ok {
delete(connClientM, connID)
delete(clientConnM, clientID)
}
idmu.Unlock()
if ok {
tile38Do("DEL", "people", clientID)
}
}
// feature is a websocket message handler that creates/updates a persons
// position in Tile38
func feature(connID, msg string) {
clientID := gjson.Get(msg, "id").String()
if len(clientID) != 24 {
return
}
// Track all connID <-> clientID
idmu.Lock()
clientConnM[clientID] = connID
connClientM[connID] = clientID
idmu.Unlock()
// fmt.Printf("A: connID: %v, clientID: %v\n", connID, clientID)
// Update the position in the database
tile38Do("SET", "people", clientID, "EX", 10, "OBJECT", msg)
}
// secureFeature re-hashes the clientID to avoid spoofing
func secureFeature(feature string) string {
feature, _ = sjson.Set(feature, "id",
secureClientID(gjson.Get(feature, "id").String()))
return feature
}
// secureClientID re-hashes the clientID to avoid spoofing
func secureClientID(clientID string) string {
b := md5.Sum([]byte(clientID))
return hex.EncodeToString(b[:12])
}
// viewport is a websocket message handler that queries Tile38 for all people
// currently in a clients viewport
func viewport(id, msg string) {
swLat := gjson.Get(msg, "bounds._sw.lat").Float()
swLng := gjson.Get(msg, "bounds._sw.lng").Float()
neLat := gjson.Get(msg, "bounds._ne.lat").Float()
neLng := gjson.Get(msg, "bounds._ne.lng").Float()
var cursor int64
for {
// Query for all people in the viewport bounds
people, _ := redis.Values(tile38Do(
"INTERSECTS", "people",
"CURSOR", cursor,
"BOUNDS", swLat, swLng, neLat, neLng,
))
if len(people) < 2 {
return
}
cursor, _ = redis.Int64(people[0], nil)
idmu.Lock()
clientID := connClientM[id]
idmu.Unlock()
// Send all people in the viewport to the messager
var features []byte
var idx int
features = append(features, `{"type":"Update","features":[`...)
ps, _ := redis.Values(people[1], nil)
for _, p := range ps {
strs, _ := redis.Strings(p, nil)
if len(strs) > 1 && strs[0] != clientID {
feature := secureFeature(strs[1])
if idx > 0 {
features = append(features, ',')
}
features = append(features, feature...)
idx++
}
}
features = append(features, `]}`...)
send(id, string(features))
if cursor == 0 {
break
}
}
}
// message is a websocket message handler that queries Tile38 for other users
// located in the messagers geofence and broadcasts a chat message to them
func message(id, msg string) {
// create a new message
nmsg := `{"type":"Message"}`
nmsg, _ = sjson.SetRaw(nmsg, "feature", secureFeature(gjson.Get(msg, "feature").String()))
nmsg, _ = sjson.Set(nmsg, "text", gjson.Get(msg, "text").String())
// Query all nearby people from Tile38
lat := gjson.Get(msg, "feature.geometry.coordinates.1").Float()
lng := gjson.Get(msg, "feature.geometry.coordinates.0").Float()
var cursor int64
for {
people, _ := redis.Values(
tile38Do(
"NEARBY", "people", "CURSOR", 0, "IDS", "POINT", lat, lng, dist,
),
)
if len(people) < 2 {
return
}
cursor, _ = redis.Int64(people[0], nil)
ps, _ := redis.Values(people[1], nil)
for _, p := range ps {
clientID, _ := redis.String(p, nil)
idmu.Lock()
connID := clientConnM[clientID]
idmu.Unlock()
send(connID, nmsg)
}
if cursor == 0 {
break
}
}
}
// tile38Do executes a redis command on a new connection and returns the response
func tile38Do(cmd string, args ...interface{}) (interface{}, error) {
conn := pool.Get()
defer conn.Close()
return conn.Do(cmd, args...)
}