From fef49f75e610a0e57bacd4694b1e78c4996213bf Mon Sep 17 00:00:00 2001 From: "herve.gouchet" Date: Mon, 2 Sep 2019 14:36:12 +0200 Subject: [PATCH 1/8] [twgit] Init release 'release-0.3.0'. From 831d9e5524b0bcb5a3af56345beb8905f0184377 Mon Sep 17 00:00:00 2001 From: hgouchet Date: Mon, 2 Sep 2019 18:16:57 +0200 Subject: [PATCH 2/8] First try to implement graceful shutdown on the server. Stops listening but does not interrupt any active connections --- conn.go | 29 +++++++++++-------- server.go | 84 ++++++++++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 89 insertions(+), 24 deletions(-) diff --git a/conn.go b/conn.go index fca2dc3..83199e0 100644 --- a/conn.go +++ b/conn.go @@ -10,13 +10,12 @@ import ( type conn struct { addr string - ctx context.Context - srv *Server rwc net.Conn + srv *Server } -func (c *conn) bySegment(segment string, body io.Reader) { - ctx, cancel := context.WithCancel(c.ctx) +func (c *conn) bySegment(ctx context.Context, segment string, body io.Reader) { + ctx, cancel := context.WithCancel(ctx) defer cancel() w := newWriter(c.rwc) @@ -30,20 +29,28 @@ func (c *conn) newRequest(segment string, body io.Reader) *Request { return req } -func (c *conn) serve() { - // deals with a new connection - go c.bySegment(SYN, nil) - // waiting for messages +func (c *conn) serve(ctx context.Context) { + // New connection + c.bySegment(ctx, SYN, nil) + + // Waiting for messages r := bufio.NewReader(c.rwc) for { + select { + case <-ctx.Done(): + // Connection closing, stops serving. + c.bySegment(ctx, FIN, r) + return + default: + } d, err := r.ReadBytes('\n') r := bytes.NewReader(d) if err != nil { - // unable to read on it: closing the connection. - c.bySegment(FIN, r) + // Unable to read on it: closing the connection. + c.bySegment(ctx, FIN, r) return } // new message received - go c.bySegment(ACK, r) + c.bySegment(ctx, ACK, r) } } diff --git a/server.go b/server.go index 205bbd8..dafd0c5 100644 --- a/server.go +++ b/server.go @@ -62,6 +62,8 @@ func Default() *Server { func New() *Server { s := &Server{ handlers: map[string][]HandlerFunc{}, + shutdown: make(chan struct{}), + closed: make(chan struct{}), } s.pool.New = func() interface{} { return s.allocateContext() @@ -79,8 +81,11 @@ type Server struct { // A zero value for t means Read will not time out. ReadTimeout time.Duration + cancel context.CancelFunc handlers map[string][]HandlerFunc pool sync.Pool + closed, + shutdown chan struct{} } // Any attaches handlers on the given segment. @@ -130,7 +135,14 @@ func (s *Server) Run(addr string) (err error) { if err != nil { return } - return s.serve(l) + defer func() { + cErr := l.Close() + if err != nil { + err = cErr + } + }() + err = s.serve(l) + return } // RunTLS acts identically to the Run method, except that it uses the TLS protocol. @@ -144,30 +156,49 @@ func (s *Server) RunTLS(addr, certFile, keyFile string) (err error) { if err != nil { return } - return s.serve(l) -} - -func (s *Server) serve(l net.Listener) (err error) { defer func() { - if err == nil { - err = l.Close() + cErr := l.Close() + if err != nil { + err = cErr } }() - ctx := context.Background() + err = s.serve(l) + return +} + +func (s *Server) serve(l net.Listener) error { + var ( + w8 sync.WaitGroup + ctx context.Context + ) + ctx, s.cancel = context.WithCancel(context.Background()) + defer s.cancel() for { + select { + case <-s.shutdown: + // Stops listening but does not interrupt any active connections. + // See the Shutdown method to gracefully shuts down the server. + w8.Wait() + close(s.closed) + return nil + default: + } c, err := read(l, s.ReadTimeout) if err != nil { return err } - rwc := s.newConn(ctx, c) - go rwc.serve() + rwc := s.newConn(c) + w8.Add(1) + go func() { + defer w8.Done() + rwc.serve(ctx) + }() } } -func (s *Server) newConn(ctx context.Context, c net.Conn) *conn { +func (s *Server) newConn(c net.Conn) *conn { return &conn{ addr: c.RemoteAddr().String(), - ctx: ctx, srv: s, rwc: c, } @@ -198,6 +229,33 @@ func (s *Server) computeHandlers(segment string) []HandlerFunc { return m } +// Shutdown gracefully shuts down the server without interrupting any +// active connections. Shutdown works by first closing all open listeners and +// then waiting indefinitely for connections to return to idle and then shut down. +// If the provided context expires before the shutdown is complete, +// Shutdown returns the context's error. +func (s *Server) Shutdown(ctx context.Context) error { + if s.shutdown == nil { + // Nothing to do + return nil + } + // Stops listening. + close(s.shutdown) + + // Stops all. + for { + select { + case <-ctx.Done(): + // Forces closing of actives connections. + s.cancel() + return ctx.Err() + case <-s.closed: + return nil + default: + } + } +} + func tlsConfig(certFile, keyFile string) (*tls.Config, error) { var err error c := make([]tls.Certificate, 1) @@ -211,7 +269,7 @@ func read(l net.Listener, to time.Duration) (net.Conn, error) { return nil, err } if to == 0 { - return c, err + return c, nil } err = c.SetReadDeadline(time.Now().Add(to)) if err != nil { From 8a573bda137d634b659172f5413014f492a39db8 Mon Sep 17 00:00:00 2001 From: hgouchet Date: Mon, 2 Sep 2019 18:19:40 +0200 Subject: [PATCH 3/8] First try to implement graceful shutdown on the server. Stops listening but does not interrupt any active connections. todo unit tests --- server.go | 1 - 1 file changed, 1 deletion(-) diff --git a/server.go b/server.go index dafd0c5..871914b 100644 --- a/server.go +++ b/server.go @@ -251,7 +251,6 @@ func (s *Server) Shutdown(ctx context.Context) error { return ctx.Err() case <-s.closed: return nil - default: } } } From d2ebeb421e568f02625a1a438bd78f56d6113d5d Mon Sep 17 00:00:00 2001 From: hgouchet Date: Mon, 2 Sep 2019 18:27:13 +0200 Subject: [PATCH 4/8] First try to implement graceful shutdown on the server. Stops listening but does not interrupt any active connections. todo unit tests --- server.go | 45 +++++++++++++++++++-------------------------- 1 file changed, 19 insertions(+), 26 deletions(-) diff --git a/server.go b/server.go index 871914b..d4d198a 100644 --- a/server.go +++ b/server.go @@ -130,49 +130,41 @@ const network = "tcp" // Run starts listening on TCP address. // This method will block the calling goroutine indefinitely unless an error happens. -func (s *Server) Run(addr string) (err error) { +func (s *Server) Run(addr string) error { l, err := net.Listen(network, addr) if err != nil { - return + return err } - defer func() { - cErr := l.Close() - if err != nil { - err = cErr - } - }() - err = s.serve(l) - return + return s.serve(l) } // RunTLS acts identically to the Run method, except that it uses the TLS protocol. // This method will block the calling goroutine indefinitely unless an error happens. -func (s *Server) RunTLS(addr, certFile, keyFile string) (err error) { +func (s *Server) RunTLS(addr, certFile, keyFile string) error { c, err := tlsConfig(certFile, keyFile) if err != nil { - return + return err } l, err := tls.Listen(network, addr, c) if err != nil { - return + return err } - defer func() { - cErr := l.Close() - if err != nil { - err = cErr - } - }() - err = s.serve(l) - return + return s.serve(l) } -func (s *Server) serve(l net.Listener) error { +func (s *Server) serve(l net.Listener) (err error) { var ( w8 sync.WaitGroup ctx context.Context ) ctx, s.cancel = context.WithCancel(context.Background()) - defer s.cancel() + defer func() { + s.cancel() + cErr := l.Close() + if err != nil { + err = cErr + } + }() for { select { case <-s.shutdown: @@ -180,12 +172,13 @@ func (s *Server) serve(l net.Listener) error { // See the Shutdown method to gracefully shuts down the server. w8.Wait() close(s.closed) - return nil + return default: } - c, err := read(l, s.ReadTimeout) + var c net.Conn + c, err = read(l, s.ReadTimeout) if err != nil { - return err + return } rwc := s.newConn(c) w8.Add(1) From 7dceed7708efb47043054d309f598c0eb3c960d2 Mon Sep 17 00:00:00 2001 From: hgouchet Date: Tue, 3 Sep 2019 14:23:59 +0200 Subject: [PATCH 5/8] graceful shutdown: some improvements --- example/graceful_server/main.go | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 example/graceful_server/main.go diff --git a/example/graceful_server/main.go b/example/graceful_server/main.go new file mode 100644 index 0000000..7440a41 --- /dev/null +++ b/example/graceful_server/main.go @@ -0,0 +1,28 @@ +package main + +import ( + "log" + + "github.com/rvflash/tcp" +) + +func main() { + r := tcp.Default() + r.ACK(func(c *tcp.Context) { + // new message received + body, err := c.ReadAll() + if err != nil { + c.Error(err) + return + } + log.Println(string(body)) + c.String("read") + }) + r.SYN(func(c *tcp.Context) { + c.String("hello") + }) + r.FIN(func(c *tcp.Context) { + log.Println("bye") + }) + log.Fatal(r.Run(":9090")) +} From 119b145398ed1a95a71c3fbe0b0df0309ca91aa7 Mon Sep 17 00:00:00 2001 From: hgouchet Date: Tue, 3 Sep 2019 14:24:03 +0200 Subject: [PATCH 6/8] graceful shutdown: some improvements --- .gitignore | 1 + conn.go | 25 +++++----- errors.go | 7 ++- example/graceful_server/main.go | 25 +++++++++- server.go | 88 +++++++++++++++++++++------------ 5 files changed, 98 insertions(+), 48 deletions(-) diff --git a/.gitignore b/.gitignore index 1285731..4d9bae3 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ .idea example/client/client example/server/server +example/graceful_server/graceful_server example/start/start \ No newline at end of file diff --git a/conn.go b/conn.go index 83199e0..45038c2 100644 --- a/conn.go +++ b/conn.go @@ -32,25 +32,24 @@ func (c *conn) newRequest(segment string, body io.Reader) *Request { func (c *conn) serve(ctx context.Context) { // New connection c.bySegment(ctx, SYN, nil) - + // Connection closed + defer c.bySegment(ctx, FIN, nil) // Waiting for messages r := bufio.NewReader(c.rwc) for { + cb := make(chan []byte, 1) + go func() { + d, err := r.ReadBytes('\n') + if err != nil { + return + } + cb <- d + }() select { case <-ctx.Done(): - // Connection closing, stops serving. - c.bySegment(ctx, FIN, r) - return - default: - } - d, err := r.ReadBytes('\n') - r := bytes.NewReader(d) - if err != nil { - // Unable to read on it: closing the connection. - c.bySegment(ctx, FIN, r) return + case b := <-cb: + c.bySegment(ctx, ACK, bytes.NewReader(b)) } - // new message received - c.bySegment(ctx, ACK, r) } } diff --git a/errors.go b/errors.go index f7abfa2..c176502 100644 --- a/errors.go +++ b/errors.go @@ -11,8 +11,11 @@ type Err interface { Recovered() bool } -// ErrRequest is returned if the request is invalid. -var ErrRequest = NewError("invalid request") +// List of common errors +var ( + // ErrRequest is returned if the request is invalid. + ErrRequest = NewError("invalid request") +) // NewError returns a new Error based of the given cause. func NewError(msg string, cause ...error) *Error { diff --git a/example/graceful_server/main.go b/example/graceful_server/main.go index 7440a41..5baa2ed 100644 --- a/example/graceful_server/main.go +++ b/example/graceful_server/main.go @@ -1,13 +1,22 @@ package main import ( + "context" "log" + "os" + "os/signal" + "syscall" + "time" "github.com/rvflash/tcp" ) func main() { + bye := make(chan os.Signal, 1) + signal.Notify(bye, os.Interrupt, syscall.SIGTERM) + r := tcp.Default() + r.ReadTimeout = 20 * time.Second r.ACK(func(c *tcp.Context) { // new message received body, err := c.ReadAll() @@ -24,5 +33,19 @@ func main() { r.FIN(func(c *tcp.Context) { log.Println("bye") }) - log.Fatal(r.Run(":9090")) + + go func() { + err := r.Run(":9090") + if err != nil { + log.Printf("server: %q\n", err) + } + }() + + <-bye + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + err := r.Shutdown(ctx) + cancel() + if err != nil { + log.Fatal(err) + } } diff --git a/server.go b/server.go index d4d198a..4f9d994 100644 --- a/server.go +++ b/server.go @@ -62,7 +62,7 @@ func Default() *Server { func New() *Server { s := &Server{ handlers: map[string][]HandlerFunc{}, - shutdown: make(chan struct{}), + closing: make(chan struct{}), closed: make(chan struct{}), } s.pool.New = func() interface{} { @@ -81,11 +81,14 @@ type Server struct { // A zero value for t means Read will not time out. ReadTimeout time.Duration - cancel context.CancelFunc + listener net.Listener handlers map[string][]HandlerFunc pool sync.Pool + + // graceful shutdown + cancelCtx context.CancelFunc closed, - shutdown chan struct{} + closing chan struct{} } // Any attaches handlers on the given segment. @@ -130,12 +133,12 @@ const network = "tcp" // Run starts listening on TCP address. // This method will block the calling goroutine indefinitely unless an error happens. -func (s *Server) Run(addr string) error { - l, err := net.Listen(network, addr) +func (s *Server) Run(addr string) (err error) { + s.listener, err = net.Listen(network, addr) if err != nil { return err } - return s.serve(l) + return s.serve() } // RunTLS acts identically to the Run method, except that it uses the TLS protocol. @@ -145,40 +148,58 @@ func (s *Server) RunTLS(addr, certFile, keyFile string) error { if err != nil { return err } - l, err := tls.Listen(network, addr, c) + s.listener, err = tls.Listen(network, addr, c) if err != nil { return err } - return s.serve(l) + return s.serve() } -func (s *Server) serve(l net.Listener) (err error) { +func (s *Server) close() { + select { + case <-s.closed: + // Already closed. + return + default: + close(s.closed) + } +} + +func (s *Server) closeListener() error { + if s.cancelCtx == nil { + return nil + } + s.cancelCtx() + return s.listener.Close() +} + +func (s *Server) serve() (err error) { var ( w8 sync.WaitGroup ctx context.Context ) - ctx, s.cancel = context.WithCancel(context.Background()) + ctx, s.cancelCtx = context.WithCancel(context.Background()) defer func() { - s.cancel() - cErr := l.Close() - if err != nil { - err = cErr - } - }() - for { select { - case <-s.shutdown: - // Stops listening but does not interrupt any active connections. - // See the Shutdown method to gracefully shuts down the server. - w8.Wait() - close(s.closed) - return + case <-s.closed: default: + err = s.closeListener() + return } + }() + for { var c net.Conn - c, err = read(l, s.ReadTimeout) + c, err = read(s.listener, s.ReadTimeout) if err != nil { - return + select { + case <-s.closing: + // Stops listening but does not interrupt any active connections. + w8.Wait() + s.close() + return nil + default: + return + } } rwc := s.newConn(c) w8.Add(1) @@ -225,22 +246,25 @@ func (s *Server) computeHandlers(segment string) []HandlerFunc { // Shutdown gracefully shuts down the server without interrupting any // active connections. Shutdown works by first closing all open listeners and // then waiting indefinitely for connections to return to idle and then shut down. -// If the provided context expires before the shutdown is complete, +// If the provided context expires before the closing is complete, // Shutdown returns the context's error. func (s *Server) Shutdown(ctx context.Context) error { - if s.shutdown == nil { + if s.closing == nil { // Nothing to do return nil } - // Stops listening. - close(s.shutdown) + close(s.closing) - // Stops all. + // Stops listening. + err := s.closeListener() + if err != nil { + return err + } for { select { case <-ctx.Done(): - // Forces closing of actives connections. - s.cancel() + // Forces closing of all actives connections. + s.close() return ctx.Err() case <-s.closed: return nil From 1ad9fdebfe98f5fc6faaf569230787bd248a674d Mon Sep 17 00:00:00 2001 From: hgouchet Date: Fri, 13 Sep 2019 22:40:35 +0200 Subject: [PATCH 7/8] redesign --- conn.go | 23 +++++++---------------- 1 file changed, 7 insertions(+), 16 deletions(-) diff --git a/conn.go b/conn.go index 45038c2..0c5eec1 100644 --- a/conn.go +++ b/conn.go @@ -31,25 +31,16 @@ func (c *conn) newRequest(segment string, body io.Reader) *Request { func (c *conn) serve(ctx context.Context) { // New connection - c.bySegment(ctx, SYN, nil) - // Connection closed - defer c.bySegment(ctx, FIN, nil) + go c.bySegment(ctx, SYN, nil) // Waiting for messages r := bufio.NewReader(c.rwc) for { - cb := make(chan []byte, 1) - go func() { - d, err := r.ReadBytes('\n') - if err != nil { - return - } - cb <- d - }() - select { - case <-ctx.Done(): - return - case b := <-cb: - c.bySegment(ctx, ACK, bytes.NewReader(b)) + d, err := r.ReadBytes('\n') + if err != nil { + break } + go c.bySegment(ctx, ACK, bytes.NewReader(d)) } + // Connection closed + c.bySegment(ctx, FIN, nil) } From a301cd9ce4354639e495d45a0ad3e7a29f26407a Mon Sep 17 00:00:00 2001 From: hgouchet Date: Fri, 13 Sep 2019 23:01:37 +0200 Subject: [PATCH 8/8] Adds graceful shutdown --- README.md | 36 +++++++++++++++++++++++++++++------- 1 file changed, 29 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 6f36276..65a3ee3 100644 --- a/README.md +++ b/README.md @@ -68,6 +68,12 @@ The `Next` method on the `Context` should only be used inside middleware. Its al See the `Recovery` or `Logger` methods as sample code. +### Graceful shutdown + +By running the TCP server is in own go routine, you can gracefully shuts down the server without interrupting any active connections. +`Shutdown` works by first closing all open listeners and then waiting indefinitely for connections to return to idle and then shut down. + + ## Quick start Assuming the following code that runs a server on port 9090: @@ -76,28 +82,44 @@ Assuming the following code that runs a server on port 9090: package main import ( - "log" + "context" + "log" + "os" + "os/signal" "github.com/rvflash/tcp" ) func main() { - // creates a server with a logger and a recover on panic as middlewares. + bye := make(chan os.Signal, 1) + signal.Notify(bye, os.Interrupt, syscall.SIGTERM) + + // Creates a server with a logger and a recover on panic as middlewares. r := tcp.Default() r.ACK(func(c *tcp.Context) { - // new message received - // gets the request body + // New message received + // Gets the request body buf, err := c.ReadAll() if err != nil { c.Error(err) return } - // writes something as response + // Writes something as response c.String(string(buf)) }) - err := r.Run(":9090") // listen and serve on 0.0.0.0:9090 + go func() { + err := r.Run(":9090") + if err != nil { + log.Printf("server: %q\n", err) + } + }() + + <-bye + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + err := r.Shutdown(ctx) + cancel() if err != nil { - log.Fatalf("listen: %s", err) + log.Fatal(err) } } ``` \ No newline at end of file