forked from scylladb/gocql
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtopology.go
110 lines (94 loc) · 1.94 KB
/
topology.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package gocql
import (
"sync"
"sync/atomic"
"time"
)
// Node is something queries and batches can be executed on. Host is the
// implementation defined in this file.
type Node interface {
	// ExecuteQuery runs qry and returns the resulting iterator or an error.
	ExecuteQuery(qry *Query) (*Iter, error)
	// ExecuteBatch runs batch and reports any error.
	ExecuteBatch(batch *Batch) error
	// Close shuts the node down and reports no error. Host implements it by
	// closing its connection.
	Close()
}
// NodePicker maintains a set of nodes and selects one of them per query.
// RoundRobinPicker is the implementation defined in this file.
type NodePicker interface {
	// AddNode makes node eligible for selection by Pick.
	AddNode(node Node)
	// RemoveNode makes node ineligible for selection by Pick.
	RemoveNode(node Node)
	// Pick returns the node chosen for qry. RoundRobinPicker returns nil when
	// it holds no nodes, so callers should be prepared for a nil result.
	Pick(qry *Query) Node
}
// RoundRobinPicker is a NodePicker that hands out its nodes in rotation.
// NewRoundRobinPicker returns an empty one.
type RoundRobinPicker struct {
	// pool holds the nodes currently eligible for Pick; it is read and
	// written only while mu is held.
	pool []Node
	// pos is the rotation counter; Pick advances it with an atomic add.
	pos uint32
	// mu guards pool: AddNode and RemoveNode take the write lock, Pick takes
	// the read lock.
	mu sync.RWMutex
}
// NewRoundRobinPicker returns an empty RoundRobinPicker. Nodes are registered
// afterwards with AddNode.
func NewRoundRobinPicker() *RoundRobinPicker {
	return new(RoundRobinPicker)
}
// AddNode appends node to the rotation while holding the write lock. No
// duplicate check is made, so adding the same node twice gives it two slots.
func (r *RoundRobinPicker) AddNode(node Node) {
	r.mu.Lock()
	defer r.mu.Unlock()

	r.pool = append(r.pool, node)
}
// RemoveNode deletes the first occurrence of node from the rotation. It is a
// no-op when node is not present. The order of the remaining nodes is not
// preserved: the last node is moved into the vacated position.
func (r *RoundRobinPicker) RemoveNode(node Node) {
	r.mu.Lock()
	// Deferred so the lock is released even if comparing two Node values
	// panics, which happens when they share a dynamic type that is not
	// comparable.
	defer r.mu.Unlock()

	for i, candidate := range r.pool {
		if candidate != node {
			continue
		}
		last := len(r.pool) - 1
		r.pool[i] = r.pool[last]
		// Clear the tail slot before shrinking; otherwise the backing array
		// keeps the removed node reachable and it can never be collected.
		r.pool[last] = nil
		r.pool = r.pool[:last]
		return
	}
}
// Pick returns the next node in the rotation, or nil when the picker holds no
// nodes. The query argument is not consulted. The rotation counter is
// advanced on every call, including calls that find the pool empty.
func (r *RoundRobinPicker) Pick(query *Query) Node {
	// The counter is advanced atomically before the read lock is taken.
	next := atomic.AddUint32(&r.pos, 1)

	r.mu.RLock()
	defer r.mu.RUnlock()

	size := uint32(len(r.pool))
	if size == 0 {
		return nil
	}
	return r.pool[next%size]
}
// Reconnector establishes a connection to an address on behalf of a session.
// ExponentialReconnector is the implementation defined in this file.
type Reconnector interface {
	// Reconnect connects session to address. ExponentialReconnector blocks
	// until a connection attempt succeeds and then adds the resulting node to
	// the session's pool.
	Reconnect(session *Session, address string)
}
// ExponentialReconnector is a Reconnector that retries failed connection
// attempts with a pause that doubles after each failure.
type ExponentialReconnector struct {
	// baseDelay is the pause after the first failed attempt.
	baseDelay time.Duration
	// maxDelay is the value the pause is clamped to once doubling exceeds it.
	maxDelay time.Duration
}
// NewExponentialReconnector returns a reconnector whose retry pause starts at
// baseDelay and is capped at maxDelay.
func NewExponentialReconnector(baseDelay, maxDelay time.Duration) *ExponentialReconnector {
	return &ExponentialReconnector{
		baseDelay: baseDelay,
		maxDelay:  maxDelay,
	}
}
// Reconnect blocks until a connection to address is established, then
// registers the resulting Host with session.pool. Failed attempts are retried
// indefinitely; the pause between attempts starts at e.baseDelay and doubles
// after each failure until it is clamped to e.maxDelay.
//
// After registration a goroutine runs conn.Serve. When Serve returns, that
// goroutine removes the Host from the pool and calls Reconnect again for the
// same address.
func (e *ExponentialReconnector) Reconnect(session *Session, address string) {
	delay := e.baseDelay
	for {
		conn, err := Connect(address, session.cfg)
		if err != nil {
			time.Sleep(delay)
			if delay *= 2; delay > e.maxDelay {
				delay = e.maxDelay
			}
			continue
		}

		node := &Host{conn}
		// Register the node before the serve goroutine exists. If the
		// goroutine were started first and Serve returned immediately, its
		// RemoveNode could run ahead of AddNode and the dead node would then
		// be added and never removed.
		session.pool.AddNode(node)
		go func() {
			conn.Serve()
			session.pool.RemoveNode(node)
			e.Reconnect(session, address)
		}()
		return
	}
}
// Host is the Node implementation backed by a single connection.
type Host struct {
	// conn is the connection the methods of Host operate on.
	conn *Conn
}
// ExecuteQuery runs qry on the host's connection and returns that call's
// results unchanged.
func (h *Host) ExecuteQuery(qry *Query) (*Iter, error) {
	return h.conn.ExecuteQuery(qry)
}
// ExecuteBatch does not execute anything: batch is ignored and nil is always
// returned.
// NOTE(review): this looks like an unfinished stub — callers receive a success
// result for a batch that was never sent. Confirm whether it should delegate
// to the connection the way ExecuteQuery does.
func (h *Host) ExecuteBatch(batch *Batch) error {
	return nil
}
// Close calls Close on the conn field held inside h.conn.
// NOTE(review): anything returned by that Close call is discarded — confirm
// that dropping it is intentional.
func (h *Host) Close() {
	h.conn.conn.Close()
}