diff --git a/connection/connection.go b/connection/connection.go index 8e90fcb..72b4467 100644 --- a/connection/connection.go +++ b/connection/connection.go @@ -25,14 +25,16 @@ type CallBack interface { // Connection TCP 连接 type Connection struct { - fd int - connected atomic.Bool - outBuffer *ringbuffer.RingBuffer // write buffer - inBuffer *ringbuffer.RingBuffer // read buffer - callBack CallBack - loop *eventloop.EventLoop - peerAddr string - ctx interface{} + fd int + connected atomic.Bool + outBuffer *ringbuffer.RingBuffer // write buffer + outBufferLen atomic.Int64 + inBuffer *ringbuffer.RingBuffer // read buffer + inBufferLen atomic.Int64 + callBack CallBack + loop *eventloop.EventLoop + peerAddr string + ctx interface{} KeyValueContext idleTime time.Duration @@ -129,6 +131,16 @@ func (c *Connection) ShutdownWrite() error { return unix.Shutdown(c.fd, unix.SHUT_WR) } +// ReadBufferLength read buffer 当前积压的数据长度 +func (c *Connection) ReadBufferLength() int64 { + return c.inBufferLen.Get() +} + +// WriteBufferLength write buffer 当前积压的数据长度 +func (c *Connection) WriteBufferLength() int64 { + return c.outBufferLen.Get() +} + // HandleEvent 内部使用,event loop 回调 func (c *Connection) HandleEvent(fd int, events poller.Event) { if c.idleTime > 0 { @@ -147,6 +159,9 @@ func (c *Connection) HandleEvent(fd int, events poller.Event) { } else if events&poller.EventRead != 0 { c.handleRead(fd) } + + c.inBufferLen.Swap(int64(c.inBuffer.Length())) + c.outBufferLen.Swap(int64(c.outBuffer.Length())) } func (c *Connection) handlerProtocol(buffer *ringbuffer.RingBuffer) []byte { diff --git a/example/bufferlength/main.go b/example/bufferlength/main.go new file mode 100644 index 0000000..5f19a26 --- /dev/null +++ b/example/bufferlength/main.go @@ -0,0 +1,106 @@ +package main + +import ( + "container/list" + "log" + "sync" + "time" + + "github.com/Allenxuxu/gev" + "github.com/Allenxuxu/gev/connection" +) + +const clientsKey = "demo_push_message_key" + +// Server example +type Server struct { + conn *list.List + mu sync.RWMutex + server *gev.Server +} + +// New server +func New(ip, port string) (*Server, error) { + var err error + s := new(Server) + s.conn = list.New() + s.server, err = gev.NewServer(s, + gev.Address(ip+":"+port)) + if err != nil { + return nil, err + } + + return s, nil +} + +// Start server +func (s *Server) Start() { + s.server.RunEvery(1*time.Second, s.RunPush) + s.server.Start() +} + +// Stop server +func (s *Server) Stop() { + s.server.Stop() +} + +// RunPush push message +func (s *Server) RunPush() { + var next *list.Element + + s.mu.RLock() + defer s.mu.RUnlock() + + for e := s.conn.Front(); e != nil; e = next { + next = e.Next() + + c := e.Value.(*connection.Connection) + if c.WriteBufferLength() > 1024*10 { + log.Printf("write buffer length > 1024*10") + continue + } + _ = c.Send([]byte("hello\n")) + } +} + +// OnConnect callback +func (s *Server) OnConnect(c *connection.Connection) { + log.Println(" OnConnect : ", c.PeerAddr()) + + s.mu.Lock() + e := s.conn.PushBack(c) + s.mu.Unlock() + c.Set(clientsKey, e) +} + +// OnMessage callback +func (s *Server) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out []byte) { + log.Printf("OnMessage, read buffer len %d, write buffer len %d \n", c.ReadBufferLength(), c.WriteBufferLength()) + + out = data + return +} + +// OnClose callback +func (s *Server) OnClose(c *connection.Connection) { + log.Println("OnClose") + v, ok := c.Get(clientsKey) + if !ok { + log.Println("OnClose : get key fail") + return + } + + s.mu.Lock() + s.conn.Remove(v.(*list.Element)) + s.mu.Unlock() +} + +func main() { + s, err := New("", "1833") + if err != nil { + panic(err) + } + defer s.Stop() + + s.Start() +}