Skip to content

Commit

Permalink
adc: switch to lineproto for both reader and writer
Browse files Browse the repository at this point in the history
  • Loading branch information
dennwc committed Mar 17, 2019
1 parent 2ce9493 commit f1ce6b1
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 77 deletions.
67 changes: 19 additions & 48 deletions adc/conn.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package adc

import (
"bufio"
"bytes"
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"io/ioutil"
Expand All @@ -21,6 +21,8 @@ var (
Debug bool
)

const lineDelim = 0x0a

type Route interface {
WriteMessage(msg Message) error
Flush() error
Expand Down Expand Up @@ -75,11 +77,16 @@ func NewConn(conn net.Conn) (*Conn, error) {
c := &Conn{
conn: conn,
}
c.write.w = bufio.NewWriter(conn)
c.read = lineproto.NewReader(conn, 0x0a)
c.write = lineproto.NewWriter(conn)
c.read = lineproto.NewReader(conn, lineDelim)
if Debug {
c.write.OnLine = func(line []byte) (bool, error) {
line = bytes.TrimSuffix(line, []byte{lineDelim})
log.Println("->", string(line))
return true, nil
}
c.read.OnLine = func(line []byte) (bool, error) {
line = bytes.TrimSuffix(line, []byte{0x0a})
line = bytes.TrimSuffix(line, []byte{lineDelim})
log.Println("<-", string(line))
return true, nil
}
Expand All @@ -97,12 +104,8 @@ type Conn struct {

conn net.Conn

write struct {
sync.Mutex
err error
w *bufio.Writer
}
read *lineproto.Reader
write *lineproto.Writer
read *lineproto.Reader
}

func (c *Conn) LocalAddr() net.Addr {
Expand Down Expand Up @@ -142,7 +145,7 @@ func (c *Conn) KeepAlive(interval time.Duration) {
case <-ticker.C:
}
// empty packet serves as keep-alive for ADC
err := c.writeRawPacket(nil)
err := c.writeRawPacket([]byte{lineDelim})
if err == nil {
err = c.Flush()
}
Expand Down Expand Up @@ -177,10 +180,10 @@ func (c *Conn) readPacket(deadline time.Time) ([]byte, error) {
s, err := c.read.ReadLine()
if err != nil {
return nil, err
} else if len(s) == 0 || s[len(s)-1] != lineDelim {
return nil, errors.New("invalid packet delimiter")
}
// trim delimiter
s = s[:len(s)-1]
if len(s) != 0 {
if len(s) > 1 {
return s, nil
}
// clients may send message containing only 0x0a byte
Expand Down Expand Up @@ -323,48 +326,16 @@ func (c *Conn) writeRawPacket(s []byte) error {
c.bin.RLock()
defer c.bin.RUnlock()

c.write.Lock()
defer c.write.Unlock()

if err := c.write.err; err != nil {
return err
}
if Debug {
log.Println("->", string(s))
}
_, err := c.write.w.Write(s)
if err != nil {
c.write.err = err
}
err = c.write.w.WriteByte(0x0a)
if err != nil {
c.write.err = err
}
return err
return c.write.WriteLine(s)
}

// Flush the underlying buffer. Should be called after each WritePacket batch.
func (c *Conn) Flush() error {
if Debug {
log.Println("-> [flushed]")
}

// make sure connection is not in binary mode
c.bin.RLock()
defer c.bin.RUnlock()

c.write.Lock()
defer c.write.Unlock()

if err := c.write.err; err != nil {
return err
}

err := c.write.w.Flush()
if err != nil {
c.write.err = err
}
return err
return c.write.Flush()
}

/*
Expand Down
Loading

0 comments on commit f1ce6b1

Please sign in to comment.