From 8d6e789ddf011dfb75d093fce8450a1c3dbd31a4 Mon Sep 17 00:00:00 2001 From: hgouchet Date: Wed, 13 Feb 2019 08:26:38 +0100 Subject: [PATCH] saves work to continue somewhere else --- context.go | 155 +++++++++++++++++++++++---------------------- errors.go | 52 +++++++++++++++ request.go | 70 ++++++++++++++++++++ response.go | 62 ++++++++++++++++++ server.go | 179 ++++++++++++++++++++++++++++++++++++++-------------- 5 files changed, 394 insertions(+), 124 deletions(-) create mode 100644 errors.go create mode 100644 request.go create mode 100644 response.go diff --git a/context.go b/context.go index 3732703..6a3a27b 100644 --- a/context.go +++ b/context.go @@ -2,117 +2,118 @@ package tcp import ( "bufio" - "context" - "io" "net" + "strings" ) -// Conn is implemented by any Context. -type Conn interface { - io.WriteCloser - // Closed listening the cancellation context. - Closed() <-chan struct{} - // RawData returns the request's message. - RawData() []byte - // RemoteAddr returns the remote network address. - RemoteAddr() net.Addr -} - // Context ... type Context struct { - cancel context.CancelFunc - conn net.Conn - ctx context.Context - msg []byte - srv *Server + Request *Request + ResponseWriter + + conn net.Conn + errs Errors + index int + handlers []HandlerFunc + writer *responseWriter } -func (c *Context) closing() { - for _, f := range c.srv.out { - f(c) +// Close implements the Conn interface. +func (c *Context) Close() error { + if c.Request != nil { + c.Request.Cancel() } - if err := c.Close(); err != nil { - c.srv.errorf("context closing failed with %q", err) + if c.conn == nil { + return nil + } + return c.conn.Close() +} + +// Closed implements the Conn interface. +func (c *Context) Closed() <-chan struct{} { + if c.Request == nil { + return nil } + return c.Request.Closed() +} + +// Error reports a new error. +func (c *Context) Error(err error) { + c.errs = append(c.errs, err) } -func (c *Context) incoming() { - for _, f := range c.srv.in { - f(c) +// Err explains what failed during the request. +// The method name is inspired of the context package. +func (c *Context) Err() error { + return c.errs +} + +// Next should be used only inside middleware. +// It executes the pending handlers in the chain inside the calling handler. +func (c *Context) Next() { + c.index++ + for c.index < len(c.handlers) { + c.handlers[c.index](c) + c.index++ } } -func (c *Context) listening(d []byte) { - for _, f := range c.srv.msg { - f(c.copy(d)) +// String writes the given string into the connection. +func (c *Context) String(s string) { + if !strings.HasSuffix(s, "\n") { + // sends it now + s += "\n" + } + _, err := c.ResponseWriter.WriteString(s) + if err != nil { + c.Error(err) } } -func (c *Context) copy(d []byte) *Context { - var cc = *c - cc.msg = make([]byte, len(d)) - copy(cc.msg, d) - return &cc +// Write implements the Conn interface. +func (c *Context) Write(d []byte) (int, error) { + return c.ResponseWriter.Write(d) } -func (c *Context) handle(ctx context.Context) { - // Initiates the connection with a context by cancellation. - c.ctx, c.cancel = context.WithCancel(ctx) +func (c *Context) catch() { // Launches any handler waiting for new connection. - c.incoming() + c.applyHandlers(SYN) r := bufio.NewReader(c.conn) for { - // For each new message + x := c.copy() d, err := r.ReadBytes('\n') if err != nil { - // Closes the connection and the context. - c.closing() + x.close() return } - c.listening(d) + go x.handle(d) } } -// Close implements the Conn interface. -func (c *Context) Close() error { - if c.conn == nil { - return nil - } - c.cancel() - return c.conn.Close() -} - -// Closed implements the Conn interface. -func (c *Context) Closed() <-chan struct{} { - if c.ctx == nil { - return nil +func (c *Context) close() { + // launches any handler waiting for closed connection. + c.applyHandlers(FIN) + // tries to close the connection and the context + if err := c.Close(); err != nil { + c.Error(NewError("close", err)) } - return c.ctx.Done() } -// Data implements the Conn interface. -func (c *Context) RawData() []byte { - return c.msg +func (c *Context) copy() *Context { + var cc = *c + cc.handlers = nil + return &cc } -// RemoteAddr implements the Conn interface. -func (c *Context) RemoteAddr() net.Addr { - if c.conn == nil { - return nil - } - return c.conn.RemoteAddr() +func (c *Context) handle(d []byte) { + // use response as entry point + // launches any handler waiting for new message. + c.applyHandlers(ACK) } -// String writes the given string into the connection. -func (c *Context) String(s string) { - _, err := c.Write([]byte(s + "\n")) - if err != nil { - // todo panic - c.srv.errorf("failed to write: %s", err) - } -} - -// Write implements the Conn interface. -func (c *Context) Write(d []byte) (int, error) { - return c.conn.Write(d) +func (c *Context) reset() { + c.ResponseWriter = c.writer + c.handlers = nil + c.index = -1 + c.errs = c.errs[0:0] } diff --git a/errors.go b/errors.go new file mode 100644 index 0000000..6ecbb6f --- /dev/null +++ b/errors.go @@ -0,0 +1,52 @@ +package tcp + +import ( + "strings" +) + +// ErrRequest ... +var ErrRequest = NewError("invalid request") + +// NewError ... +func NewError(msg string, cause ...error) error { + if cause == nil { + return &Error{msg: msg} + } + return &Error{msg: msg, cause: cause[0]} +} + +// Error ... +type Error struct { + msg string + cause error +} + +// Error implements the error interface. +func (e *Error) Error() string { + if e.cause == nil { + return "tcp: " + e.msg + } + return "tcp: " + e.msg + ": " + e.cause.Error() +} + +// Errors ... +type Errors []error + +// Error implements the error interface. +func (e Errors) Error() string { + var ( + b strings.Builder + err error + ) + for i, r := range e { + if i > 0 { + if _, err = b.WriteString(", "); err != nil { + return err.Error() + } + } + if _, err = b.WriteString(r.Error()); err != nil { + return err.Error() + } + } + return b.String() +} diff --git a/request.go b/request.go new file mode 100644 index 0000000..e914da9 --- /dev/null +++ b/request.go @@ -0,0 +1,70 @@ +package tcp + +import ( + "context" + "io" + "io/ioutil" +) + +// Request represents an TCP request. +type Request struct { + // Method specifies the TCP step (SYN, ACK, FIN). + Method string + + // Body is the request's body. + Body io.ReadCloser + + // RemoteAddr returns the remote network address. + RemoteAddr string + + ctx context.Context + cancel context.CancelFunc +} + +// Close implements the io.Closer interface. +func (r *Request) Cancel() { + if r.cancel != nil { + r.cancel() + } +} + +// Closed implements the Conn interface. +func (r *Request) Closed() <-chan struct{} { + return r.Context().Done() +} + +// Context returns the request's context. +func (r *Request) Context() context.Context { + if r.ctx != nil { + return r.ctx + } + return context.Background() +} + +// WithCancel returns a shallow copy of the given request with its context changed to ctx. +func (r *Request) WithCancel(ctx context.Context) *Request { + if ctx == nil { + // awkward: nothing to do + return r + } + r2 := new(Request) + *r2 = *r + r2.ctx, r2.cancel = context.WithCancel(ctx) + return r2 +} + +// NewRequest ... +func NewRequest(method string, body io.Reader) (*Request, error) { + if method == "" { + return nil, ErrRequest + } + rc, ok := body.(io.ReadCloser) + if !ok && body != nil { + rc = ioutil.NopCloser(body) + } + req := &Request{ + Method: method, + Body: rc, + } + return req, nil +} diff --git a/response.go b/response.go new file mode 100644 index 0000000..85e0b1e --- /dev/null +++ b/response.go @@ -0,0 +1,62 @@ +package tcp + +import ( + "io" + "net" +) + +const noWritten = 1 + +// ResponseWriter interface is used by an TCP handler to construct the response. +type ResponseWriter interface { + // Writer is the interface that wraps the basic Write method. + io.Writer + // WriteString allow to directly write string. + WriteString(s string) (n int, err error) + // Size returns the number of bytes already written into the response body. + // -1: not already written + Size() int +} + +type responseWriter struct { + conn io.Writer + size int +} + +func newResponseWriter(c net.Conn) *responseWriter { + return &responseWriter{ + conn: c, + size: noWritten, + } +} + +// Write implements the ResponseWriter interface. +func (w *responseWriter) Size() int { + return w.size +} + +// Write implements the ResponseWriter interface. +func (w *responseWriter) Write(p []byte) (n int, err error) { + n, err = w.conn.Write(p) + w.incr(n) + return +} + +// Write implements the ResponseWriter interface. +func (w *responseWriter) WriteString(s string) (n int, err error) { + n, err = io.WriteString(w.conn, s) + w.incr(n) + return +} + +func (w *responseWriter) incr(n int) { + if n == noWritten { + n = 0 + } + w.size += n +} + +func (w *responseWriter) rebase(conn io.Writer) { + w.conn = conn + w.size = noWritten +} diff --git a/server.go b/server.go index c5ad56d..5e41a73 100644 --- a/server.go +++ b/server.go @@ -4,95 +4,180 @@ package tcp import ( "context" "net" - - "github.com/sirupsen/logrus" + "sync" + "time" ) // HandlerFunc defines the handler interface. type HandlerFunc func(c *Context) -// Logger must be implemented by any logger. -type Logger interface { - Errorf(format string, v ...interface{}) - Printf(format string, v ...interface{}) +// Router is implemented by the Server. +type Router interface { + // Any registers a route that matches one of supported method + Any(method string, handler ...HandlerFunc) Router + // Use adds middleware fo any context: start and end of connection and message. + Use(handler ...HandlerFunc) Router + // ACK is a shortcut for Any("ACK", ...HandlerFunc). + ACK(handler ...HandlerFunc) Router + // FIN is a shortcut for Any("FIN", ...HandlerFunc). + FIN(handler ...HandlerFunc) Router + // SYN is a shortcut for Any("SYN", ...HandlerFunc). + SYN(handler ...HandlerFunc) Router } -// Default returns an instance of TCP server with a Logger attached. +// List of supported "methods". +const ( + ANY = "" + ACK = "ACK" + FIN = "FIN" + SYN = "SYN" +) + +// Default returns an instance of TCP server with a Logger and a Recover on panic attached. func Default() *Server { - return &Server{log: logrus.New()} + // todo :) + return New() } // New returns a new instance of a TCP server. func New() *Server { - return &Server{} + s := &Server{ + handlers: map[string][]HandlerFunc{}, + } + s.pool.New = func() interface{} { + return s.allocateContext() + } + return s } // Server is the TCP server. It contains type Server struct { - in, out, msg []HandlerFunc - log Logger + // ReadTimeout is the maximum duration for reading the entire request, including the body. + // A zero value for t means Read will not time out. + ReadTimeout time.Duration + + handlers map[string][]HandlerFunc + pool sync.Pool } -func (s *Server) errorf(format string, v ...interface{}) { - if s.log == nil { - return - } - s.log.Errorf(format, v...) +func (s *Server) allocateContext() *Context { + return &Context{} } -func (s *Server) printf(format string, v ...interface{}) { - if s.log == nil { - return +func (s *Server) computeHandlers(handlers []HandlerFunc) []HandlerFunc { + m := make([]HandlerFunc, len(s.handlers[ANY])+len(handlers)) + copy(m, s.handlers[ANY]) + copy(m[len(s.handlers[ANY]):], handlers) + return m +} + +// Any +func (s *Server) Any(method string, f ...HandlerFunc) Router { + switch method { + case ACK: + return s.ACK(f...) + case FIN: + return s.FIN(f...) + case SYN: + return s.SYN(f...) + default: + return s.Use(f...) } - s.log.Printf(format, v...) } // ACK allows to handle each new message. -func (s *Server) ACK(f ...HandlerFunc) { - if f != nil { - s.msg = append(s.msg, f...) - } +func (s *Server) ACK(f ...HandlerFunc) Router { + s.handlers[ACK] = append(s.handlers[ACK], f...) + return s } -// FIN allows to handle when the Context connection is closed. -func (s *Server) FIN(f ...HandlerFunc) { - if f != nil { - s.out = append(s.out, f...) - } +// FIN allows to handle when the connection is closed. +func (s *Server) FIN(f ...HandlerFunc) Router { + s.handlers[FIN] = append(s.handlers[FIN], f...) + return s } -// SYN allows to handle each new connection / Context. -func (s *Server) SYN(f ...HandlerFunc) { - if f != nil { - s.in = append(s.in, f...) - } +// SYN allows to handle each new connection. +func (s *Server) SYN(f ...HandlerFunc) Router { + s.handlers[SYN] = append(s.handlers[SYN], f...) + return s +} + +// Use adds middleware(s). +func (s *Server) Use(f ...HandlerFunc) Router { + s.handlers[ANY] = append(s.handlers[ANY], f...) + return s } // Run starts listening on TCP address. // This method will block the calling goroutine indefinitely unless an error happens. -func (s *Server) Run(addr string) error { +func (s *Server) Run(addr string) (err error) { l, err := net.Listen("tcp", addr) if err != nil { - return err + return } defer func() { - if err := l.Close(); err != nil { - s.errorf("tcp closing failed with %q", err) + if err == nil { + err = l.Close() } }() - s.printf("tcp listens on %s", addr) - ctx := context.Background() for { - // new connection - c, err := l.Accept() + c, err := newConn(l, s.ReadTimeout) if err != nil { - return err + return } - x := &Context{ - conn: c, - srv: s, + r, err := newRequest(c, ctx) + if err != nil { + return } - go x.handle(ctx) + w := newResponseWriter(c) + // flag request as SYN and repeat the action... + go s.ServeTCP(w, r) + } +} + +func (s *Server) handle(c *Context) { + if c.handlers == nil { + return + } + c.Next() +} + +func newConn(l net.Listener, to time.Duration) (net.Conn, error) { + c, err := l.Accept() + if err != nil { + return nil, err } + if to == 0 { + // no read deadline required. + return c, err + } + err = c.SetReadDeadline(time.Now().Add(to)) + if err != nil { + return nil, err + } + return c, nil +} + +func newRequest(c net.Conn, ctx context.Context) (*Request, error) { + req, err := NewRequest(SYN, nil) + if err != nil || c == nil || ctx == nil { + return nil, err + } + // Retrieves the remote address of the client. + req.RemoteAddr = c.RemoteAddr().String() + + // Initiates the connection with a context by cancellation. + return req.WithCancel(ctx), nil +} + +// ServeTCP ... +func (s *Server) ServeTCP(w ResponseWriter, req *Request) { + c := s.pool.Get().(*Context) + c.writer.rebase(w) + c.Request = req + c.reset() + s.handle(c) + s.pool.Put(c) }