-
Notifications
You must be signed in to change notification settings - Fork 9
/
conn.go
117 lines (94 loc) · 2.14 KB
/
conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package simplefixgo
import (
"bufio"
"bytes"
"context"
"fmt"
"net"
"sync"
"time"
"golang.org/x/sync/errgroup"
)
// ErrConnClosed handles connection errors.
var ErrConnClosed = fmt.Errorf("the reader is closed")
const (
endOfMsgTag = "10="
)
// Conn is a net.Conn wrapper that is used for handling split messages.
type Conn struct {
reader chan []byte
writer chan []byte
conn net.Conn
ctx context.Context
cancel context.CancelFunc
writeDeadline time.Duration
closeOnce sync.Once
}
// NewConn is called to create a new connection.
func NewConn(ctx context.Context, conn net.Conn, msgBuffSize int, writeDeadline time.Duration) *Conn {
c := &Conn{
reader: make(chan []byte, msgBuffSize),
writer: make(chan []byte, msgBuffSize),
writeDeadline: writeDeadline,
conn: conn,
}
c.ctx, c.cancel = context.WithCancel(ctx)
return c
}
// Close is called to cancel a connection context and close a connection.
func (c *Conn) Close() {
c.closeOnce.Do(func() {
_ = c.conn.Close()
c.cancel()
})
}
func (c *Conn) serve() error {
defer close(c.writer)
defer close(c.reader)
eg := errgroup.Group{}
eg.Go(c.runReader)
return eg.Wait()
}
func (c *Conn) runReader() error {
defer c.cancel()
r := bufio.NewReader(c.conn)
var msg []byte
for {
select {
case <-c.ctx.Done():
return nil
default:
}
buff, err := r.ReadBytes(byte(1))
if err != nil {
return fmt.Errorf("read error: %w", err)
}
msg = append(msg, buff...)
if len(buff) >= 3 && bytes.Equal(buff[0:3], []byte(endOfMsgTag)) {
c.reader <- msg
msg = []byte{}
}
}
}
// Reader returns a separate channel for handing incoming messages.
func (c *Conn) Reader() <-chan []byte {
return c.reader
}
// Write is called to send messages to an outgoing socket.
func (c *Conn) Write(msg []byte) error {
select {
case <-c.ctx.Done():
return ErrConnClosed
default:
}
if err := c.conn.SetWriteDeadline(time.Now().Add(c.writeDeadline)); err != nil {
c.cancel()
return fmt.Errorf("set write deadline error: %w", err)
}
_, err := c.conn.Write(msg)
if err != nil {
c.cancel()
return fmt.Errorf("write error: %w", err)
}
return nil
}