diff --git a/go.mod b/go.mod index d33156c..f3ca18a 100644 --- a/go.mod +++ b/go.mod @@ -11,26 +11,28 @@ require ( github.com/pierrec/lz4/v4 v4.1.18 github.com/pkg/errors v0.9.1 github.com/scylladb/go-set v1.0.2 - github.com/stretchr/testify v1.8.0 + github.com/stretchr/testify v1.9.0 go.uber.org/zap v1.24.0 golang.org/x/crypto v0.17.0 ) +require github.com/frankban/quicktest v1.14.6 // indirect + +require ( + go.uber.org/goleak v1.3.0 // indirect + gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect +) + require ( github.com/davecgh/go-spew v1.1.1 // indirect - github.com/fatih/color v1.16.0 // indirect - github.com/frankban/quicktest v1.14.6 // indirect - github.com/google/uuid v1.5.0 // indirect - github.com/jessevdk/go-flags v1.5.0 // indirect + github.com/google/uuid v1.6.0 github.com/kylelemons/godebug v1.1.0 // indirect - github.com/mattn/go-colorable v0.1.13 // indirect - github.com/mattn/go-isatty v0.0.20 // indirect github.com/naoina/go-stringutil v0.1.0 // indirect + github.com/panjf2000/ants/v2 v2.9.1 github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/uber/go-torch v0.0.0-20181107071353-86f327cc820e // indirect go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/sys v0.15.0 // indirect - gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect + golang.org/x/sys v0.15.0 + gopkg.in/natefinch/lumberjack.v2 v2.2.1 gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 9439677..15a0eff 100644 --- a/go.sum +++ b/go.sum @@ -3,8 +3,6 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= -github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= github.com/fatih/set v0.2.1 h1:nn2CaJyknWE/6txyUDGwysr3G5QC6xWB/PtVjPBbeaA= github.com/fatih/set v0.2.1/go.mod h1:+RKtMCH+favT2+3YecHGxcc0b4KyVWA1QWWJUs4E0CI= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= @@ -15,10 +13,8 @@ github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= -github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/jessevdk/go-flags v1.5.0 h1:1jKYvbxEjfUl0fmqTCOfonvskHHXMjBySTLW4y9LFvc= -github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= @@ -27,15 +23,12 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= -github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= -github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= -github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= -github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= -github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/naoina/go-stringutil v0.1.0 h1:rCUeRUHjBjGTSHl0VC00jUPLz8/F9dDzYI70Hzifhks= github.com/naoina/go-stringutil v0.1.0/go.mod h1:XJ2SJL9jCtBh+P9q5btrd/Ylo8XwT/h1USek5+NqSA0= github.com/naoina/toml v0.1.1 h1:PT/lllxVVN0gzzSqSlHEmP8MJB4MY2U7STGxiouV4X8= github.com/naoina/toml v0.1.1/go.mod h1:NBIhNtsFMo3G2szEBne+bO4gS192HuIYRqfvOWb4i1E= +github.com/panjf2000/ants/v2 v2.9.1 h1:Q5vh5xohbsZXGcD6hhszzGqB7jSSc2/CRr3QKIga8Kw= +github.com/panjf2000/ants/v2 v2.9.1/go.mod h1:7ZxyxsqE4vvW0M7LSD8aI3cKwgFhBHbxnlN8mDqHa1I= github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= @@ -51,28 +44,30 @@ github.com/scylladb/go-set v1.0.2 h1:SkvlMCKhP0wyyct6j+0IHJkBkSZL+TDzZ4E7f7BCcRE github.com/scylladb/go-set v1.0.2/go.mod h1:DkpGd78rljTxKAnTDPFqXSGxvETQnJyuSOQwsHycqfs= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/uber/go-torch v0.0.0-20181107071353-86f327cc820e h1:jV0Y58RWaOMT3i5foW2YoEKlaN6biewBtngFwAfEwQ0= -github.com/uber/go-torch v0.0.0-20181107071353-86f327cc820e/go.mod h1:uuMPbyv6WJykZcarrIuJiTjfSGC997/jnfHyyeeG2Jo= +github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= -go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= -golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= +golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/log/logger.go b/log/logger.go index 4dbf6ba..20d3a38 100644 --- a/log/logger.go +++ b/log/logger.go @@ -20,15 +20,16 @@ package log import ( "context" "fmt" - "github.com/google/uuid" - "github.com/pkg/errors" - "golang.org/x/sys/unix" "io" "os" "strings" "sync" "time" + "github.com/google/uuid" + "github.com/pkg/errors" + "golang.org/x/sys/unix" + "go.uber.org/zap" "go.uber.org/zap/zapcore" "gopkg.in/natefinch/lumberjack.v2" @@ -42,8 +43,6 @@ const BufferSize = 4096 var ( globalMutex sync.Mutex defaultGlobalLogger *Logger - TraceId string - LogType = "Default" SlowQueryThreshold int64 ) @@ -125,53 +124,46 @@ var ( Any = zap.Any ) -func resetLogInfo(logType string, traceId any) { +func AddInfo(logType string, traceId any, msg string) string { if len(logType) == 0 { logType = "Default" - } else { - LogType = logType } - if traceId != nil { - TraceId = traceId.(string) + var traceStr string + if traceId == nil { + traceStr = "" } else { - TraceId = "" + traceStr = traceId.(string) } + return "[" + logType + "] " + "[" + traceStr + "] " + msg } // Default func Info(logType string, traceId any, msg string, fields ...Field) { - resetLogInfo(logType, traceId) - defaultGlobalLogger.Info(msg, fields...) + defaultGlobalLogger.Info(AddInfo(logType, traceId, msg), fields...) } func Error(logType string, traceId any, msg string, fields ...Field) { - resetLogInfo(logType, traceId) - defaultGlobalLogger.Error(msg, fields...) + defaultGlobalLogger.Error(AddInfo(logType, traceId, msg), fields...) } func Warn(logType string, traceId any, msg string, fields ...Field) { - resetLogInfo(logType, traceId) - defaultGlobalLogger.Warn(msg, fields...) + defaultGlobalLogger.Warn(AddInfo(logType, traceId, msg), fields...) } func DPanic(logType string, traceId any, msg string, fields ...Field) { - resetLogInfo(logType, traceId) - defaultGlobalLogger.DPanic(msg, fields...) + defaultGlobalLogger.DPanic(AddInfo(logType, traceId, msg), fields...) } func Panic(logType string, traceId any, msg string, fields ...Field) { - resetLogInfo(logType, traceId) - defaultGlobalLogger.Panic(msg, fields...) + defaultGlobalLogger.Panic(AddInfo(logType, traceId, msg), fields...) } func Fatal(logType string, traceId any, msg string, fields ...Field) { - resetLogInfo(logType, traceId) - defaultGlobalLogger.Fatal(msg, fields...) + defaultGlobalLogger.Fatal(AddInfo(logType, traceId, msg), fields...) } func Debug(logType string, traceId any, msg string, fields ...Field) { - resetLogInfo(logType, traceId) - defaultGlobalLogger.Debug(msg, fields...) + defaultGlobalLogger.Debug(AddInfo(logType, traceId, msg), fields...) } var ( @@ -332,8 +324,6 @@ func NewCustomEncoder() zapcore.EncoderConfig { // level customLevelEncoder := func(level zapcore.Level, enc zapcore.PrimitiveArrayEncoder) { enc.AppendString("[" + level.CapitalString() + "]") - enc.AppendString("[" + LogType + "]") - enc.AppendString("[" + TraceId + "]") } encoderConf := zapcore.EncoderConfig{ TimeKey: "ts", diff --git a/obkvrpc/rpc_server.go b/obkvrpc/rpc_server.go new file mode 100644 index 0000000..d4d7990 --- /dev/null +++ b/obkvrpc/rpc_server.go @@ -0,0 +1,194 @@ +/*- + * #%L + * OBKV Table Client Framework + * %% + * Copyright (C) 2021 OceanBase + * %% + * OBKV Table Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + +package obkvrpc + +import ( + "io" + "runtime/debug" + "sync" + + "github.com/oceanbase/obkv-table-client-go/log" + "github.com/panjf2000/ants/v2" + "go.uber.org/zap" +) + +// CodecServer implement interfaces to read/decode request +// and write/encode response +type CodecServer interface { + ReadRequest(*Request) error + WriteResponse(*Response) error + Call(*Request, *Response) error + Close() + GetCloseChan() *chan struct{} +} + +// Server implement server frame +type Server struct { + methodRunPool *ants.Pool + reqObjPool *sync.Pool + respObjPool *sync.Pool + CloseChan *chan struct{} // to close and clean Server +} + +// Request is generated by a decoder +type Request struct { + Method string // use for mapping + Args [][]byte + ID string +} + +type Response struct { + ID string + RspContent []byte +} + +// NewServer init a server +func NewServer(routinePoolSize int, ch *chan struct{}) (*Server, error) { + var err error + s := &Server{CloseChan: ch} + s.methodRunPool, err = ants.NewPool(routinePoolSize, + ants.WithPanicHandler(func(p interface{}) { + if err := recover(); err != nil { + log.Error("RPCServer", nil, "catch panic", log.Any("error", err), log.String("stack", string(debug.Stack()))) + } + }), + ) + if err != nil { + log.Warn("RPCServer", nil, "create goroutine pool failed", zap.Error(err)) + return s, err + } + s.reqObjPool = &sync.Pool{ + New: func() interface{} { + return new(Request) + }, + } + s.respObjPool = &sync.Pool{ + New: func() interface{} { + return new(Response) + }, + } + return s, nil +} + +// PutRequest clear req and put back to req pool +func (s *Server) PutRequest(req *Request) { + req.Method = "" + req.ID = "" + for i := range req.Args { + req.Args[i] = req.Args[i][:0] + } + req.Args = req.Args[:0] + s.reqObjPool.Put(req) +} + +// PutRequest clear req and put back to req pool +func (s *Server) PutResponse(resp *Response) { + resp.RspContent = resp.RspContent[:0] + resp.ID = "" + s.respObjPool.Put(resp) +} + +// Close release resources +func (s *Server) Close() { + s.methodRunPool.Release() +} + +// ReadRequest read requests until an error occurs +func (s *Server) ReadRequest(reqChan chan<- *Request, cServer CodecServer) { + closeChan := cServer.GetCloseChan() + for { + isStop := false + select { + case <-*closeChan: + cServer.Close() + close(reqChan) + return + default: + // ReadRequest may include read and encode, depend on the cServer + req := s.reqObjPool.Get().(*Request) + err := cServer.ReadRequest(req) + if err != nil { + if err == io.EOF { + log.Info("RPCServer", req.ID, "connection closed", zap.Error(err)) + } else { + log.Warn("RPCServer", req.ID, "fail to read command", zap.Error(err)) + } + s.PutRequest(req) + close(reqChan) + isStop = true + } else { + reqChan <- req + } + } + if isStop { + break + } + } +} + +// RunWorkerWrapper wrap RunWorker for goroutine pool +type RunWorkerWrapper func() + +// RunWorker keep processing requests from reqChan until channel reqChan is closed +func (s *Server) RunWorker(wg *sync.WaitGroup, reqChan <-chan *Request, cServer CodecServer) RunWorkerWrapper { + return func() { + defer func() { + wg.Done() + if err := recover(); err != nil { + log.Error("RPCServer", nil, "RunWorker panic", log.Any("error", err), log.String("stack", string(debug.Stack()))) + } + }() + for req := range reqChan { + resp := s.respObjPool.Get().(*Response) + err := cServer.Call(req, resp) + if err != nil { + log.Warn("RPCServer", req.ID, "fail to call method", zap.Error(err)) + } + s.PutRequest(req) + err = cServer.WriteResponse(resp) + s.PutResponse(resp) + if err != nil { + log.Warn("RPCServer", nil, "fail to write response", zap.Error(err)) + break + } + } + } +} + +// ServeCodec serve a codec +func (s *Server) ServeCodec(cServer CodecServer, maxQueSize int) { + wg := new(sync.WaitGroup) + defer func() { + wg.Wait() + cServer.Close() + if err := recover(); err != nil { + log.Error("RPCServer", nil, "ServeCodec panic", log.Any("error", err), log.String("stack", string(debug.Stack()))) + } + }() + reqChan := make(chan *Request, maxQueSize) + var err error + + // exec(request) -> Response + wg.Add(1) + if err = s.methodRunPool.Submit(s.RunWorker(wg, reqChan, cServer)); err != nil { + wg.Done() + return + } + + s.ReadRequest(reqChan, cServer) +} diff --git a/test/obkvrpc/rpc_server_test.go b/test/obkvrpc/rpc_server_test.go new file mode 100644 index 0000000..7997ed8 --- /dev/null +++ b/test/obkvrpc/rpc_server_test.go @@ -0,0 +1,151 @@ +/*- + * #%L + * OBKV Table Client Framework + * %% + * Copyright (C) 2021 OceanBase + * %% + * OBKV Table Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + +package obkvrpc + +import ( + "fmt" + "io" + "net" + "sync" + "testing" + "time" + + "github.com/google/uuid" + "github.com/oceanbase/obkv-table-client-go/obkvrpc" + "github.com/stretchr/testify/assert" +) + +var ( + MaxMsgNum = 3 + MaxClientNum = 10 + RoutinePoolSize = 10000 + ExpiredDuration = time.Second * 60 +) + +type MockCodec struct { + Conn net.Conn + RunTime int + Wg *sync.WaitGroup + IDMap sync.Map + Test *testing.T + CloseChan chan struct{} +} + +func (mcs *MockCodec) GetCloseChan() *chan struct{} { + return &mcs.CloseChan +} + +func (mcs *MockCodec) ReadRequest(req *obkvrpc.Request) error { + if mcs.RunTime++; mcs.RunTime > MaxMsgNum { + // fmt.Println("ReadRequest return err", mcs.RunTime) + return io.EOF + } + req.Method = "testMethod" + req.ID = uuid.New().String() + + arg1 := []byte{'a', 'r', 'g'} + arg2 := []byte{'1', '2', '3'} + req.Args = append(req.Args, arg1) + req.Args = append(req.Args, arg2) + mcs.IDMap.Store(req.ID, true) + // fmt.Println("ReadRequest: ", req.ID, obkvrpc.GetGID()) + return nil +} + +func (mcs *MockCodec) WriteResponse(resp *obkvrpc.Response) error { + _, ok := mcs.IDMap.Load(resp.ID) + assert.Equal(mcs.Test, true, ok) + mcs.IDMap.Delete(resp.ID) + // fmt.Println("WriteResponse: ", resp.ID, obkvrpc.GetGID()) + return nil +} + +func (mcs *MockCodec) Call(req *obkvrpc.Request, resp *obkvrpc.Response) error { + resp.ID = req.ID + resp.RspContent = []byte("content") + // fmt.Println("Call: ", req.ID, obkvrpc.GetGID()) + return nil +} + +func (mcs *MockCodec) Close() { + // fmt.Println("MockCodec closed") + mcs.Conn.Close() + mcs.Wg.Done() + mcs.IDMap.Range(func(key any, value any) bool { + mcs.Test.Error("IDMap is not empty when MockCodec closing", key) + return false + }) +} + +func MockTCPServer(t *testing.T, lis net.Listener, doneChan chan struct{}) { + rpcSrv, err := obkvrpc.NewServer(RoutinePoolSize, new(chan struct{})) + assert.Equal(t, nil, err) + wg := new(sync.WaitGroup) + for { + conn, err := lis.Accept() + if err != nil { + fmt.Println("listener closed") + break + } + codec := MockCodec{Conn: conn, RunTime: 0, Wg: wg, Test: t, CloseChan: make(chan struct{})} + wg.Add(1) + go rpcSrv.ServeCodec(&codec) + } + wg.Wait() + close(doneChan) + rpcSrv.Close() +} + +func Dial(t *testing.T) { + _, err := net.Dial("tcp", "127.0.0.1:1234") + assert.Equal(t, nil, err) +} + +func ServerAndClients(t *testing.T, maxMsgNum int, maxClientNum int) { + // fmt.Println("start to test tcp server") + MaxMsgNum = maxMsgNum + lis, err := net.Listen("tcp", "127.0.0.1:1234") + assert.Equal(t, nil, err) + doneChan := make(chan struct{}) + go MockTCPServer(t, lis, doneChan) + for runTime := 0; runTime < maxClientNum; runTime++ { + Dial(t) + } + lis.Close() + _, ok := <-doneChan + fmt.Println("finish test tcp server, isOpen = ", ok) +} + +func TestRPCServer(t *testing.T) { + // defer goleak.VerifyNone(t) + // single message single client + ServerAndClients(t, 1, 1) + time.Sleep(time.Millisecond * 500) + + // single message multi client + ServerAndClients(t, 1, 10000) + time.Sleep(time.Millisecond * 500) + + // multi message single client + ServerAndClients(t, 10000, 1) + time.Sleep(time.Millisecond * 500) + + // multi message multi client + ServerAndClients(t, 39000, 256) + time.Sleep(time.Millisecond) +}