Skip to content

Commit

Permalink
Defer packet ordering until building RTCP packet
Browse files Browse the repository at this point in the history
The previous implementation did a sorted insert (O(m)) on
every call to `Record`. If sorting is deferred until a
feedback packet is built, we can record in constant time
and build a packet in O(nlogn). Ordering isn't required
until building the packet anyway, and deferring nets a minor
performance gain (I say minor since its unlikely there are a
large number of received packets in the buffer prior to
building a feedback packet). If ordering is really needed on
record, we could use something like a B-tree to get O(logn)
sorted inserts.
  • Loading branch information
treyhakanson committed Aug 29, 2023
1 parent c37a592 commit a646d66
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 181 deletions.
61 changes: 28 additions & 33 deletions pkg/twcc/twcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package twcc

import (
"math"
"sort"

"github.com/pion/rtcp"
)
Expand Down Expand Up @@ -44,10 +45,8 @@ func (r *Recorder) Record(mediaSSRC uint32, sequenceNumber uint16, arrivalTime i
if sequenceNumber < 0x0fff && (r.lastSequenceNumber&0xffff) > 0xf000 {
r.cycles += 1 << 16
}
r.receivedPackets = insertSorted(r.receivedPackets, pktInfo{
sequenceNumber: r.cycles | uint32(sequenceNumber),
arrivalTime: arrivalTime,
})
pkt := pktInfo{r.cycles | uint32(sequenceNumber), arrivalTime}
r.receivedPackets = append(r.receivedPackets, pkt)
r.lastSequenceNumber = sequenceNumber
}

Expand All @@ -56,45 +55,41 @@ func (r *Recorder) PacketsHeld() int {
return len(r.receivedPackets)
}

func insertSorted(list []pktInfo, element pktInfo) []pktInfo {
if len(list) == 0 {
return append(list, element)
}
for i := len(list) - 1; i >= 0; i-- {
if list[i].sequenceNumber < element.sequenceNumber {
list = append(list, pktInfo{})
copy(list[i+2:], list[i+1:])
list[i+1] = element
return list
}
if list[i].sequenceNumber == element.sequenceNumber {
list[i] = element
return list
}
}
// element.sequenceNumber is between 0 and first ever received sequenceNumber
return append([]pktInfo{element}, list...)
}

