Skip to content

Commit

Permalink
saves the work of the evening
Browse files Browse the repository at this point in the history
  • Loading branch information
rvflash committed Feb 18, 2019
1 parent 656f761 commit 0827fd1
Show file tree
Hide file tree
Showing 9 changed files with 162 additions and 31 deletions.
3 changes: 1 addition & 2 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"bytes"
"context"
"io"
"io/ioutil"
"net"
)

Expand All @@ -27,7 +26,7 @@ func (c *conn) ServeTCP(w ResponseWriter, req *Request) {
}

func (c *conn) bySegment(segment string, body io.Reader) {
req := c.newRequest(segment, ioutil.NopCloser(body))
req := c.newRequest(segment, body)
w := c.newResponseWriter()
c.ServeTCP(w, req)
}
Expand Down
10 changes: 7 additions & 3 deletions context.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tcp

import (
"io"
"io/ioutil"
"strings"
"time"
Expand All @@ -11,7 +12,7 @@ type M map[string]interface{}

// Context allows us to pass variables between middleware and manage the flow.
type Context struct {
//Request contains information about the TCP request.
// Request contains information about the TCP request.
Request *Request
// ResponseWriter writes the response on the connection.
ResponseWriter
Expand Down Expand Up @@ -46,7 +47,7 @@ func (c *Context) Error(err error) {

// Err explains what failed during the request.
// The method name is inspired by the context package.
func (c *Context) Err() error {
func (c *Context) Err() Errors {
return c.errs
}

Expand Down Expand Up @@ -126,6 +127,9 @@ func (c *Context) ReadAll() ([]byte, error) {
if c.Request == nil {
return nil, ErrRequest
}
if c.Request.Body == nil {
return nil, io.EOF
}
return ioutil.ReadAll(c.Request.Body)
}

Expand All @@ -151,5 +155,5 @@ func (c *Context) reset() {
c.Shared = make(M)
c.handlers = nil
c.index = -1
c.errs = c.errs[0:0]
c.errs = nil
}
32 changes: 28 additions & 4 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ import (
"strings"
)

// Err represents a TCP error.
type Err interface {
// Recovered returns true if the error comes from a panic recovering.
Recovered() bool
error
}

// ErrRequest is returned if the request is invalid.
var ErrRequest = NewError("invalid request")

Expand All @@ -18,22 +25,28 @@ func NewError(msg string, cause ...error) error {
// Error represents a error message.
// It can wraps another error, its cause.
type Error struct {
msg string
cause error
msg string
cause error
recover bool
}

// Error implements the error interface.
// Error implements the Err interface.
func (e *Error) Error() string {
if e.cause == nil {
return "tcp: " + e.msg
}
return "tcp: " + e.msg + ": " + e.cause.Error()
}

// Recovered implements the Err interface.
func (e *Error) Recovered() bool {
return e.recover
}

// Errors contains the list of errors occurred during the request.
type Errors []error

// Error implements the error interface.
// Error implements the Err interface.
func (e Errors) Error() string {
var (
b strings.Builder
Expand All @@ -51,3 +64,14 @@ func (e Errors) Error() string {
}
return b.String()
}

// Recovered implements the Err interface.
func (e Errors) Recovered() (ok bool) {
for _, r := range e {
_, ok = r.(*Error)
if ok {
return
}
}
return
}
10 changes: 3 additions & 7 deletions examples/ack/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,15 @@ func main() {
// new message received
body, err := c.ReadAll()
if err != nil {
log.Println("err:", err)
c.Error(err)
}
log.Println("request:", body)
log.Println("say hi!")
c.String("hi!")
c.String(string(body))
})
r.SYN(func(c *tcp.Context) {
log.Println("say hello!")
c.String("hello")
})
r.FIN(func(c *tcp.Context) {
log.Println("remote addr")
log.Println(c.Request.RemoteAddr)
log.Println("bye")
})
log.Fatal(r.Run(":9090"))
}
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.3.0 h1:hI/7Q+DtNZ2kINb6qt/lS+IyXnHQe9e90POfeewL/ME=
github.com/sirupsen/logrus v1.3.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 h1:u+LnwYTOOW7Ukr/fppxEb1Nwz0AtPflrblfvUudpo+I=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
86 changes: 86 additions & 0 deletions logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package tcp

import (
"bytes"
"io/ioutil"
"math"
"os"
"time"

"github.com/sirupsen/logrus"
)

const (
RemoteAddr = "addr"
RequestLength = "req_size"
ResponseLength = "resp_size"
Latency = "latency"
Hostname = "server"
)

func newMessage(req *Request) *message {
// starts the UTC timer.
m := &message{
start: time.Now().UTC(),
req: req,
}
// reads the request body without closing it to get its size.
if req.Body != nil {
buf, _ := ioutil.ReadAll(req.Body)
req.Body = ioutil.NopCloser(bytes.NewBuffer(buf))
m.reqSize = len(buf)
}
return m
}

type message struct {
latency time.Duration
req *Request
reqSize int
start time.Time
}

func (m *message) fields(w ResponseWriter, f logrus.Fields) logrus.Fields {
d := make(logrus.Fields)
for k := range f {
switch k {
case RemoteAddr:
d[k] = m.req.RemoteAddr
case RequestLength:
d[k] = w.Size()
case ResponseLength:
d[k] = m.reqSize
case Latency:
m.latency = time.Since(m.start)
d[k] = int(math.Ceil(float64(m.latency.Nanoseconds()) / 1000.0))
case Hostname:
d[k], _ = os.Hostname()
}
}
return d
}

// String implements the fmt.Stringer interface.
func (m *message) String() string {
sep := " | "
return "[TCP] " + m.start.Format(time.RFC3339) + sep + m.req.Segment
}

// Logger returns a middleware to log each TCP request.
func Logger(log *logrus.Logger, fields logrus.Fields) HandlerFunc {
return func(c *Context) {
// Initiates the timer
m := newMessage(c.Request)
// Processes the request
c.Next()
// Logs it.
entry := logrus.NewEntry(log).WithFields(m.fields(c.ResponseWriter, fields))
if e := c.Err(); e == nil {
entry.Info(m.String())
} else if e.Recovered() {
entry.Error(m.String() + " " + e.Error())
} else {
entry.Warn(m.String() + " " + e.Error())
}
}
}
18 changes: 9 additions & 9 deletions request.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"io/ioutil"
)

// Request represents an TCP request.
// req represents an TCP request.
type Request struct {
// Segment specifies the TCP segment (SYN, ACK, FIN).
Segment string
Expand Down Expand Up @@ -52,20 +52,20 @@ func (r *Request) WithCancel(ctx context.Context) *Request {
}

// NewRequest returns a new instance of request.
// A segment is mandatory as input, an error is returned if it missing.
// A segment is mandatory as input. If empty, a SYN segment is used.
// If the body is missing, a no-op reader with closing is used.
func NewRequest(segment string, body io.Reader) *Request {
if segment == "" {
// by default, we use the SYN segment.
segment = SYN
}
rc, ok := body.(io.ReadCloser)
if !ok && body != nil {
rc = ioutil.NopCloser(body)
}
req := &Request{
Segment: segment,
Body: rc,
req := &Request{Segment: segment}
if body != nil {
rc, ok := body.(io.ReadCloser)
if !ok {
rc = ioutil.NopCloser(body)
}
req.Body = rc
}
return req
}
10 changes: 6 additions & 4 deletions response.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ import (
// ResponseWriter interface is used by a TCP handler to write the response.
type ResponseWriter interface {
io.WriteCloser
// Size returns the number of bytes already written into the response body.
// -1: not already written
Size() int
}

type responseWriter struct {
ResponseWriter
size int
ResponseWriter io.WriteCloser
size int
}

const noWritten = -1
Expand All @@ -21,8 +24,7 @@ func (r *responseWriter) Close() error {
return r.ResponseWriter.Close()
}

// Size returns the number of bytes already written into the response body.
// -1: not already written
// Size implements the ResponseWriter interface.
func (r *responseWriter) Size() int {
return r.size
}
Expand Down
17 changes: 15 additions & 2 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"net"
"sync"
"time"

"github.com/sirupsen/logrus"
)

// Handler responds to a TCP request.
Expand Down Expand Up @@ -40,8 +42,19 @@ const (

// Default returns an instance of TCP server with a Logger and a Recover on panic attached.
func Default() *Server {
// todo :)
return New()
// Adds a logger.
l := logrus.New()
l.Formatter = &logrus.TextFormatter{DisableTimestamp: true}
f := logrus.Fields{
Latency: 0,
Hostname: "",
RemoteAddr: "",
RequestLength: 0,
ResponseLength: 0,
}
h := New()
h.Use(Logger(l, f))
return h
}

// New returns a new instance of a TCP server.
Expand Down

0 comments on commit 0827fd1

Please sign in to comment.