Skip to content

Commit

Permalink
saves work to continue somewhere else
Browse files Browse the repository at this point in the history
  • Loading branch information
rvflash committed Feb 13, 2019
1 parent 78d1b50 commit 8d6e789
Show file tree
Hide file tree
Showing 5 changed files with 394 additions and 124 deletions.
155 changes: 78 additions & 77 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,117 +2,118 @@ package tcp

import (
"bufio"
"context"
"io"
"net"
"strings"
)

// Conn is implemented by any Context.
type Conn interface {
io.WriteCloser
// Closed listening the cancellation context.
Closed() <-chan struct{}
// RawData returns the request's message.
RawData() []byte
// RemoteAddr returns the remote network address.
RemoteAddr() net.Addr
}

// Context ...
type Context struct {
cancel context.CancelFunc
conn net.Conn
ctx context.Context
msg []byte
srv *Server
Request *Request
ResponseWriter

conn net.Conn
errs Errors
index int
handlers []HandlerFunc
writer *responseWriter
}

func (c *Context) closing() {
for _, f := range c.srv.out {
f(c)
// Close implements the Conn interface.
func (c *Context) Close() error {
if c.Request != nil {
c.Request.Cancel()
}
if err := c.Close(); err != nil {
c.srv.errorf("context closing failed with %q", err)
if c.conn == nil {
return nil
}
return c.conn.Close()
}

// Closed implements the Conn interface.
func (c *Context) Closed() <-chan struct{} {
if c.Request == nil {
return nil
}
return c.Request.Closed()
}

// Error reports a new error.
func (c *Context) Error(err error) {
c.errs = append(c.errs, err)
}

func (c *Context) incoming() {
for _, f := range c.srv.in {
f(c)
// Err explains what failed during the request.
// The method name is inspired of the context package.
func (c *Context) Err() error {
return c.errs
}

// Next should be used only inside middleware.
// It executes the pending handlers in the chain inside the calling handler.
func (c *Context) Next() {
c.index++
for c.index < len(c.handlers) {
c.handlers[c.index](c)
c.index++
}
}

func (c *Context) listening(d []byte) {
for _, f := range c.srv.msg {
f(c.copy(d))
// String writes the given string into the connection.
func (c *Context) String(s string) {
if !strings.HasSuffix(s, "\n") {
// sends it now
s += "\n"
}
_, err := c.ResponseWriter.WriteString(s)
if err != nil {
c.Error(err)
}
}

func (c *Context) copy(d []byte) *Context {
var cc = *c
cc.msg = make([]byte, len(d))
copy(cc.msg, d)
return &cc
// Write implements the Conn interface.
func (c *Context) Write(d []byte) (int, error) {
return c.ResponseWriter.Write(d)
}

func (c *Context) handle(ctx context.Context) {
// Initiates the connection with a context by cancellation.
c.ctx, c.cancel = context.WithCancel(ctx)
func (c *Context) catch() {
// Launches any handler waiting for new connection.
c.incoming()
c.applyHandlers(SYN)
r := bufio.NewReader(c.conn)
for {
// For each new message
x := c.copy()
d, err := r.ReadBytes('\n')
if err != nil {
// Closes the connection and the context.
c.closing()
x.close()
return
}
c.listening(d)
go x.handle(d)
}
}

// Close implements the Conn interface.
func (c *Context) Close() error {
if c.conn == nil {
return nil
}
c.cancel()
return c.conn.Close()
}

// Closed implements the Conn interface.
func (c *Context) Closed() <-chan struct{} {
if c.ctx == nil {
return nil
func (c *Context) close() {
// launches any handler waiting for closed connection.
c.applyHandlers(FIN)
// tries to close the connection and the context
if err := c.Close(); err != nil {
c.Error(NewError("close", err))
}
return c.ctx.Done()
}

// Data implements the Conn interface.
func (c *Context) RawData() []byte {
return c.msg
func (c *Context) copy() *Context {
var cc = *c
cc.handlers = nil
return &cc
}

// RemoteAddr implements the Conn interface.
func (c *Context) RemoteAddr() net.Addr {
if c.conn == nil {
return nil
}
return c.conn.RemoteAddr()
func (c *Context) handle(d []byte) {
// use response as entry point
// launches any handler waiting for new message.
c.applyHandlers(ACK)
}

// String writes the given string into the connection.
func (c *Context) String(s string) {
_, err := c.Write([]byte(s + "\n"))
if err != nil {
// todo panic
c.srv.errorf("failed to write: %s", err)
}
}

// Write implements the Conn interface.
func (c *Context) Write(d []byte) (int, error) {
return c.conn.Write(d)
func (c *Context) reset() {
c.ResponseWriter = c.writer
c.handlers = nil
c.index = -1
c.errs = c.errs[0:0]
}
52 changes: 52 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package tcp

import (
"strings"
)

// ErrRequest ...
var ErrRequest = NewError("invalid request")

// NewError ...
func NewError(msg string, cause ...error) error {
if cause == nil {
return &Error{msg: msg}
}
return &Error{msg: msg, cause: cause[0]}
}

// Error ...
type Error struct {
msg string
cause error
}

// Error implements the error interface.
func (e *Error) Error() string {
if e.cause == nil {
return "tcp: " + e.msg
}
return "tcp: " + e.msg + ": " + e.cause.Error()
}

// Errors ...
type Errors []error

// Error implements the error interface.
func (e Errors) Error() string {
var (
b strings.Builder
err error
)
for i, r := range e {
if i > 0 {
if _, err = b.WriteString(", "); err != nil {
return err.Error()
}
}
if _, err = b.WriteString(r.Error()); err != nil {
return err.Error()
}
}
return b.String()
}
70 changes: 70 additions & 0 deletions request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package tcp

import (
"context"
"io"
"io/ioutil"
)

// Request represents an TCP request.
type Request struct {
// Method specifies the TCP step (SYN, ACK, FIN).
Method string

// Body is the request's body.
Body io.ReadCloser

// RemoteAddr returns the remote network address.
RemoteAddr string

ctx context.Context
cancel context.CancelFunc
}

// Close implements the io.Closer interface.
func (r *Request) Cancel() {
if r.cancel != nil {
r.cancel()
}
}

// Closed implements the Conn interface.
func (r *Request) Closed() <-chan struct{} {
return r.Context().Done()
}

// Context returns the request's context.
func (r *Request) Context() context.Context {
if r.ctx != nil {
return r.ctx
}
return context.Background()
}

// WithCancel returns a shallow copy of the given request with its context changed to ctx.
func (r *Request) WithCancel(ctx context.Context) *Request {
if ctx == nil {
// awkward: nothing to do
return r
}
r2 := new(Request)
*r2 = *r
r2.ctx, r2.cancel = context.WithCancel(ctx)
return r2
}

// NewRequest ...
func NewRequest(method string, body io.Reader) (*Request, error) {
if method == "" {
return nil, ErrRequest
}
rc, ok := body.(io.ReadCloser)
if !ok && body != nil {
rc = ioutil.NopCloser(body)
}
req := &Request{
Method: method,
Body: rc,
}
return req, nil
}
62 changes: 62 additions & 0 deletions response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package tcp

import (
"io"
"net"
)

const noWritten = 1

// ResponseWriter interface is used by an TCP handler to construct the response.
type ResponseWriter interface {
// Writer is the interface that wraps the basic Write method.
io.Writer
// WriteString allow to directly write string.
WriteString(s string) (n int, err error)
// Size returns the number of bytes already written into the response body.
// -1: not already written
Size() int
}

type responseWriter struct {
conn io.Writer
size int
}

func newResponseWriter(c net.Conn) *responseWriter {
return &responseWriter{
conn: c,
size: noWritten,
}
}

// Write implements the ResponseWriter interface.
func (w *responseWriter) Size() int {
return w.size
}

// Write implements the ResponseWriter interface.
func (w *responseWriter) Write(p []byte) (n int, err error) {
n, err = w.conn.Write(p)
w.incr(n)
return
}

// Write implements the ResponseWriter interface.
func (w *responseWriter) WriteString(s string) (n int, err error) {
n, err = io.WriteString(w.conn, s)
w.incr(n)
return
}

func (w *responseWriter) incr(n int) {
if n == noWritten {
n = 0
}
w.size += n
}

func (w *responseWriter) rebase(conn io.Writer) {
w.conn = conn
w.size = noWritten
}
Loading

0 comments on commit 8d6e789

Please sign in to comment.