-
Notifications
You must be signed in to change notification settings - Fork 0
/
client_multiplexer.go
131 lines (116 loc) · 3.51 KB
/
client_multiplexer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package quic
import (
"bytes"
"errors"
"fmt"
"net"
"strings"
"sync"
"time"
"github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/utils"
"github.com/lucas-clemente/quic-go/internal/wire"
)
var (
clientMuxerOnce sync.Once
clientMuxer multiplexer
)
type multiplexer interface {
AddConn(net.PacketConn, int) (packetHandlerManager, error)
AddHandler(net.PacketConn, protocol.ConnectionID, packetHandler) error
}
type connManager struct {
connIDLen int
manager packetHandlerManager
}
// The clientMultiplexer listens on multiple net.PacketConns and dispatches
// incoming packets to the session handler.
type clientMultiplexer struct {
mutex sync.Mutex
conns map[net.PacketConn]connManager
newPacketHandlerManager func() packetHandlerManager // so it can be replaced in the tests
logger utils.Logger
}
var _ multiplexer = &clientMultiplexer{}
func getClientMultiplexer() multiplexer {
clientMuxerOnce.Do(func() {
clientMuxer = &clientMultiplexer{
conns: make(map[net.PacketConn]connManager),
logger: utils.DefaultLogger.WithPrefix("client muxer"),
newPacketHandlerManager: newPacketHandlerMap,
}
})
return clientMuxer
}
func (m *clientMultiplexer) AddConn(c net.PacketConn, connIDLen int) (packetHandlerManager, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
p, ok := m.conns[c]
if !ok {
manager := m.newPacketHandlerManager()
p = connManager{connIDLen: connIDLen, manager: manager}
m.conns[c] = p
// If we didn't know this packet conn before, listen for incoming packets
// and dispatch them to the right sessions.
go m.listen(c, p)
}
if p.connIDLen != connIDLen {
return nil, fmt.Errorf("cannot use %d byte connection IDs on a connection that is already using %d byte connction IDs", connIDLen, p.connIDLen)
}
return p.manager, nil
}
func (m *clientMultiplexer) AddHandler(c net.PacketConn, connID protocol.ConnectionID, handler packetHandler) error {
p, ok := m.conns[c]
if !ok {
return errors.New("unknown packet conn %s")
}
p.manager.Add(connID, handler)
return nil
}
func (m *clientMultiplexer) listen(c net.PacketConn, p connManager) {
panic("not supported")
for {
data := *getPacketBuffer()
data = data[:protocol.MaxReceivePacketSize]
// The packet size should not exceed protocol.MaxReceivePacketSize bytes
// If it does, we only read a truncated packet, which will then end up undecryptable
n, addr, err := c.ReadFrom(data)
if err != nil {
if !strings.HasSuffix(err.Error(), "use of closed network connection") {
p.manager.Close(err)
}
return
}
data = data[:n]
rcvTime := time.Now()
r := bytes.NewReader(data)
iHdr, err := wire.ParseInvariantHeader(r, p.connIDLen)
// drop the packet if we can't parse the header
if err != nil {
m.logger.Debugf("error parsing invariant header from %s: %s", addr, err)
continue
}
client, ok := p.manager.Get(iHdr.DestConnectionID)
if !ok {
m.logger.Debugf("received a packet with an unexpected connection ID %s", iHdr.DestConnectionID)
continue
}
if client == nil {
// Late packet for closed session
continue
}
hdr, err := iHdr.Parse(r, protocol.PerspectiveServer, client.GetVersion())
if err != nil {
m.logger.Debugf("error parsing header from %s: %s", addr, err)
continue
}
hdr.Raw = data[:len(data)-r.Len()]
packetData := data[len(data)-r.Len():]
client.handlePacket(&receivedPacket{
remoteAddr: addr,
header: hdr,
data: packetData,
rcvTime: rcvTime,
})
}
}