Skip to content

Commit

Permalink
router: minor cleanup
Browse files Browse the repository at this point in the history
* Improved variables naming
* move metrics updates out of runForwarder
  • Loading branch information
jiceatscion committed Oct 26, 2023
1 parent 8f3acaf commit 7d17454
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 45 deletions.
88 changes: 47 additions & 41 deletions router/dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -910,82 +910,88 @@ func (p *slowPathPacketProcessor) processPacket(pkt slowPacket) (processResult,
}
}

func updateOutputMetrics(metrics interfaceMetrics, packets []packet) {
// We need to collect stats by traffic type and size class.
// Try to reduce the metrics lookup penalty by using some
// simpler staging data structure.
writtenPkts := [ttMax][maxSizeClass]int{}
writtenBytes := [ttMax][maxSizeClass]int{}
for _, p := range packets {
s := len(p.rawPacket)
sc := classOfSize(s)
tt := p.trafficType
writtenPkts[tt][sc]++
writtenBytes[tt][sc] += s
}
for t := ttOther; t < ttMax; t++ {
for sc := sizeClass(0); sc < maxSizeClass; sc++ {
if writtenPkts[t][sc] > 0 {
metrics[sc].Output[t].OutputPacketsTotal.Add(
float64(writtenPkts[t][sc]))
metrics[sc].Output[t].OutputBytesTotal.Add(
float64(writtenBytes[t][sc]))
}
}
}
}

func (d *DataPlane) runForwarder(ifID uint16, conn BatchConn,
cfg *RunConfig, c <-chan packet) {

fmt.Println("Initialize forwarder for", "interface")
log.Debug("Initialize forwarder for", "interface", ifID)

// We use this somewhat like a ring buffer.
readPkts := make([]packet, cfg.BatchSize)
pkts := make([]packet, cfg.BatchSize)

// We use this as a temporary buffer, but allocate it just once
// to save on garbage handling.
writeMsgs := make(underlayconn.Messages, cfg.BatchSize)
for i := range writeMsgs {
writeMsgs[i].Buffers = make([][]byte, 1)
msgs := make(underlayconn.Messages, cfg.BatchSize)
for i := range msgs {
msgs[i].Buffers = make([][]byte, 1)
}

metrics := d.forwardingMetrics[ifID]

remaining := 0
toWrite := 0
for d.running {
available := readUpTo(c, cfg.BatchSize-remaining, remaining == 0,
readPkts[remaining:])
available += remaining
toWrite += readUpTo(c, cfg.BatchSize-toWrite, toWrite == 0,
pkts[toWrite:])

// Turn the packets into underlay messages that WriteBatch can send.
for i, p := range readPkts[:available] {
writeMsgs[i].Buffers[0] = p.rawPacket
writeMsgs[i].Addr = nil
for i, p := range pkts[:toWrite] {
msgs[i].Buffers[0] = p.rawPacket
msgs[i].Addr = nil
if p.dstAddr != nil {
writeMsgs[i].Addr = p.dstAddr
msgs[i].Addr = p.dstAddr
}
}
written, _ := conn.WriteBatch(writeMsgs[:available], 0)
written, _ := conn.WriteBatch(msgs[:toWrite], 0)
if written < 0 {
// WriteBatch returns -1 on error, we just consider this as
// 0 packets written
written = 0
}

// We need to collect stats by traffic type and size class.
// Try to reduce the metrics lookup penalty by using some
// simpler staging data structure.
writtenPkts := [ttMax][maxSizeClass]int{}
writtenBytes := [ttMax][maxSizeClass]int{}
for _, p := range readPkts[:written] {
s := len(p.rawPacket)
sc := classOfSize(s)
tt := p.trafficType
writtenPkts[tt][sc]++
writtenBytes[tt][sc] += s
updateOutputMetrics(metrics, pkts[:written])

for _, p := range pkts[:written] {
d.returnPacketToPool(p.rawPacket)
}
for t := ttOther; t < ttMax; t++ {
for sc := sizeClass(0); sc < maxSizeClass; sc++ {
if writtenPkts[t][sc] > 0 {
metrics[sc].Output[t].OutputPacketsTotal.Add(
float64(writtenPkts[t][sc]))
metrics[sc].Output[t].OutputBytesTotal.Add(
float64(writtenBytes[t][sc]))
}
}
}

if written != available {
if written != toWrite {
// Only one is dropped at this time. We'll retry the rest.
sc := classOfSize(len(readPkts[written].rawPacket))
sc := classOfSize(len(pkts[written].rawPacket))
metrics[sc].DroppedPacketsInvalid.Inc()
d.returnPacketToPool(readPkts[written].rawPacket)
remaining = available - written - 1
d.returnPacketToPool(pkts[written].rawPacket)
toWrite -= (written + 1)
// Shift the leftovers to the head of the buffers.
for i := 0; i < remaining; i++ {
readPkts[i] = readPkts[i+written+1]
for i := 0; i < toWrite; i++ {
pkts[i] = pkts[i+written+1]
}

} else {
remaining = 0
toWrite = 0
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions router/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ type sizeClass uint8
const maxSizeClass sizeClass = 15

// This will failto compile if bufSize cannot fit in (maxSizeClass - 1) bits.
const _ = uint(1 << (maxSizeClass - 1) - 1 - bufSize)
const _ = uint(1<<(maxSizeClass-1) - 1 - bufSize)

// minSizeClass is the smallest sizeClass that we care about.
// All smaller classes are conflated with this one.
Expand Down Expand Up @@ -262,11 +262,11 @@ func (t trafficType) String() string {
case ttOut:
return "out"
case ttInTransit:
return "inTransit"
return "in_transit"
case ttOutTransit:
return "outTransit"
return "out_transit"
case ttBrTransit:
return "brTransit"
return "br_transit"
}
return "other"
}
Expand Down

0 comments on commit 7d17454

Please sign in to comment.