Skip to content

Commit

Permalink
First try to implement graceful shutdown on the server. Stops listeni…
Browse files Browse the repository at this point in the history
…ng but does not interrupt any active connections
  • Loading branch information
rvflash committed Sep 2, 2019
1 parent 3592e32 commit 831d9e5
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 24 deletions.
29 changes: 18 additions & 11 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@ import (

type conn struct {
addr string
ctx context.Context
srv *Server
rwc net.Conn
srv *Server
}

func (c *conn) bySegment(segment string, body io.Reader) {
ctx, cancel := context.WithCancel(c.ctx)
func (c *conn) bySegment(ctx context.Context, segment string, body io.Reader) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

w := newWriter(c.rwc)
Expand All @@ -30,20 +29,28 @@ func (c *conn) newRequest(segment string, body io.Reader) *Request {
return req
}

func (c *conn) serve() {
// deals with a new connection
go c.bySegment(SYN, nil)
// waiting for messages
func (c *conn) serve(ctx context.Context) {
// New connection
c.bySegment(ctx, SYN, nil)

// Waiting for messages
r := bufio.NewReader(c.rwc)
for {
select {
case <-ctx.Done():
// Connection closing, stops serving.
c.bySegment(ctx, FIN, r)
return
default:
}
d, err := r.ReadBytes('\n')
r := bytes.NewReader(d)
if err != nil {
// unable to read on it: closing the connection.
c.bySegment(FIN, r)
// Unable to read on it: closing the connection.
c.bySegment(ctx, FIN, r)
return
}
// new message received
go c.bySegment(ACK, r)
c.bySegment(ctx, ACK, r)
}
}
84 changes: 71 additions & 13 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ func Default() *Server {
func New() *Server {
s := &Server{
handlers: map[string][]HandlerFunc{},
shutdown: make(chan struct{}),
closed: make(chan struct{}),
}
s.pool.New = func() interface{} {
return s.allocateContext()
Expand All @@ -79,8 +81,11 @@ type Server struct {
// A zero value for t means Read will not time out.
ReadTimeout time.Duration

cancel context.CancelFunc
handlers map[string][]HandlerFunc
pool sync.Pool
closed,
shutdown chan struct{}
}

// Any attaches handlers on the given segment.
Expand Down Expand Up @@ -130,7 +135,14 @@ func (s *Server) Run(addr string) (err error) {
if err != nil {
return
}
return s.serve(l)
defer func() {
cErr := l.Close()
if err != nil {
err = cErr
}
}()
err = s.serve(l)
return
}

// RunTLS acts identically to the Run method, except that it uses the TLS protocol.
Expand All @@ -144,30 +156,49 @@ func (s *Server) RunTLS(addr, certFile, keyFile string) (err error) {
if err != nil {
return
}
return s.serve(l)
}

func (s *Server) serve(l net.Listener) (err error) {
defer func() {
if err == nil {
err = l.Close()
cErr := l.Close()
if err != nil {
err = cErr
}
}()
ctx := context.Background()
err = s.serve(l)
return
}

func (s *Server) serve(l net.Listener) error {
var (
w8 sync.WaitGroup
ctx context.Context
)
ctx, s.cancel = context.WithCancel(context.Background())
defer s.cancel()
for {
select {
case <-s.shutdown:
// Stops listening but does not interrupt any active connections.
// See the Shutdown method to gracefully shuts down the server.
w8.Wait()
close(s.closed)
return nil
default:
}
c, err := read(l, s.ReadTimeout)
if err != nil {
return err
}
rwc := s.newConn(ctx, c)
go rwc.serve()
rwc := s.newConn(c)
w8.Add(1)
go func() {
defer w8.Done()
rwc.serve(ctx)
}()
}
}

func (s *Server) newConn(ctx context.Context, c net.Conn) *conn {
func (s *Server) newConn(c net.Conn) *conn {
return &conn{
addr: c.RemoteAddr().String(),
ctx: ctx,
srv: s,
rwc: c,
}
Expand Down Expand Up @@ -198,6 +229,33 @@ func (s *Server) computeHandlers(segment string) []HandlerFunc {
return m
}

// Shutdown gracefully shuts down the server without interrupting any
// active connections. Shutdown works by first closing all open listeners and
// then waiting indefinitely for connections to return to idle and then shut down.
// If the provided context expires before the shutdown is complete,
// Shutdown returns the context's error.
func (s *Server) Shutdown(ctx context.Context) error {
if s.shutdown == nil {
// Nothing to do
return nil
}
// Stops listening.
close(s.shutdown)

// Stops all.
for {
select {
case <-ctx.Done():
// Forces closing of actives connections.
s.cancel()
return ctx.Err()
case <-s.closed:
return nil
default:
}
}
}

func tlsConfig(certFile, keyFile string) (*tls.Config, error) {
var err error
c := make([]tls.Certificate, 1)
Expand All @@ -211,7 +269,7 @@ func read(l net.Listener, to time.Duration) (net.Conn, error) {
return nil, err
}
if to == 0 {
return c, err
return c, nil
}
err = c.SetReadDeadline(time.Now().Add(to))
if err != nil {
Expand Down

0 comments on commit 831d9e5

Please sign in to comment.