// BuildFeedbackPacket creates a new RTCP packet containing a TWCC feedback report.
func (r *Recorder) BuildFeedbackPacket() []rtcp.Packet {
if len(r.receivedPackets) < 2 {
// sort received packets by sequence number, with earliest arrivalTime first in cases of duplicates
canBuild := false
sort.Slice(r.receivedPackets, func(i, j int) bool {
if r.receivedPackets[i].sequenceNumber == r.receivedPackets[j].sequenceNumber {
return r.receivedPackets[i].arrivalTime < r.receivedPackets[j].arrivalTime
}
canBuild = true // need at least 2 non-duplicate packets
return r.receivedPackets[i].sequenceNumber < r.receivedPackets[j].sequenceNumber
})
if !canBuild {
return nil
}

feedback := newFeedback(r.senderSSRC, r.mediaSSRC, r.fbPktCnt)
r.fbPktCnt++
feedback.setBase(uint16(r.receivedPackets[0].sequenceNumber&0xffff), r.receivedPackets[0].arrivalTime)
feedback.setBase(
uint16(r.receivedPackets[0].sequenceNumber&0xffff),
r.receivedPackets[0].arrivalTime,
)

var pkts []rtcp.Packet
for _, pkt := range r.receivedPackets {
ok := feedback.addReceived(uint16(pkt.sequenceNumber&0xffff), pkt.arrivalTime)
if !ok {
pkts = append(pkts, feedback.getRTCP())
feedback = newFeedback(r.senderSSRC, r.mediaSSRC, r.fbPktCnt)
r.fbPktCnt++
feedback.addReceived(uint16(pkt.sequenceNumber&0xffff), pkt.arrivalTime)
var prevSN uint32
for i, pkt := range r.receivedPackets {
if i == 0 || pkt.sequenceNumber != prevSN {
ok := feedback.addReceived(uint16(pkt.sequenceNumber&0xffff), pkt.arrivalTime)
if !ok {
pkts = append(pkts, feedback.getRTCP())
feedback = newFeedback(r.senderSSRC, r.mediaSSRC, r.fbPktCnt)
r.fbPktCnt++
feedback.addReceived(uint16(pkt.sequenceNumber&0xffff), pkt.arrivalTime)
}

Check warning on line 90 in pkg/twcc/twcc.go

View check run for this annotation

Codecov / codecov/patch

pkg/twcc/twcc.go#L86-L90

Added lines #L86 - L90 were not covered by tests
}
prevSN = pkt.sequenceNumber
}
r.receivedPackets = []pktInfo{}
pkts = append(pkts, feedback.getRTCP())
Expand Down
156 changes: 8 additions & 148 deletions pkg/twcc/twcc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package twcc

import (
"fmt"
"testing"

"github.com/pion/rtcp"
Expand Down Expand Up @@ -572,10 +571,10 @@ func TestDuplicatePackets(t *testing.T) {

arrivalTime := int64(scaleFactorReferenceTime)
addRun(t, r, []uint16{12, 13, 13, 14}, []int64{
arrivalTime,
arrivalTime,
arrivalTime,
arrivalTime,
increaseTime(&arrivalTime, rtcp.TypeTCCDeltaScaleFactor),
arrivalTime + rtcp.TypeTCCDeltaScaleFactor*256,
increaseTime(&arrivalTime, rtcp.TypeTCCDeltaScaleFactor),
increaseTime(&arrivalTime, rtcp.TypeTCCDeltaScaleFactor),
})

rtcpPackets := r.BuildFeedbackPacket()
Expand Down Expand Up @@ -603,9 +602,9 @@ func TestDuplicatePackets(t *testing.T) {
},
},
RecvDeltas: []*rtcp.RecvDelta{
{Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 0},
{Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 0},
{Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 0},
{Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 250},
{Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 250},
{Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 250},
},
}, rtcpToTwcc(t, rtcpPackets)[0])
marshalAll(t, rtcpPackets)
Expand Down Expand Up @@ -802,146 +801,7 @@ func TestReorderedPackets(t *testing.T) {
marshalAll(t, rtcpPackets)
}

func TestInsertSorted(t *testing.T) {
cases := []struct {
l []pktInfo
e pktInfo
expected []pktInfo
}{
{
l: []pktInfo{},
e: pktInfo{},
expected: []pktInfo{{
sequenceNumber: 0,
arrivalTime: 0,
}},
},
{
l: []pktInfo{
{
sequenceNumber: 0,
arrivalTime: 0,
},
{
sequenceNumber: 1,
arrivalTime: 0,
},
},
e: pktInfo{
sequenceNumber: 2,
arrivalTime: 0,
},
expected: []pktInfo{
{
sequenceNumber: 0,
arrivalTime: 0,
},
{
sequenceNumber: 1,
arrivalTime: 0,
},
{
sequenceNumber: 2,
arrivalTime: 0,
},
},
},
{
l: []pktInfo{
{
sequenceNumber: 0,
arrivalTime: 0,
},
{
sequenceNumber: 2,
arrivalTime: 0,
},
},
e: pktInfo{
sequenceNumber: 1,
arrivalTime: 0,
},
expected: []pktInfo{
{
sequenceNumber: 0,
arrivalTime: 0,
},
{
sequenceNumber: 1,
arrivalTime: 0,
},
{
sequenceNumber: 2,
arrivalTime: 0,
},
},
},
{
l: []pktInfo{
{
sequenceNumber: 0,
arrivalTime: 0,
},
{
sequenceNumber: 1,
arrivalTime: 0,
},
{
sequenceNumber: 2,
arrivalTime: 0,
},
},
e: pktInfo{
sequenceNumber: 1,
arrivalTime: 0,
},
expected: []pktInfo{
{
sequenceNumber: 0,
arrivalTime: 0,
},
{
sequenceNumber: 1,
arrivalTime: 0,
},
{
sequenceNumber: 2,
arrivalTime: 0,
},
},
},
{
l: []pktInfo{
{
sequenceNumber: 10,
arrivalTime: 0,
},
},
e: pktInfo{
sequenceNumber: 9,
arrivalTime: 0,
},
expected: []pktInfo{
{
sequenceNumber: 9,
arrivalTime: 0,
},
{
sequenceNumber: 10,
arrivalTime: 0,
},
},
},
}
for i, c := range cases {
c := c
t.Run(fmt.Sprintf("%v", i), func(t *testing.T) {
assert.Equal(t, c.expected, insertSorted(c.l, c.e))
})
}
}

func TestPacketsHheld(t *testing.T) {
func TestPacketsHeld(t *testing.T) {
r := NewRecorder(5000)
assert.Zero(t, r.PacketsHeld())

Expand Down

0 comments on commit a646d66

Please sign in to comment.