-
Notifications
You must be signed in to change notification settings - Fork 21
/
pool.go
184 lines (156 loc) · 4.16 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
package gremtune
import (
"fmt"
"sync"
"time"
)
// Pool maintains a list of connections.
//
// Dial, MaxActive and IdleTimeout are the caller-supplied configuration; the
// remaining fields are internal state that is read and written while mu is held.
type Pool struct {
// Dial is called by Get to create a new Client when no idle connection is
// available and the active limit has not been reached.
Dial func() (*Client, error)
// MaxActive is the maximum number of connections handed out at once.
// A value of 0 means no limit is enforced by Get.
MaxActive int
// IdleTimeout is how long a connection may sit idle before purge closes and
// drops it. purge does nothing unless this is greater than zero.
IdleTimeout time.Duration
// mu guards the fields below.
mu sync.Mutex
// idle holds connections returned to the pool; put prepends, Get takes index 0.
idle []*idleConnection
// active counts connections currently handed out by Get.
active int
// cond is created lazily by Get the first time a caller has to wait and is
// signalled by release.
cond *sync.Cond
// closed is set by Close; afterwards put closes returned clients and release
// no longer decrements active.
closed bool
}
// PooledConnection represents a shared and reusable connection.
// It pairs a Client with the Pool it is handed back to when Close is called.
type PooledConnection struct {
// Pool is the pool this connection is returned to by Close.
Pool *Pool
// Client is the underlying client used to execute queries.
Client *Client
}
// idleConnection is an entry in Pool.idle: a pooled connection together with
// the moment it was returned to the pool.
type idleConnection struct {
// pc is the connection that was returned to the pool.
pc *PooledConnection
// t is the time the connection was idled (set by put via time.Now) and is
// compared against Pool.IdleTimeout by purge.
t time.Time
}
// Get hands out a pooled connection.
//
// Expired idle connections are purged once on entry. After that the call
// loops until one of the following happens:
//   - an idle connection exists: the one at the front of the idle slice is
//     removed and returned wrapped in a fresh PooledConnection;
//   - MaxActive is 0 or fewer than MaxActive connections are active: a new
//     Client is dialed (outside the lock) and returned, or the dial error is
//     returned after the reserved slot has been released;
//   - otherwise the caller blocks on the pool's condition variable until
//     release signals it, then tries again.
func (p *Pool) Get() (*PooledConnection, error) {
	p.mu.Lock()
	p.purge()

	for {
		// Prefer reusing whatever sits at the front of the idle slice.
		if head := p.first(); head != nil {
			// Shift the remaining entries down by one, in place.
			remaining := copy(p.idle, p.idle[1:])
			p.idle = p.idle[:remaining]
			p.active++
			p.mu.Unlock()
			return &PooledConnection{Pool: p, Client: head.pc.Client}, nil
		}

		// At capacity with nothing idle: sleep until a connection is released.
		if p.MaxActive != 0 && p.active >= p.MaxActive {
			if p.cond == nil {
				p.cond = sync.NewCond(&p.mu)
			}
			p.cond.Wait()
			continue
		}

		// Reserve a slot before dialing, then drop the lock so concurrent
		// callers are not serialized behind a slow dial.
		p.active++
		dialFn := p.Dial
		p.mu.Unlock()

		client, err := dialFn()
		if err != nil {
			// Give the reserved slot back and wake a waiter, if any.
			p.mu.Lock()
			p.release()
			p.mu.Unlock()
			return nil, err
		}
		return &PooledConnection{Pool: p, Client: client}, nil
	}
}
// put returns pc to the pool by placing it at the front of the idle slice,
// stamped with the current time. If the pool has already been closed the
// underlying client is closed instead and nothing is stored.
// It does no locking of its own; the caller must hold the pool's mutex.
func (p *Pool) put(pc *PooledConnection) {
	if !p.closed {
		entry := &idleConnection{pc: pc, t: time.Now()}
		// Build a new slice with the fresh entry first, followed by the
		// existing idle connections in their current order.
		stacked := make([]*idleConnection, 0, len(p.idle)+1)
		stacked = append(stacked, entry)
		p.idle = append(stacked, p.idle...)
		return
	}
	pc.Client.Close()
}
// purge drops idle connections that should no longer be handed out:
//   - connections whose client is flagged Errored are always removed;
//   - when IdleTimeout is greater than zero, connections that have been idle
//     for at least IdleTimeout are closed and removed.
//
// Previously the Errored check only ran when IdleTimeout was set, so a pool
// using the default IdleTimeout of 0 kept reusing errored clients.
//
// It is not threadsafe. The caller should manage locking the pool.
func (p *Pool) purge() {
	if len(p.idle) == 0 {
		return
	}

	timeout := p.IdleTimeout
	now := time.Now()

	// Filter in place: the write index never overtakes the read index, so
	// reusing the backing array is safe and avoids an allocation per call.
	kept := p.idle[:0]
	for _, v := range p.idle {
		// If the client has an error then exclude it from the pool.
		// NOTE(review): errored clients are dropped without calling Close, as
		// before — confirm Client releases its resources on error.
		if v.pc.Client.Errored {
			continue
		}
		if timeout > 0 && !v.t.Add(timeout).After(now) {
			// Expired: force the underlying connection closed.
			v.pc.Client.Close()
			continue
		}
		kept = append(kept, v)
	}

	// Clear the unused tail so removed connections can be garbage collected.
	for i := len(kept); i < len(p.idle); i++ {
		p.idle[i] = nil
	}
	p.idle = kept
}
// release gives back one active slot and, if any caller has ever had to wait
// (the condition variable exists), signals a single waiter. Once the pool is
// closed it is a no-op.
// It does no locking of its own; the caller must hold the pool's mutex.
func (p *Pool) release() {
	if !p.closed {
		p.active--
		if waiters := p.cond; waiters != nil {
			waiters.Signal()
		}
	}
}
// first returns the idle connection at the front of the idle slice without
// removing it, or nil when no idle connections are available.
func (p *Pool) first() *idleConnection {
	if len(p.idle) > 0 {
		return p.idle[0]
	}
	return nil
}
// Close closes the pool: every idle connection's client is closed, the idle
// list is emptied, and the pool is marked closed so that connections returned
// afterwards are closed by put instead of being kept for reuse.
//
// The idle list is cleared so that a later Get cannot hand out a client that
// was already closed here, and so that calling Close again does not close the
// same clients a second time.
func (p *Pool) Close() {
	p.mu.Lock()
	defer p.mu.Unlock()

	for _, c := range p.idle {
		c.pc.Client.Close()
	}
	p.idle = nil
	p.closed = true
}
// ExecuteWithBindings borrows a connection from the pool, runs the query with
// the supplied bindings and rebindings on that connection's client, and hands
// the connection back to the pool before returning.
// If no connection can be obtained the error is printed to standard output
// and returned with a nil response.
func (p *Pool) ExecuteWithBindings(query string, bindings, rebindings map[string]string) (resp []Response, err error) {
	conn, getErr := p.Get()
	if getErr != nil {
		fmt.Printf("Error aquiring connection from pool: %s", getErr)
		return nil, getErr
	}
	// Return the connection to the pool once the query has finished.
	defer conn.Close()

	resp, err = conn.Client.ExecuteWithBindings(query, bindings, rebindings)
	return resp, err
}
// Execute borrows a connection from the pool, runs the query on that
// connection's client, and hands the connection back to the pool before
// returning.
// If no connection can be obtained the error is printed to standard output
// and returned with a nil response.
func (p *Pool) Execute(query string) (resp []Response, err error) {
	conn, getErr := p.Get()
	if getErr != nil {
		fmt.Printf("Error aquiring connection from pool: %s", getErr)
		return nil, getErr
	}
	// Return the connection to the pool once the query has finished.
	defer conn.Close()

	resp, err = conn.Client.Execute(query)
	return resp, err
}
// Close tells the pool the caller is done with this connection. Under the
// pool's mutex the connection is put back on the idle list (or closed, if the
// pool itself has been closed) and one active slot is released.
func (pc *PooledConnection) Close() {
	owner := pc.Pool

	owner.mu.Lock()
	defer owner.mu.Unlock()

	owner.put(pc)
	owner.release()
}