Skip to content

Commit

Permalink
router: Simplified the relationship between readUpTo and runForwarder.
Browse files Browse the repository at this point in the history
... maybe.
  • Loading branch information
jiceatscion committed Oct 25, 2023
1 parent 90b0441 commit 231c59a
Showing 1 changed file with 30 additions and 25 deletions.
55 changes: 30 additions & 25 deletions router/dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -917,13 +917,11 @@ func (p *slowPathPacketProcessor) processPacket(pkt slowPacket) (processResult,
func (d *DataPlane) runForwarder(ifID uint16, conn BatchConn,
cfg *RunConfig, c <-chan packet) {

fmt.Println("Initialize forwarder for", "interface")
log.Debug("Initialize forwarder for", "interface", ifID)
readPkts := make([]packet, cfg.BatchSize)
writeMsgs := make(underlayconn.Messages, cfg.BatchSize)

// The traffic type is part of the Packet structure which readUpTo consumes.
// readUpTo() has been persuaded to give it back to us.
trafficTypes := make([]trafficType, cfg.BatchSize)

for i := range writeMsgs {
writeMsgs[i].Buffers = make([][]byte, 1)
}
Expand All @@ -933,7 +931,17 @@ func (d *DataPlane) runForwarder(ifID uint16, conn BatchConn,
remaining := 0
for d.running {
available := readUpTo(c, cfg.BatchSize-remaining, remaining == 0,
writeMsgs[remaining:], trafficTypes[remaining:])
readPkts[remaining:])
// Turn the packets into underlay messages that WriteBatch can send.
// Note: we could use a disposable array for that, but we'd have to
// redo the copies for the packets that we retry.
for i := remaining; i < remaining+available; i++ {
writeMsgs[i].Buffers[0] = readPkts[i].rawPacket
writeMsgs[i].Addr = nil
if readPkts[i].dstAddr != nil {
writeMsgs[i].Addr = readPkts[i].dstAddr
}
}
available += remaining
written, _ := conn.WriteBatch(writeMsgs[:available], 0)
if written < 0 {
Expand All @@ -948,11 +956,12 @@ func (d *DataPlane) runForwarder(ifID uint16, conn BatchConn,
writtenPkts := [ttMax][maxSizeClass]int{}
writtenBytes := [ttMax][maxSizeClass]int{}
for i := range writeMsgs[:written] {
tt := trafficTypes[i]
sc := classOfSize(len(writeMsgs[i].Buffers[0]))
s := len(readPkts[i].rawPacket)
sc := classOfSize(s)
tt := readPkts[i].trafficType
writtenPkts[tt][sc]++
writtenBytes[tt][sc] += len(writeMsgs[i].Buffers[0])
d.returnPacketToPool(writeMsgs[i].Buffers[0])
writtenBytes[tt][sc] += s
d.returnPacketToPool(readPkts[i].rawPacket)
}
for t := ttOther; t < ttMax; t++ {
for sc := sizeClass(0); sc < maxSizeClass; sc++ {
Expand All @@ -967,39 +976,36 @@ func (d *DataPlane) runForwarder(ifID uint16, conn BatchConn,

if written != available {
// Only one is dropped at this time. We'll retry the rest.
sc := classOfSize(len(writeMsgs[written].Buffers[0]))
sc := classOfSize(len(readPkts[written].rawPacket))
metrics[sc].DroppedPacketsInvalid.Inc()
d.returnPacketToPool(writeMsgs[written].Buffers[0])
d.returnPacketToPool(readPkts[written].rawPacket)
remaining = available - written - 1
// Shift the leftovers to the head of the buffers.
// Do not wipe originals, there's no garbage to be found there.
for i := 0; i < remaining; i++ {
// Can't just copy writeMsg[n] whole. Buffers is a slice.
// If we shallow copy, we end up sharing the set of buffers
// between the elements of writeMsgs.
writeMsgs[i].Buffers[0] = writeMsgs[i+written+1].Buffers[0]
writeMsgs[i].Addr = writeMsgs[i+written+1].Addr
trafficTypes[i] = trafficTypes[i+written+1]
readPkts[i] = readPkts[i+written+1]
}

} else {
remaining = 0
}
}
}

func readUpTo(c <-chan packet,
n int, needsBlocking bool, msg []ipv4.Message, trafficTypes []trafficType) int {

assign := func(p packet, m *ipv4.Message) {
m.Buffers[0] = p.rawPacket
m.Addr = nil
if p.dstAddr != nil {
m.Addr = p.dstAddr
}
}
n int, needsBlocking bool, pkts []packet) int {
i := 0
if needsBlocking {
p, ok := <-c
if !ok {
return i
}
assign(p, &msg[i])
trafficTypes[i] = p.trafficType
pkts[i] = p
i++
}

Expand All @@ -1009,8 +1015,7 @@ func readUpTo(c <-chan packet,
if !ok {
return i
}
assign(p, &msg[i])
trafficTypes[i] = p.trafficType
pkts[i] = p
default:
return i
}
Expand Down

0 comments on commit 231c59a

Please sign in to comment.