Skip to content

Commit

Permalink
Rate limit forwarding PLI requests
Browse files Browse the repository at this point in the history
  • Loading branch information
streamer45 committed Aug 14, 2024
1 parent ff87ea6 commit 94a405b
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 9 deletions.
2 changes: 2 additions & 0 deletions service/rtc/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"

"github.com/pion/webrtc/v3"
"golang.org/x/time/rate"

"github.com/mattermost/mattermost/server/public/shared/mlog"
)
Expand All @@ -16,6 +17,7 @@ type call struct {
id string
sessions map[string]*session
screenSession *session
pliLimiters map[webrtc.SSRC]*rate.Limiter
metrics Metrics

mut sync.RWMutex
Expand Down
41 changes: 32 additions & 9 deletions service/rtc/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"sync"
"time"

"golang.org/x/time/rate"

"github.com/mattermost/rtcd/service/rtc/vad"

"github.com/pion/interceptor/pkg/cc"
Expand Down Expand Up @@ -89,9 +91,10 @@ func (s *Server) addSession(cfg SessionConfig, peerConn *webrtc.PeerConnection,
if c == nil {
// call is missing, creating one
c = &call{
id: cfg.CallID,
sessions: map[string]*session{},
metrics: s.metrics,
id: cfg.CallID,
sessions: map[string]*session{},
pliLimiters: map[webrtc.SSRC]*rate.Limiter{},
metrics: s.metrics,
}
g.calls[c.id] = c
}
Expand Down Expand Up @@ -284,7 +287,14 @@ func (s *session) handleSenderRTCP(sender *webrtc.RTPSender) {
return
}
for _, pkt := range pkts {
if _, ok := pkt.(*rtcp.PictureLossIndication); ok {
if p, ok := pkt.(*rtcp.PictureLossIndication); ok {
// When a PLI is received the request is forwarded
// to the peer generating the track (e.g. presenter).

for _, dstSSRC := range p.DestinationSSRC() {
s.log.Debug("received PLI request for track", mlog.String("sessionID", s.cfg.SessionID), mlog.Uint("SSRC", dstSSRC))
}

screenSession := s.call.getScreenSession()
if screenSession == nil {
s.log.Error("screenSession should not be nil", mlog.String("sessionID", s.cfg.SessionID))
Expand All @@ -308,11 +318,24 @@ func (s *session) handleSenderRTCP(sender *webrtc.RTPSender) {
return
}

// When a PLI is received the request is forwarded
// to the peer generating the track (e.g. presenter).
if err := screenSession.rtcConn.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: uint32(screenTrack.SSRC())}}); err != nil {
s.log.Error("failed to write RTCP packet", mlog.Err(err), mlog.String("sessionID", s.cfg.SessionID))
return
s.call.mut.Lock()
// We allow at most one PLI request per second for a given SSRC to avoid overloading the sender.
// If a receiving client were to miss it due to rate limiting (e.g. joining right in the second of backoff),
// it will request it again and eventually get it.
limiter, ok := s.call.pliLimiters[screenTrack.SSRC()]
if !ok {
s.log.Debug("creating new PLI limiter for track", mlog.Uint("SSRC", screenTrack.SSRC()))
limiter = rate.NewLimiter(1, 1)
s.call.pliLimiters[screenTrack.SSRC()] = limiter
}
s.call.mut.Unlock()

if limiter.Allow() {
s.log.Debug("forwarding PLI request for track", mlog.String("sessionID", s.cfg.SessionID), mlog.Uint("SSRC", screenTrack.SSRC()))
if err := screenSession.rtcConn.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: uint32(screenTrack.SSRC())}}); err != nil {
s.log.Error("failed to write RTCP packet", mlog.Err(err), mlog.String("sessionID", s.cfg.SessionID))
return
}
}
}
}
Expand Down

0 comments on commit 94a405b

Please sign in to comment.