Skip to content

Commit

Permalink
Merge pull request #84 from AlexStocks/refactor/tcp_reconnect_var_rename
Browse files Browse the repository at this point in the history
Improvement: rename the variables in TCP reconnect function and some default constants
  • Loading branch information
AlexStocks authored Jul 16, 2024
2 parents da9fedf + bace946 commit 177fc63
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 45 deletions.
33 changes: 17 additions & 16 deletions examples/echo/tcp-echo/client/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,23 @@ var conf *Config

type (
GettySessionParam struct {
CompressEncoding bool `default:"false" yaml:"compress_encoding" json:"compress_encoding,omitempty"`
TcpNoDelay bool `default:"true" yaml:"tcp_no_delay" json:"tcp_no_delay,omitempty"`
TcpKeepAlive bool `default:"true" yaml:"tcp_keep_alive" json:"tcp_keep_alive,omitempty"`
KeepAlivePeriod string `default:"180s" yaml:"keep_alive_period" json:"keep_alive_period,omitempty"`
keepAlivePeriod time.Duration
TcpRBufSize int `default:"262144" yaml:"tcp_r_buf_size" json:"tcp_r_buf_size,omitempty"`
TcpWBufSize int `default:"65536" yaml:"tcp_w_buf_size" json:"tcp_w_buf_size,omitempty"`
PkgWQSize int `default:"1024" yaml:"pkg_wq_size" json:"pkg_wq_size,omitempty"`
TcpReadTimeout string `default:"1s" yaml:"tcp_read_timeout" json:"tcp_read_timeout,omitempty"`
tcpReadTimeout time.Duration
TcpWriteTimeout string `default:"5s" yaml:"tcp_write_timeout" json:"tcp_write_timeout,omitempty"`
tcpWriteTimeout time.Duration
WaitTimeout string `default:"7s" yaml:"wait_timeout" json:"wait_timeout,omitempty"`
waitTimeout time.Duration
MaxMsgLen int `default:"1024" yaml:"max_msg_len" json:"max_msg_len,omitempty"`
SessionName string `default:"echo-client" yaml:"session_name" json:"session_name,omitempty"`
CompressEncoding bool `default:"false" yaml:"compress_encoding" json:"compress_encoding,omitempty"`
TcpNoDelay bool `default:"true" yaml:"tcp_no_delay" json:"tcp_no_delay,omitempty"`
TcpKeepAlive bool `default:"true" yaml:"tcp_keep_alive" json:"tcp_keep_alive,omitempty"`
KeepAlivePeriod string `default:"180s" yaml:"keep_alive_period" json:"keep_alive_period,omitempty"`
keepAlivePeriod time.Duration
TcpRBufSize int `default:"262144" yaml:"tcp_r_buf_size" json:"tcp_r_buf_size,omitempty"`
TcpWBufSize int `default:"65536" yaml:"tcp_w_buf_size" json:"tcp_w_buf_size,omitempty"`
PkgWQSize int `default:"1024" yaml:"pkg_wq_size" json:"pkg_wq_size,omitempty"`
TcpReadTimeout string `default:"1s" yaml:"tcp_read_timeout" json:"tcp_read_timeout,omitempty"`
tcpReadTimeout time.Duration
TcpWriteTimeout string `default:"5s" yaml:"tcp_write_timeout" json:"tcp_write_timeout,omitempty"`
tcpWriteTimeout time.Duration
WaitTimeout string `default:"7s" yaml:"wait_timeout" json:"wait_timeout,omitempty"`
waitTimeout time.Duration
MaxMsgLen int `default:"1024" yaml:"max_msg_len" json:"max_msg_len,omitempty"`
SessionName string `default:"echo-client" yaml:"session_name" json:"session_name,omitempty"`
TcpMaxReconnectAttempts int `default:"10" yaml:"tcp_max_reconnect_attempts" json:"tcp_max_reconnect_attempts,omitempty"`
}

// Config holds supported types by the multiconfig package
Expand Down
2 changes: 1 addition & 1 deletion examples/echo/tcp-echo/client/app/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func newSession(session getty.Session) error {
func initClient() {
clientOpts := []getty.ClientOption{getty.WithServerAddress(gxnet.HostAddress(conf.ServerHost, conf.ServerPort))}
clientOpts = append(clientOpts, getty.WithClientTaskPool(taskPool))

clientOpts = append(clientOpts, getty.WithReconnectAttempts(conf.GettySessionParam.TcpMaxReconnectAttempts))
if conf.ConnectionNum != 0 {
clientOpts = append(clientOpts, getty.WithConnectionNumber(conf.ConnectionNum))
}
Expand Down
50 changes: 28 additions & 22 deletions transport/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"crypto/x509"
"encoding/pem"
"fmt"
"math"
"net"
"os"
"strings"
Expand All @@ -41,10 +42,11 @@ import (
)

const (
reconnectInterval = 3e8 // 300ms
connectInterval = 5e8 // 500ms
connectTimeout = 3e9
maxTimes = 10
defaultReconnectInterval = 3e8 // 300ms
connectInterval = 5e8 // 500ms
connectTimeout = 3e9
defaultMaxReconnectAttempts = 50
maxBackOffTimes = 10
)

var (
Expand Down Expand Up @@ -207,14 +209,18 @@ func (c *client) dialUDP() Session {
}

// check connection alive by write/read action
conn.SetWriteDeadline(time.Now().Add(1e9))
if err := conn.SetWriteDeadline(time.Now().Add(1e9)); err != nil {
log.Warnf("failed to set write deadline: %+v", err)
}
if length, err = conn.Write(connectPingPackage[:]); err != nil {
conn.Close()
log.Warnf("conn.Write(%s) = {length:%d, err:%+v}", string(connectPingPackage), length, perrors.WithStack(err))
<-gxtime.After(connectInterval)
continue
}
conn.SetReadDeadline(time.Now().Add(1e9))
if err := conn.SetReadDeadline(time.Now().Add(1e9)); err != nil {
log.Warnf("failed to set read deadline: %+v", err)
}
length, err = conn.Read(buf)
if netErr, ok := perrors.Cause(err).(net.Error); ok && netErr.Timeout() {
err = nil
Expand Down Expand Up @@ -423,33 +429,33 @@ func (c *client) RunEventLoop(newSession NewSessionCallback) {
// a for-loop connect to make sure the connection pool is valid
func (c *client) reConnect() {
var (
num, max, times, interval int
maxDuration int64
sessionNum, reconnectAttempts int
maxReconnectInterval int64
)
max = c.number
interval = c.reconnectInterval
if interval == 0 {
interval = reconnectInterval
reconnectInterval := c.reconnectInterval
if reconnectInterval == 0 {
reconnectInterval = defaultReconnectInterval
}
maxReconnectAttempts := c.maxReconnectAttempts
if maxReconnectAttempts == 0 {
maxReconnectAttempts = defaultMaxReconnectAttempts
}
connPoolSize := c.number
for {
if c.IsClosed() {
log.Warnf("client{peer:%s} goroutine exit now.", c.addr)
break
}

num = c.sessionNum()
if max <= num || max < times {
//Exit when the number of connection pools is sufficient or the reconnection times exceeds the connections numbers.
sessionNum = c.sessionNum()
if connPoolSize <= sessionNum || maxReconnectAttempts < reconnectAttempts {
//exit reconnect when the number of connection pools is sufficient or the current reconnection attempts exceeds the max reconnection attempts.
break
}
c.connect()
times++
if times > maxTimes {
maxDuration = int64(maxTimes) * int64(interval)
} else {
maxDuration = int64(times) * int64(interval)
}
<-gxtime.After(time.Duration(maxDuration))
reconnectAttempts++
maxReconnectInterval = int64(math.Min(float64(reconnectAttempts), float64(maxBackOffTimes))) * int64(reconnectInterval)
<-gxtime.After(time.Duration(maxReconnectInterval))
}
}

Expand Down
1 change: 1 addition & 0 deletions transport/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func TestTCPClient(t *testing.T) {
WithServerAddress(addr.String()),
WithReconnectInterval(5e8),
WithConnectionNumber(1),
WithReconnectAttempts(10),
)
assert.NotNil(t, clt)
assert.True(t, clt.ID() > 0)
Expand Down
17 changes: 13 additions & 4 deletions transport/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,10 @@ func WithServerTlsConfigBuilder(tlsConfigBuilder TlsConfigBuilder) ServerOption
type ClientOption func(*ClientOptions)

type ClientOptions struct {
addr string
number int
reconnectInterval int // reConnect Interval

addr string
number int
reconnectInterval int // reConnect Interval
maxReconnectAttempts int // max reconnect attempts
// tls
sslEnabled bool
tlsConfigBuilder TlsConfigBuilder
Expand Down Expand Up @@ -168,3 +168,12 @@ func WithClientTlsConfigBuilder(tlsConfigBuilder TlsConfigBuilder) ClientOption
o.tlsConfigBuilder = tlsConfigBuilder
}
}

// WithReconnectAttempts @maxReconnectAttempts is max reconnect attempts.
func WithReconnectAttempts(maxReconnectAttempts int) ClientOption {
return func(o *ClientOptions) {
if 0 < maxReconnectAttempts {
o.maxReconnectAttempts = maxReconnectAttempts
}
}
}
8 changes: 6 additions & 2 deletions transport/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -860,8 +860,12 @@ func (s *session) stop() {
// let read/Write timeout asap
now := time.Now()
if conn := s.Conn(); conn != nil {
conn.SetReadDeadline(now.Add(s.ReadTimeout()))
conn.SetWriteDeadline(now.Add(s.WriteTimeout()))
if err := conn.SetReadDeadline(now.Add(s.ReadTimeout())); err != nil {
log.Warnf("failed to set read deadline: %+v", err)
}
if err := conn.SetWriteDeadline(now.Add(s.WriteTimeout())); err != nil {
log.Warnf("failed to set write deadline: %+v", err)
}
}
close(s.done)
clt, cltFound := s.GetAttribute(sessionClientKey).(*client)
Expand Down

0 comments on commit 177fc63

Please sign in to comment.