Skip to content

Commit

Permalink
Move buffer for RTP packets into internal
Browse files Browse the repository at this point in the history
Can be used by NACK and JitterBuffer now

Relates to #278
  • Loading branch information
Sean-Der committed Oct 14, 2024
1 parent 0eab188 commit fa94c8c
Show file tree
Hide file tree
Showing 11 changed files with 245 additions and 237 deletions.
15 changes: 15 additions & 0 deletions internal/rtpbuffer/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package rtpbuffer

import "errors"

// ErrInvalidSize is returned by newReceiveLog/newRTPBuffer, when an incorrect buffer size is supplied.
var ErrInvalidSize = errors.New("invalid buffer size")

var (
errPacketReleased = errors.New("could not retain packet, already released")
errFailedToCastHeaderPool = errors.New("could not access header pool, failed cast")
errFailedToCastPayloadPool = errors.New("could not access payload pool, failed cast")
)
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package nack
package rtpbuffer

import (
"encoding/binary"
Expand All @@ -11,16 +11,22 @@ import (
"github.com/pion/rtp"
)

const maxPayloadLen = 1460
// PacketFactory allows custom logic around the handle of RTP Packets before they added to the RTPBuffer.
// The NoOpPacketFactory doesn't copy packets, while the RetainablePacket will take a copy before adding
type PacketFactory interface {
NewPacket(header *rtp.Header, payload []byte, rtxSsrc uint32, rtxPayloadType uint8) (*RetainablePacket, error)
}

type packetManager struct {
// PacketFactoryCopy is PacketFactory that takes a copy of packets when added to the RTPBuffer
type PacketFactoryCopy struct {
headerPool *sync.Pool
payloadPool *sync.Pool
rtxSequencer rtp.Sequencer
}

func newPacketManager() *packetManager {
return &packetManager{
// NewPacketFactoryCopy constructs a PacketFactory that takes a copy of packets when added to the RTPBuffer
func NewPacketFactoryCopy() *PacketFactoryCopy {
return &PacketFactoryCopy{
headerPool: &sync.Pool{
New: func() interface{} {
return &rtp.Header{}
Expand All @@ -36,12 +42,13 @@ func newPacketManager() *packetManager {
}
}

func (m *packetManager) NewPacket(header *rtp.Header, payload []byte, rtxSsrc uint32, rtxPayloadType uint8) (*retainablePacket, error) {
// NewPacket constructs a new RetainablePacket that can be added to the RTPBuffer
func (m *PacketFactoryCopy) NewPacket(header *rtp.Header, payload []byte, rtxSsrc uint32, rtxPayloadType uint8) (*RetainablePacket, error) {
if len(payload) > maxPayloadLen {
return nil, io.ErrShortBuffer
}

p := &retainablePacket{
p := &RetainablePacket{
onRelease: m.releasePacket,
sequenceNumber: header.SequenceNumber,
// new packets have retain count of 1
Expand Down Expand Up @@ -92,17 +99,19 @@ func (m *packetManager) NewPacket(header *rtp.Header, payload []byte, rtxSsrc ui
return p, nil
}

func (m *packetManager) releasePacket(header *rtp.Header, payload *[]byte) {
func (m *PacketFactoryCopy) releasePacket(header *rtp.Header, payload *[]byte) {
m.headerPool.Put(header)
if payload != nil {
m.payloadPool.Put(payload)
}
}

type noOpPacketFactory struct{}
// PacketFactoryNoOp is a PacketFactory implementation that doesn't copy packets
type PacketFactoryNoOp struct{}

func (f *noOpPacketFactory) NewPacket(header *rtp.Header, payload []byte, _ uint32, _ uint8) (*retainablePacket, error) {
return &retainablePacket{
// NewPacket constructs a new RetainablePacket that can be added to the RTPBuffer
func (f *PacketFactoryNoOp) NewPacket(header *rtp.Header, payload []byte, _ uint32, _ uint8) (*RetainablePacket, error) {
return &RetainablePacket{
onRelease: f.releasePacket,
count: 1,
header: header,
Expand All @@ -111,52 +120,6 @@ func (f *noOpPacketFactory) NewPacket(header *rtp.Header, payload []byte, _ uint
}, nil
}

func (f *noOpPacketFactory) releasePacket(_ *rtp.Header, _ *[]byte) {
func (f *PacketFactoryNoOp) releasePacket(_ *rtp.Header, _ *[]byte) {

Check warning on line 123 in internal/rtpbuffer/packet_factory.go

View check run for this annotation

Codecov / codecov/patch

internal/rtpbuffer/packet_factory.go#L123

Added line #L123 was not covered by tests
// no-op
}

type retainablePacket struct {
onRelease func(*rtp.Header, *[]byte)

countMu sync.Mutex
count int

header *rtp.Header
buffer *[]byte
payload []byte

sequenceNumber uint16
}

func (p *retainablePacket) Header() *rtp.Header {
return p.header
}

func (p *retainablePacket) Payload() []byte {
return p.payload
}

func (p *retainablePacket) Retain() error {
p.countMu.Lock()
defer p.countMu.Unlock()
if p.count == 0 {
// already released
return errPacketReleased
}
p.count++
return nil
}

func (p *retainablePacket) Release() {
p.countMu.Lock()
defer p.countMu.Unlock()
p.count--

if p.count == 0 {
// release back to pool
p.onRelease(p.header, p.buffer)
p.header = nil
p.buffer = nil
p.payload = nil
}
}
61 changes: 61 additions & 0 deletions internal/rtpbuffer/retainable_packet.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package rtpbuffer

import (
"sync"

"github.com/pion/rtp"
)

// RetainablePacket is a referenced counted RTP packet
type RetainablePacket struct {
onRelease func(*rtp.Header, *[]byte)

countMu sync.Mutex
count int

header *rtp.Header
buffer *[]byte
payload []byte

sequenceNumber uint16
}

// Header returns the RTP Header of the RetainablePacket
func (p *RetainablePacket) Header() *rtp.Header {
return p.header
}

// Payload returns the RTP Payload of the RetainablePacket
func (p *RetainablePacket) Payload() []byte {
return p.payload
}

// Retain increases the reference count of the RetainablePacket
func (p *RetainablePacket) Retain() error {
p.countMu.Lock()
defer p.countMu.Unlock()
if p.count == 0 {
// already released
return errPacketReleased

Check warning on line 42 in internal/rtpbuffer/retainable_packet.go

View check run for this annotation

Codecov / codecov/patch

internal/rtpbuffer/retainable_packet.go#L42

Added line #L42 was not covered by tests
}
p.count++
return nil
}

// Release decreases the reference count of the RetainablePacket and frees if needed
func (p *RetainablePacket) Release() {
p.countMu.Lock()
defer p.countMu.Unlock()
p.count--

if p.count == 0 {
// release back to pool
p.onRelease(p.header, p.buffer)
p.header = nil
p.buffer = nil
p.payload = nil
}
}
103 changes: 103 additions & 0 deletions internal/rtpbuffer/rtpbuffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

// Package rtpbuffer provides a buffer for storing RTP packets
package rtpbuffer

import (
"fmt"
)

const (
// Uint16SizeHalf is half of a math.Uint16
Uint16SizeHalf = 1 << 15

maxPayloadLen = 1460
)

// RTPBuffer stores RTP packets and allows custom logic around the lifetime of them via the PacketFactory
type RTPBuffer struct {
packets []*RetainablePacket
size uint16
lastAdded uint16
started bool
}

// NewRTPBuffer constructs a new RTPBuffer
func NewRTPBuffer(size uint16) (*RTPBuffer, error) {
allowedSizes := make([]uint16, 0)
correctSize := false
for i := 0; i < 16; i++ {
if size == 1<<i {
correctSize = true
break
}
allowedSizes = append(allowedSizes, 1<<i)
}

if !correctSize {
return nil, fmt.Errorf("%w: %d is not a valid size, allowed sizes: %v", ErrInvalidSize, size, allowedSizes)
}

return &RTPBuffer{
packets: make([]*RetainablePacket, size),
size: size,
}, nil
}

// Add places the RetainablePacket in the RTPBuffer
func (r *RTPBuffer) Add(packet *RetainablePacket) {
seq := packet.sequenceNumber
if !r.started {
r.packets[seq%r.size] = packet
r.lastAdded = seq
r.started = true
return
}

diff := seq - r.lastAdded
if diff == 0 {
return

Check warning on line 60 in internal/rtpbuffer/rtpbuffer.go

View check run for this annotation

Codecov / codecov/patch

internal/rtpbuffer/rtpbuffer.go#L60

Added line #L60 was not covered by tests
} else if diff < Uint16SizeHalf {
for i := r.lastAdded + 1; i != seq; i++ {
idx := i % r.size
prevPacket := r.packets[idx]
if prevPacket != nil {
prevPacket.Release()
}
r.packets[idx] = nil
}
}

idx := seq % r.size
prevPacket := r.packets[idx]
if prevPacket != nil {
prevPacket.Release()
}
r.packets[idx] = packet
r.lastAdded = seq
}

// Get returns the RetainablePacket for the requested sequence number
func (r *RTPBuffer) Get(seq uint16) *RetainablePacket {
diff := r.lastAdded - seq
if diff >= Uint16SizeHalf {
return nil

Check warning on line 85 in internal/rtpbuffer/rtpbuffer.go

View check run for this annotation

Codecov / codecov/patch

internal/rtpbuffer/rtpbuffer.go#L85

Added line #L85 was not covered by tests
}

if diff >= r.size {
return nil
}

pkt := r.packets[seq%r.size]
if pkt != nil {
if pkt.sequenceNumber != seq {
return nil

Check warning on line 95 in internal/rtpbuffer/rtpbuffer.go

View check run for this annotation

Codecov / codecov/patch

internal/rtpbuffer/rtpbuffer.go#L95

Added line #L95 was not covered by tests
}
// already released
if err := pkt.Retain(); err != nil {
return nil

Check warning on line 99 in internal/rtpbuffer/rtpbuffer.go

View check run for this annotation

Codecov / codecov/patch

internal/rtpbuffer/rtpbuffer.go#L99

Added line #L99 was not covered by tests
}
}
return pkt
}
Loading

0 comments on commit fa94c8c

Please sign in to comment.