-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathmain.go
166 lines (152 loc) · 4.35 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
package main
import (
"flag"
"fmt"
"log"
"os"
"sync/atomic"
"time"
_ "expvar"
"github.com/recoilme/graceful"
"github.com/tidwall/evio"
)
var (
version = "0.4.1"
port = flag.Int("p", 11211, "TCP port number to listen on (default: 11211)")
slaveadr = flag.String("slave", "", "Slave address, optional, example slave=127.0.0.1:11212")
unixs = flag.String("unixs", "", "unix socket")
stdlib = flag.Bool("stdlib", false, "use stdlib")
noudp = flag.Bool("noudp", false, "disable udp interface")
loops = flag.Int("loops", -1, "num loops")
balance = flag.String("balance", "random", "balance - random, round-robin or least-connections")
keepalive = flag.Int("keepalive", 10, "keepalive connection, in seconds")
params = flag.String("params", "", "params for b52 engines, url query format, all size in Mb, default: dbdir=db")
restore = flag.String("restore", "", "backup file name for load")
startTime = int64(0)
)
type conn struct {
is evio.InputStream
addr string
}
func main() {
startTime = time.Now().Unix()
flag.Parse()
var b52 McEngine
var totalConnections uint32 // Total number of connections opened since the server started running
var currConnections int32 // Number of open connections
atomic.StoreUint32(&totalConnections, 0)
atomic.StoreInt32(&currConnections, 0)
b52, err := Newb52(*params, *slaveadr)
if err != nil {
log.Fatalf("failed to create Newb52 database: %s", err.Error())
}
// Wait for interrupt signal to gracefully shutdown the server with
// setup signal catching
// Wait for interrupt signal to gracefully shutdown the server with
quit := make(chan os.Signal, 1)
fallback := func() error {
fmt.Println("terminated server")
err = b52.Close()
if err != nil {
fmt.Println("Close", err.Error())
}
return nil
}
graceful.Unignore(quit, fallback, graceful.Terminate...)
if *restore != "" {
err = b52.Restore(*restore)
if err != nil {
log.Fatalf("failed restore database: %s", err.Error())
}
}
var events evio.Events
switch *balance {
default:
log.Fatalf("invalid -balance flag: '%v'", balance)
case "random":
events.LoadBalance = evio.Random
case "round-robin":
events.LoadBalance = evio.RoundRobin
case "least-connections":
events.LoadBalance = evio.LeastConnections
}
events.NumLoops = *loops
events.Serving = func(srv evio.Server) (action evio.Action) {
fmt.Printf("b52 server started on port %d (loops: %d)\n", *port, srv.NumLoops)
return
}
events.Opened = func(ec evio.Conn) (out []byte, opts evio.Options, action evio.Action) {
atomic.AddUint32(&totalConnections, 1)
atomic.AddInt32(&currConnections, 1)
//fmt.Printf("opened: %v\n", ec.RemoteAddr())
if (*keepalive) > 0 {
opts.TCPKeepAlive = time.Second * (time.Duration(*keepalive))
//fmt.Println("TCPKeepAlive:", opts.TCPKeepAlive)
}
//opts.ReuseInputBuffer = true // don't do it!
ec.SetContext(&conn{})
return
}
events.Closed = func(ec evio.Conn, err error) (action evio.Action) {
//fmt.Printf("closed: %v\n", ec.RemoteAddr())
atomic.AddInt32(&currConnections, -1)
return
}
events.Data = func(ec evio.Conn, in []byte) (out []byte, action evio.Action) {
if in == nil {
fmt.Printf("wake from %s\n", ec.RemoteAddr())
return nil, evio.Close
}
//println(string(in))
var data []byte
var c *conn
if ec.Context() == nil {
data = in
} else {
c = ec.Context().(*conn)
data = c.is.Begin(in)
}
responses := make([]byte, 0)
for {
leftover, response, err := mcproto(data, b52)
if err != nil {
if err != ErrClose {
// bad thing happened
fmt.Println(err.Error())
}
action = evio.Close
break
} else if len(leftover) == len(data) {
// request not ready, yet
break
}
// handle the request
//println("handle the request", string(response))
responses = append(responses, response...)
//out = response
data = leftover
}
//println("handle the responses", string(responses))
out = responses
if c != nil {
c.is.End(data)
}
return
}
var ssuf string
if *stdlib {
ssuf = "-net"
}
addrs := []string{fmt.Sprintf("tcp"+ssuf+"://:%d", *port)} //?reuseport=true
if *unixs != "" {
addrs = append(addrs, fmt.Sprintf("unix"+ssuf+"://%s", *unixs))
}
if !*noudp {
addrs = append(addrs, fmt.Sprintf("udp"+ssuf+"://:%d", *port))
}
err = evio.Serve(events, addrs...)
if err != nil {
fmt.Println(err.Error())
log.Fatal(err)
}
}