Skip to content

Commit

Permalink
Check combining while building pending messages instead of while send…
Browse files Browse the repository at this point in the history
…ing out.
  • Loading branch information
fancycode committed Oct 21, 2020
1 parent dd327a8 commit e6f6f0a
Showing 1 changed file with 31 additions and 33 deletions.
64 changes: 31 additions & 33 deletions src/signaling/clientsession.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ type ClientSession struct {
publishers map[string]McuPublisher
subscribers map[string]McuSubscriber

pendingClientMessages []*ServerMessage
pendingClientMessages []*ServerMessage
hasPendingChat bool
hasPendingParticipantsUpdate bool
}

func NewClientSession(hub *Hub, privateId string, publicId string, data *SessionIdData, backend *Backend, hello *HelloClientMessage, auth *BackendClientAuthResponse) (*ClientSession, error) {
Expand Down Expand Up @@ -511,6 +513,16 @@ func (s *ClientSession) SendMessage(message *ServerMessage) bool {
return s.sendMessageUnlocked(message)
}

func (s *ClientSession) SendMessages(messages []*ServerMessage) bool {
s.mu.Lock()
defer s.mu.Unlock()

for _, message := range messages {
s.sendMessageUnlocked(message)
}
return true
}

func (s *ClientSession) OnIceCandidate(client McuClient, candidate interface{}) {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down Expand Up @@ -676,6 +688,17 @@ func (s *ClientSession) processClientMessage(msg *nats.Msg) {
}

func (s *ClientSession) storePendingMessage(message *ServerMessage) {
if message.IsChatRefresh() {
if s.hasPendingChat {
// Only send a single "chat-refresh" message on resume.
return
}

s.hasPendingChat = true
}
if !s.hasPendingParticipantsUpdate && message.IsParticipantsUpdate() {
s.hasPendingParticipantsUpdate = true
}
s.pendingClientMessages = append(s.pendingClientMessages, message)
if len(s.pendingClientMessages) >= warnPendingMessagesCount {
log.Printf("Session %s has %d pending messages", s.PublicId(), len(s.pendingClientMessages))
Expand Down Expand Up @@ -732,24 +755,6 @@ func (s *ClientSession) processNatsMessage(msg *NatsMessage) *ServerMessage {
}
}

func (s *ClientSession) combinePendingMessages(messages []*ServerMessage) ([]*ServerMessage, error) {
var result []*ServerMessage
has_chat := false
for _, message := range messages {
if message.IsChatRefresh() {
if has_chat {
// Only send a single chat refresh message to the client.
continue
}

has_chat = true
}

result = append(result, message)
}
return result, nil
}

func (s *ClientSession) NotifySessionResumed(client *Client) {
s.mu.Lock()
if len(s.pendingClientMessages) == 0 {
Expand All @@ -760,25 +765,18 @@ func (s *ClientSession) NotifySessionResumed(client *Client) {
return
}

messages, err := s.combinePendingMessages(s.pendingClientMessages)
messages := s.pendingClientMessages
hasPendingParticipantsUpdate := s.hasPendingParticipantsUpdate
s.pendingClientMessages = nil
s.hasPendingChat = false
s.hasPendingParticipantsUpdate = false
s.mu.Unlock()
if err != nil {
client.writeError(err)
return
}

log.Printf("Send %d pending messages to session %s", len(messages), s.PublicId())
had_participants_update := false
for _, message := range messages {
client.writeMessage(message)

if !had_participants_update {
had_participants_update = message.IsParticipantsUpdate()
}
}
// Send through session to handle connection interruptions.
s.SendMessages(messages)

if !had_participants_update {
if !hasPendingParticipantsUpdate {
// Only need to send initial participants list update if none was part of the pending messages.
if room := s.GetRoom(); room != nil {
room.NotifySessionResumed(client)
Expand Down

0 comments on commit e6f6f0a

Please sign in to comment.