diff --git a/pkg/twcc/twcc.go b/pkg/twcc/twcc.go index 5dd8fd31..7f7b857a 100644 --- a/pkg/twcc/twcc.go +++ b/pkg/twcc/twcc.go @@ -6,6 +6,7 @@ package twcc import ( "math" + "sort" "github.com/pion/rtcp" ) @@ -44,10 +45,8 @@ func (r *Recorder) Record(mediaSSRC uint32, sequenceNumber uint16, arrivalTime i if sequenceNumber < 0x0fff && (r.lastSequenceNumber&0xffff) > 0xf000 { r.cycles += 1 << 16 } - r.receivedPackets = insertSorted(r.receivedPackets, pktInfo{ - sequenceNumber: r.cycles | uint32(sequenceNumber), - arrivalTime: arrivalTime, - }) + pkt := pktInfo{r.cycles | uint32(sequenceNumber), arrivalTime} + r.receivedPackets = append(r.receivedPackets, pkt) r.lastSequenceNumber = sequenceNumber } @@ -56,45 +55,41 @@ func (r *Recorder) PacketsHeld() int { return len(r.receivedPackets) } -func insertSorted(list []pktInfo, element pktInfo) []pktInfo { - if len(list) == 0 { - return append(list, element) - } - for i := len(list) - 1; i >= 0; i-- { - if list[i].sequenceNumber < element.sequenceNumber { - list = append(list, pktInfo{}) - copy(list[i+2:], list[i+1:]) - list[i+1] = element - return list - } - if list[i].sequenceNumber == element.sequenceNumber { - list[i] = element - return list - } - } - // element.sequenceNumber is between 0 and first ever received sequenceNumber - return append([]pktInfo{element}, list...) -} - // BuildFeedbackPacket creates a new RTCP packet containing a TWCC feedback report. func (r *Recorder) BuildFeedbackPacket() []rtcp.Packet { - if len(r.receivedPackets) < 2 { + // sort received packets by sequence number, with earliest arrivalTime first in cases of duplicates + canBuild := false + sort.Slice(r.receivedPackets, func(i, j int) bool { + if r.receivedPackets[i].sequenceNumber == r.receivedPackets[j].sequenceNumber { + return r.receivedPackets[i].arrivalTime < r.receivedPackets[j].arrivalTime + } + canBuild = true // need at least 2 non-duplicate packets + return r.receivedPackets[i].sequenceNumber < r.receivedPackets[j].sequenceNumber + }) + if !canBuild { return nil } feedback := newFeedback(r.senderSSRC, r.mediaSSRC, r.fbPktCnt) r.fbPktCnt++ - feedback.setBase(uint16(r.receivedPackets[0].sequenceNumber&0xffff), r.receivedPackets[0].arrivalTime) + feedback.setBase( + uint16(r.receivedPackets[0].sequenceNumber&0xffff), + r.receivedPackets[0].arrivalTime, + ) var pkts []rtcp.Packet - for _, pkt := range r.receivedPackets { - ok := feedback.addReceived(uint16(pkt.sequenceNumber&0xffff), pkt.arrivalTime) - if !ok { - pkts = append(pkts, feedback.getRTCP()) - feedback = newFeedback(r.senderSSRC, r.mediaSSRC, r.fbPktCnt) - r.fbPktCnt++ - feedback.addReceived(uint16(pkt.sequenceNumber&0xffff), pkt.arrivalTime) + var prevSN uint32 + for i, pkt := range r.receivedPackets { + if i == 0 || pkt.sequenceNumber != prevSN { + ok := feedback.addReceived(uint16(pkt.sequenceNumber&0xffff), pkt.arrivalTime) + if !ok { + pkts = append(pkts, feedback.getRTCP()) + feedback = newFeedback(r.senderSSRC, r.mediaSSRC, r.fbPktCnt) + r.fbPktCnt++ + feedback.addReceived(uint16(pkt.sequenceNumber&0xffff), pkt.arrivalTime) + } } + prevSN = pkt.sequenceNumber } r.receivedPackets = []pktInfo{} pkts = append(pkts, feedback.getRTCP()) diff --git a/pkg/twcc/twcc_test.go b/pkg/twcc/twcc_test.go index ac8df59b..5ada7a00 100644 --- a/pkg/twcc/twcc_test.go +++ b/pkg/twcc/twcc_test.go @@ -4,7 +4,6 @@ package twcc import ( - "fmt" "testing" "github.com/pion/rtcp" @@ -572,10 +571,10 @@ func TestDuplicatePackets(t *testing.T) { arrivalTime := int64(scaleFactorReferenceTime) addRun(t, r, []uint16{12, 13, 13, 14}, []int64{ - arrivalTime, - arrivalTime, - arrivalTime, - arrivalTime, + increaseTime(&arrivalTime, rtcp.TypeTCCDeltaScaleFactor), + arrivalTime + rtcp.TypeTCCDeltaScaleFactor*256, + increaseTime(&arrivalTime, rtcp.TypeTCCDeltaScaleFactor), + increaseTime(&arrivalTime, rtcp.TypeTCCDeltaScaleFactor), }) rtcpPackets := r.BuildFeedbackPacket() @@ -603,9 +602,9 @@ func TestDuplicatePackets(t *testing.T) { }, }, RecvDeltas: []*rtcp.RecvDelta{ - {Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 0}, - {Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 0}, - {Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 0}, + {Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 250}, + {Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 250}, + {Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 250}, }, }, rtcpToTwcc(t, rtcpPackets)[0]) marshalAll(t, rtcpPackets) @@ -802,146 +801,7 @@ func TestReorderedPackets(t *testing.T) { marshalAll(t, rtcpPackets) } -func TestInsertSorted(t *testing.T) { - cases := []struct { - l []pktInfo - e pktInfo - expected []pktInfo - }{ - { - l: []pktInfo{}, - e: pktInfo{}, - expected: []pktInfo{{ - sequenceNumber: 0, - arrivalTime: 0, - }}, - }, - { - l: []pktInfo{ - { - sequenceNumber: 0, - arrivalTime: 0, - }, - { - sequenceNumber: 1, - arrivalTime: 0, - }, - }, - e: pktInfo{ - sequenceNumber: 2, - arrivalTime: 0, - }, - expected: []pktInfo{ - { - sequenceNumber: 0, - arrivalTime: 0, - }, - { - sequenceNumber: 1, - arrivalTime: 0, - }, - { - sequenceNumber: 2, - arrivalTime: 0, - }, - }, - }, - { - l: []pktInfo{ - { - sequenceNumber: 0, - arrivalTime: 0, - }, - { - sequenceNumber: 2, - arrivalTime: 0, - }, - }, - e: pktInfo{ - sequenceNumber: 1, - arrivalTime: 0, - }, - expected: []pktInfo{ - { - sequenceNumber: 0, - arrivalTime: 0, - }, - { - sequenceNumber: 1, - arrivalTime: 0, - }, - { - sequenceNumber: 2, - arrivalTime: 0, - }, - }, - }, - { - l: []pktInfo{ - { - sequenceNumber: 0, - arrivalTime: 0, - }, - { - sequenceNumber: 1, - arrivalTime: 0, - }, - { - sequenceNumber: 2, - arrivalTime: 0, - }, - }, - e: pktInfo{ - sequenceNumber: 1, - arrivalTime: 0, - }, - expected: []pktInfo{ - { - sequenceNumber: 0, - arrivalTime: 0, - }, - { - sequenceNumber: 1, - arrivalTime: 0, - }, - { - sequenceNumber: 2, - arrivalTime: 0, - }, - }, - }, - { - l: []pktInfo{ - { - sequenceNumber: 10, - arrivalTime: 0, - }, - }, - e: pktInfo{ - sequenceNumber: 9, - arrivalTime: 0, - }, - expected: []pktInfo{ - { - sequenceNumber: 9, - arrivalTime: 0, - }, - { - sequenceNumber: 10, - arrivalTime: 0, - }, - }, - }, - } - for i, c := range cases { - c := c - t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { - assert.Equal(t, c.expected, insertSorted(c.l, c.e)) - }) - } -} - -func TestPacketsHheld(t *testing.T) { +func TestPacketsHeld(t *testing.T) { r := NewRecorder(5000) assert.Zero(t, r.PacketsHeld())