diff --git a/examples/echo/tcp-echo/client/app/config.go b/examples/echo/tcp-echo/client/app/config.go index c49d77f..259d359 100644 --- a/examples/echo/tcp-echo/client/app/config.go +++ b/examples/echo/tcp-echo/client/app/config.go @@ -42,22 +42,23 @@ var conf *Config type ( GettySessionParam struct { - CompressEncoding bool `default:"false" yaml:"compress_encoding" json:"compress_encoding,omitempty"` - TcpNoDelay bool `default:"true" yaml:"tcp_no_delay" json:"tcp_no_delay,omitempty"` - TcpKeepAlive bool `default:"true" yaml:"tcp_keep_alive" json:"tcp_keep_alive,omitempty"` - KeepAlivePeriod string `default:"180s" yaml:"keep_alive_period" json:"keep_alive_period,omitempty"` - keepAlivePeriod time.Duration - TcpRBufSize int `default:"262144" yaml:"tcp_r_buf_size" json:"tcp_r_buf_size,omitempty"` - TcpWBufSize int `default:"65536" yaml:"tcp_w_buf_size" json:"tcp_w_buf_size,omitempty"` - PkgWQSize int `default:"1024" yaml:"pkg_wq_size" json:"pkg_wq_size,omitempty"` - TcpReadTimeout string `default:"1s" yaml:"tcp_read_timeout" json:"tcp_read_timeout,omitempty"` - tcpReadTimeout time.Duration - TcpWriteTimeout string `default:"5s" yaml:"tcp_write_timeout" json:"tcp_write_timeout,omitempty"` - tcpWriteTimeout time.Duration - WaitTimeout string `default:"7s" yaml:"wait_timeout" json:"wait_timeout,omitempty"` - waitTimeout time.Duration - MaxMsgLen int `default:"1024" yaml:"max_msg_len" json:"max_msg_len,omitempty"` - SessionName string `default:"echo-client" yaml:"session_name" json:"session_name,omitempty"` + CompressEncoding bool `default:"false" yaml:"compress_encoding" json:"compress_encoding,omitempty"` + TcpNoDelay bool `default:"true" yaml:"tcp_no_delay" json:"tcp_no_delay,omitempty"` + TcpKeepAlive bool `default:"true" yaml:"tcp_keep_alive" json:"tcp_keep_alive,omitempty"` + KeepAlivePeriod string `default:"180s" yaml:"keep_alive_period" json:"keep_alive_period,omitempty"` + keepAlivePeriod time.Duration + TcpRBufSize int `default:"262144" yaml:"tcp_r_buf_size" json:"tcp_r_buf_size,omitempty"` + TcpWBufSize int `default:"65536" yaml:"tcp_w_buf_size" json:"tcp_w_buf_size,omitempty"` + PkgWQSize int `default:"1024" yaml:"pkg_wq_size" json:"pkg_wq_size,omitempty"` + TcpReadTimeout string `default:"1s" yaml:"tcp_read_timeout" json:"tcp_read_timeout,omitempty"` + tcpReadTimeout time.Duration + TcpWriteTimeout string `default:"5s" yaml:"tcp_write_timeout" json:"tcp_write_timeout,omitempty"` + tcpWriteTimeout time.Duration + WaitTimeout string `default:"7s" yaml:"wait_timeout" json:"wait_timeout,omitempty"` + waitTimeout time.Duration + MaxMsgLen int `default:"1024" yaml:"max_msg_len" json:"max_msg_len,omitempty"` + SessionName string `default:"echo-client" yaml:"session_name" json:"session_name,omitempty"` + TcpMaxReconnectAttempts int `default:"10" yaml:"tcp_max_reconnect_attempts" json:"tcp_max_reconnect_attempts,omitempty"` } // Config holds supported types by the multiconfig package diff --git a/examples/echo/tcp-echo/client/app/main.go b/examples/echo/tcp-echo/client/app/main.go index 916a9d8..2df3b39 100644 --- a/examples/echo/tcp-echo/client/app/main.go +++ b/examples/echo/tcp-echo/client/app/main.go @@ -124,7 +124,7 @@ func newSession(session getty.Session) error { func initClient() { clientOpts := []getty.ClientOption{getty.WithServerAddress(gxnet.HostAddress(conf.ServerHost, conf.ServerPort))} clientOpts = append(clientOpts, getty.WithClientTaskPool(taskPool)) - + clientOpts = append(clientOpts, getty.WithReconnectAttempts(conf.GettySessionParam.TcpMaxReconnectAttempts)) if conf.ConnectionNum != 0 { clientOpts = append(clientOpts, getty.WithConnectionNumber(conf.ConnectionNum)) } diff --git a/transport/client.go b/transport/client.go index 4b17f94..fc63a44 100644 --- a/transport/client.go +++ b/transport/client.go @@ -22,6 +22,7 @@ import ( "crypto/x509" "encoding/pem" "fmt" + "math" "net" "os" "strings" @@ -41,10 +42,11 @@ import ( ) const ( - reconnectInterval = 3e8 // 300ms - connectInterval = 5e8 // 500ms - connectTimeout = 3e9 - maxTimes = 10 + defaultReconnectInterval = 3e8 // 300ms + connectInterval = 5e8 // 500ms + connectTimeout = 3e9 + defaultMaxReconnectAttempts = 50 + maxBackOffTimes = 10 ) var ( @@ -207,14 +209,18 @@ func (c *client) dialUDP() Session { } // check connection alive by write/read action - conn.SetWriteDeadline(time.Now().Add(1e9)) + if err := conn.SetWriteDeadline(time.Now().Add(1e9)); err != nil { + log.Warnf("failed to set write deadline: %+v", err) + } if length, err = conn.Write(connectPingPackage[:]); err != nil { conn.Close() log.Warnf("conn.Write(%s) = {length:%d, err:%+v}", string(connectPingPackage), length, perrors.WithStack(err)) <-gxtime.After(connectInterval) continue } - conn.SetReadDeadline(time.Now().Add(1e9)) + if err := conn.SetReadDeadline(time.Now().Add(1e9)); err != nil { + log.Warnf("failed to set read deadline: %+v", err) + } length, err = conn.Read(buf) if netErr, ok := perrors.Cause(err).(net.Error); ok && netErr.Timeout() { err = nil @@ -423,33 +429,33 @@ func (c *client) RunEventLoop(newSession NewSessionCallback) { // a for-loop connect to make sure the connection pool is valid func (c *client) reConnect() { var ( - num, max, times, interval int - maxDuration int64 + sessionNum, reconnectAttempts int + maxReconnectInterval int64 ) - max = c.number - interval = c.reconnectInterval - if interval == 0 { - interval = reconnectInterval + reconnectInterval := c.reconnectInterval + if reconnectInterval == 0 { + reconnectInterval = defaultReconnectInterval + } + maxReconnectAttempts := c.maxReconnectAttempts + if maxReconnectAttempts == 0 { + maxReconnectAttempts = defaultMaxReconnectAttempts } + connPoolSize := c.number for { if c.IsClosed() { log.Warnf("client{peer:%s} goroutine exit now.", c.addr) break } - num = c.sessionNum() - if max <= num || max < times { - //Exit when the number of connection pools is sufficient or the reconnection times exceeds the connections numbers. + sessionNum = c.sessionNum() + if connPoolSize <= sessionNum || maxReconnectAttempts < reconnectAttempts { + //exit reconnect when the number of connection pools is sufficient or the current reconnection attempts exceeds the max reconnection attempts. break } c.connect() - times++ - if times > maxTimes { - maxDuration = int64(maxTimes) * int64(interval) - } else { - maxDuration = int64(times) * int64(interval) - } - <-gxtime.After(time.Duration(maxDuration)) + reconnectAttempts++ + maxReconnectInterval = int64(math.Min(float64(reconnectAttempts), float64(maxBackOffTimes))) * int64(reconnectInterval) + <-gxtime.After(time.Duration(maxReconnectInterval)) } } diff --git a/transport/client_test.go b/transport/client_test.go index 9f5aa87..d9d0e8c 100644 --- a/transport/client_test.go +++ b/transport/client_test.go @@ -115,6 +115,7 @@ func TestTCPClient(t *testing.T) { WithServerAddress(addr.String()), WithReconnectInterval(5e8), WithConnectionNumber(1), + WithReconnectAttempts(10), ) assert.NotNil(t, clt) assert.True(t, clt.ID() > 0) diff --git a/transport/options.go b/transport/options.go index d376e0c..f42b693 100644 --- a/transport/options.go +++ b/transport/options.go @@ -100,10 +100,10 @@ func WithServerTlsConfigBuilder(tlsConfigBuilder TlsConfigBuilder) ServerOption type ClientOption func(*ClientOptions) type ClientOptions struct { - addr string - number int - reconnectInterval int // reConnect Interval - + addr string + number int + reconnectInterval int // reConnect Interval + maxReconnectAttempts int // max reconnect attempts // tls sslEnabled bool tlsConfigBuilder TlsConfigBuilder @@ -168,3 +168,12 @@ func WithClientTlsConfigBuilder(tlsConfigBuilder TlsConfigBuilder) ClientOption o.tlsConfigBuilder = tlsConfigBuilder } } + +// WithReconnectAttempts @maxReconnectAttempts is max reconnect attempts. +func WithReconnectAttempts(maxReconnectAttempts int) ClientOption { + return func(o *ClientOptions) { + if 0 < maxReconnectAttempts { + o.maxReconnectAttempts = maxReconnectAttempts + } + } +} diff --git a/transport/session.go b/transport/session.go index b760b7c..deee1a0 100644 --- a/transport/session.go +++ b/transport/session.go @@ -860,8 +860,12 @@ func (s *session) stop() { // let read/Write timeout asap now := time.Now() if conn := s.Conn(); conn != nil { - conn.SetReadDeadline(now.Add(s.ReadTimeout())) - conn.SetWriteDeadline(now.Add(s.WriteTimeout())) + if err := conn.SetReadDeadline(now.Add(s.ReadTimeout())); err != nil { + log.Warnf("failed to set read deadline: %+v", err) + } + if err := conn.SetWriteDeadline(now.Add(s.WriteTimeout())); err != nil { + log.Warnf("failed to set write deadline: %+v", err) + } } close(s.done) clt, cltFound := s.GetAttribute(sessionClientKey).(*client)