-
Notifications
You must be signed in to change notification settings - Fork 0
/
util.go
108 lines (93 loc) · 2.29 KB
/
util.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package rpc
import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"runtime/debug"
"sync/atomic"
"time"
)
// connect dials addr with the supplied dial options and returns the resulting
// client connection.
//
// When dynamicLink is true the dial goes through grpc.Dial without a deadline.
// Otherwise it goes through grpc.DialContext with a context that expires after
// 10 seconds; that context is cancelled as soon as connect returns.
//
// NOTE(review): the deadline presumably only matters for blocking dials (e.g.
// grpc.WithBlock among ops) — confirm against the options callers pass.
func connect(addr string, dynamicLink bool, ops ...grpc.DialOption) (*grpc.ClientConn, error) {
	if !dynamicLink {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		return grpc.DialContext(ctx, addr, ops...)
	}
	return grpc.Dial(addr, ops...)
}
// init tops the pool's connection channel up to its capacity.
//
// It dials one connection per free slot (capacity minus the number of
// connections currently buffered) using the pool's server address, dynamic-link
// flag and dial options. The first dial error aborts the fill and is returned
// as-is; connections dialed before the failure stay in the channel.
func (pool *Pool) init() error {
	// Snapshot the free-slot count and dial settings once, before dialing.
	missing := cap(pool.connections) - len(pool.connections)
	target, dynamic, opts := pool.ServerAddr, pool.DynamicLink, pool.dialOptions

	for created := 0; created < missing; created++ {
		conn, err := connect(target, dynamic, opts...)
		if err != nil {
			return err
		}
		pool.connections <- conn
	}
	return nil
}
// count atomically adds add to the pool's counter; pass a negative value to
// decrement it.
func (pool *Pool) count(add int64) {
	_ = atomic.AddInt64(&pool.counter, add) // the updated total is not needed here
}
// close recycles conn: it either hands the connection back to the pool or
// tears it down. A nil conn is ignored.
//
// The work runs in a background goroutine, so close returns immediately:
//   - passivate decides whether conn is still reusable (it closes conn itself
//     when it is not);
//   - a reusable conn is offered to pool.connections for up to 3 seconds while
//     pool.ChannelStat is set, and destroyed if the send does not complete in
//     time;
//   - a reusable conn is destroyed right away when pool.ChannelStat is not
//     set, so it is never dropped while still open;
//   - the pool counter is decremented exactly once, even if the goroutine
//     panics.
//
// NOTE(review): pool.ChannelStat presumably reports whether the connection
// channel is still accepting connections — confirm against the Pool type.
func (pool *Pool) close(conn *grpc.ClientConn) {
	if conn == nil {
		return
	}
	go func() {
		defer func() {
			// A panic (e.g. a send on a closed channel) must not take the
			// process down; print the stack and carry on.
			if r := recover(); r != nil {
				debug.PrintStack()
			}
			// Always release the slot held by conn, including after a panic,
			// so the counter cannot drift upwards.
			pool.count(-1)
		}()

		// When passivate reports false it has already closed conn; the error
		// it returns is the result of that Close and there is no caller to
		// hand it to from this goroutine.
		reusable, _ := passivate(conn)
		if !reusable {
			return
		}
		if !pool.ChannelStat {
			// conn will not be put back on the channel: close it instead of
			// leaking it. Best-effort, nothing to report the error to.
			_ = destroy(conn)
			return
		}

		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
		defer cancel()
		select {
		case pool.connections <- conn:
		case <-ctx.Done():
			// The channel stayed full for the whole wait; drop the surplus
			// connection. Best-effort, nothing to report the error to.
			_ = destroy(conn)
		}
	}()
}
// destroy closes conn, tearing down the ClientConn together with all of its
// underlying connections, and reports the error returned by Close.
func destroy(conn *grpc.ClientConn) error {
	err := conn.Close()
	return err
}
// condition is an alias for int that names the verdicts returned by activate
// for a connection taken from the pool.
type condition = int

const (
	// Ready means the connection can be used; activate returns it when the
	// connection state is connectivity.Ready.
	Ready condition = iota
	// Put means the connection is not usable right now but may be later;
	// activate returns it for every state other than Ready and Shutdown.
	Put
	// Destroy means the connection has failed and cannot be restored;
	// activate returns it when the connection state is connectivity.Shutdown.
	Destroy
)
// passivate runs before a connection is released back to the pool and reports
// whether it can still be reused.
//
// A connection in the Shutdown state can never recover, so it is closed and
// passivate returns false together with the error from closing it. Any other
// state (Ready, Idle, Connecting, TransientFailure) is reported as reusable
// with a nil error; activate re-checks the state when the connection is taken
// out again.
//
// The state is read with GetState rather than WaitForStateChange:
// WaitForStateChange(ctx, s) reports true when the connection is NOT in s (or
// leaves s before ctx expires), so chaining it over Ready, Shutdown and Idle
// blocked for the full timeout on a healthy Ready connection and then closed
// it.
func passivate(conn *grpc.ClientConn) (bool, error) {
	if conn.GetState() == connectivity.Shutdown {
		return false, destroy(conn)
	}
	return true, nil
}
// activate runs after a connection is taken from the pool and classifies it
// by its current connectivity state:
//   - Ready when the state is connectivity.Ready;
//   - Destroy when the state is connectivity.Shutdown;
//   - Put for every other state.
func activate(conn *grpc.ClientConn) int {
	switch conn.GetState() {
	case connectivity.Ready:
		return Ready
	case connectivity.Shutdown:
		return Destroy
	}
	return Put
}