Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve jitterbuffer performance and de-duplicate code #279

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions internal/rtpbuffer/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package rtpbuffer

import "errors"

// ErrInvalidSize is returned by newReceiveLog/newRTPBuffer, when an incorrect buffer size is supplied.
var ErrInvalidSize = errors.New("invalid buffer size")

var (
errPacketReleased = errors.New("could not retain packet, already released")
errFailedToCastHeaderPool = errors.New("could not access header pool, failed cast")
errFailedToCastPayloadPool = errors.New("could not access payload pool, failed cast")
)
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package nack
package rtpbuffer

import (
"encoding/binary"
Expand All @@ -11,16 +11,22 @@ import (
"github.com/pion/rtp"
)

const maxPayloadLen = 1460
// PacketFactory allows custom logic around the handle of RTP Packets before they added to the RTPBuffer.
// The NoOpPacketFactory doesn't copy packets, while the RetainablePacket will take a copy before adding
type PacketFactory interface {
NewPacket(header *rtp.Header, payload []byte, rtxSsrc uint32, rtxPayloadType uint8) (*RetainablePacket, error)
}

type packetManager struct {
// PacketFactoryCopy is PacketFactory that takes a copy of packets when added to the RTPBuffer
type PacketFactoryCopy struct {
headerPool *sync.Pool
payloadPool *sync.Pool
rtxSequencer rtp.Sequencer
}

func newPacketManager() *packetManager {
return &packetManager{
// NewPacketFactoryCopy constructs a PacketFactory that takes a copy of packets when added to the RTPBuffer
func NewPacketFactoryCopy() *PacketFactoryCopy {
return &PacketFactoryCopy{
headerPool: &sync.Pool{
New: func() interface{} {
return &rtp.Header{}
Expand All @@ -36,12 +42,15 @@ func newPacketManager() *packetManager {
}
}

func (m *packetManager) NewPacket(header *rtp.Header, payload []byte, rtxSsrc uint32, rtxPayloadType uint8) (*retainablePacket, error) {
const maxPayloadLen = 1460

// NewPacket constructs a new RetainablePacket that can be added to the RTPBuffer
func (m *PacketFactoryCopy) NewPacket(header *rtp.Header, payload []byte, rtxSsrc uint32, rtxPayloadType uint8) (*RetainablePacket, error) {
if len(payload) > maxPayloadLen {
return nil, io.ErrShortBuffer
}

p := &retainablePacket{
p := &RetainablePacket{
onRelease: m.releasePacket,
sequenceNumber: header.SequenceNumber,
// new packets have retain count of 1
Expand Down Expand Up @@ -92,17 +101,19 @@ func (m *packetManager) NewPacket(header *rtp.Header, payload []byte, rtxSsrc ui
return p, nil
}

func (m *packetManager) releasePacket(header *rtp.Header, payload *[]byte) {
func (m *PacketFactoryCopy) releasePacket(header *rtp.Header, payload *[]byte) {
m.headerPool.Put(header)
if payload != nil {
m.payloadPool.Put(payload)
}
}

type noOpPacketFactory struct{}
// PacketFactoryNoOp is a PacketFactory implementation that doesn't copy packets
type PacketFactoryNoOp struct{}

func (f *noOpPacketFactory) NewPacket(header *rtp.Header, payload []byte, _ uint32, _ uint8) (*retainablePacket, error) {
return &retainablePacket{
// NewPacket constructs a new RetainablePacket that can be added to the RTPBuffer
func (f *PacketFactoryNoOp) NewPacket(header *rtp.Header, payload []byte, _ uint32, _ uint8) (*RetainablePacket, error) {
return &RetainablePacket{
onRelease: f.releasePacket,
count: 1,
header: header,
Expand All @@ -111,52 +122,17 @@ func (f *noOpPacketFactory) NewPacket(header *rtp.Header, payload []byte, _ uint
}, nil
}

func (f *noOpPacketFactory) releasePacket(_ *rtp.Header, _ *[]byte) {
func (f *PacketFactoryNoOp) releasePacket(_ *rtp.Header, _ *[]byte) {
// no-op
}

type retainablePacket struct {
onRelease func(*rtp.Header, *[]byte)

countMu sync.Mutex
count int

header *rtp.Header
buffer *[]byte
payload []byte

sequenceNumber uint16
}

func (p *retainablePacket) Header() *rtp.Header {
return p.header
}

func (p *retainablePacket) Payload() []byte {
return p.payload
}

func (p *retainablePacket) Retain() error {
p.countMu.Lock()
defer p.countMu.Unlock()
if p.count == 0 {
// already released
return errPacketReleased
}
p.count++
return nil
}

func (p *retainablePacket) Release() {
p.countMu.Lock()
defer p.countMu.Unlock()
p.count--

if p.count == 0 {
// release back to pool
p.onRelease(p.header, p.buffer)
p.header = nil
p.buffer = nil
p.payload = nil
// NewRetainablePacketFromRTPPacket creates a RetainablePacket that embeds a RTP Packet directly
func NewRetainablePacketFromRTPPacket(pkt *rtp.Packet) *RetainablePacket {
return &RetainablePacket{
onRelease: func(*rtp.Header, *[]byte) {},
count: 1,
packet: pkt,
header: &pkt.Header,
sequenceNumber: pkt.Header.SequenceNumber,
}
}
79 changes: 79 additions & 0 deletions internal/rtpbuffer/retainable_packet.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package rtpbuffer

import (
"sync"

"github.com/pion/rtp"
)

// RetainablePacket is a referenced counted RTP packet
type RetainablePacket struct {
onRelease func(*rtp.Header, *[]byte)

countMu sync.Mutex
count int

packet *rtp.Packet
header *rtp.Header
buffer *[]byte
payload []byte

sequenceNumber uint16
}

// Header returns the RTP Header of the RetainablePacket
func (p *RetainablePacket) Header() *rtp.Header {
return p.header
}

// Payload returns the RTP Payload of the RetainablePacket
func (p *RetainablePacket) Payload() []byte {
return p.payload
}

// Packet returns a RTP Packet for a RetainablePacket
func (p *RetainablePacket) Packet() *rtp.Packet {
return p.packet
}

// Retain increases the reference count of the RetainablePacket
func (p *RetainablePacket) Retain() error {
p.countMu.Lock()
defer p.countMu.Unlock()
if p.count == 0 {
// already released
return errPacketReleased
}
p.count++
return nil
}

// Release decreases the reference count of the RetainablePacket and frees if needed
func (p *RetainablePacket) Release(force bool) {
p.countMu.Lock()
defer p.countMu.Unlock()

if !force {
p.count--
} else {
p.count = 0
}

if p.count == 0 {
// release back to pool
p.onRelease(p.header, p.buffer)
p.header = nil
p.buffer = nil
p.payload = nil
}
}

func (p *RetainablePacket) getCount() int {
p.countMu.Lock()
defer p.countMu.Unlock()

return p.count
}
96 changes: 96 additions & 0 deletions internal/rtpbuffer/rtpbuffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

// Package rtpbuffer provides a buffer for storing RTP packets
package rtpbuffer

import (
"fmt"
)

// RTPBuffer stores RTP packets and allows custom logic around the lifetime of them via the PacketFactory
type RTPBuffer struct {
packets []*RetainablePacket
size uint16
lastAdded uint16
}

// NewRTPBuffer constructs a new RTPBuffer
func NewRTPBuffer(size uint16) (*RTPBuffer, error) {
allowedSizes := []uint16{1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65535}
correctSize := false
for _, v := range allowedSizes {
if v == size {
correctSize = true
break
}
}

if !correctSize {
return nil, fmt.Errorf("%w: %d is not a valid size, allowed sizes: %v", ErrInvalidSize, size, allowedSizes)
}

return &RTPBuffer{
packets: make([]*RetainablePacket, size),
size: size,
}, nil
}

// Add places the RetainablePacket in the RTPBuffer
func (r *RTPBuffer) Add(packet *RetainablePacket) {
seq := packet.sequenceNumber
idx := seq % r.size

if prevPacket := r.packets[idx]; prevPacket != nil {
prevPacket.Release(false)
}

r.packets[idx] = packet
}

// Get returns the RetainablePacket for the requested sequence number
func (r *RTPBuffer) Get(seq uint16) *RetainablePacket {
pkt := r.packets[seq%r.size]
if pkt != nil {
if pkt.sequenceNumber != seq {
return nil
}
// already released
if err := pkt.Retain(); err != nil {
return nil
}
}
return pkt
}

// GetTimestamp returns a RetainablePacket for the requested timestamp
func (r *RTPBuffer) GetTimestamp(timestamp uint32) *RetainablePacket {
for i := range r.packets {
pkt := r.packets[i]
if pkt != nil && pkt.Header() != nil && pkt.Header().Timestamp == timestamp {
if err := pkt.Retain(); err != nil {
return nil
}

return pkt
}
}
return nil
}

// Length returns the count of valid RetainablePackets in the RTPBuffer
func (r *RTPBuffer) Length() (length uint16) {
for i := range r.packets {
if r.packets[i] != nil && r.packets[i].getCount() != 0 {
length++
}
}

return
}

// Clear erases all the packets in the RTPBuffer
func (r *RTPBuffer) Clear() {
r.packets = make([]*RetainablePacket, r.size)
r.lastAdded = 0
}
Loading
Loading