Skip to content

Commit

Permalink
Use publisher/subscriber ids that stay the same across reconnections.
Browse files Browse the repository at this point in the history
Otherwise it could happen that objects were kept in maps (e.g. on the
proxy server) if they reconnected to the WebRTC gateway during their
lifetime. This resulted in incorrect load calculations.
  • Loading branch information
fancycode committed Oct 26, 2020
1 parent 7681e26 commit 2119993
Showing 1 changed file with 9 additions and 4 deletions.
13 changes: 9 additions & 4 deletions src/signaling/mcu_janus.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ type mcuJanus struct {

closeChan chan bool

clientId uint64
muClients sync.Mutex
clients map[clientInterface]bool

Expand Down Expand Up @@ -430,6 +431,7 @@ type mcuJanusClient struct {
listener McuListener
mu sync.Mutex

id uint64
session uint64
roomId uint64
streamType string
Expand All @@ -447,7 +449,7 @@ type mcuJanusClient struct {
}

func (c *mcuJanusClient) Id() string {
return strconv.FormatUint(c.handleId, 10)
return strconv.FormatUint(c.id, 10)
}

func (c *mcuJanusClient) StreamType() string {
Expand Down Expand Up @@ -652,6 +654,7 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st
mcu: m,
listener: listener,

id: atomic.AddUint64(&m.clientId, 1),
session: session,
roomId: roomId,
streamType: streamType,
Expand All @@ -676,6 +679,7 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st
if err := client.publishNats("created"); err != nil {
log.Printf("Could not publish \"created\" event for publisher %s: %s\n", id, err)
}
log.Printf("Publisher %s is using handle %d", client.id, client.handleId)
go client.run(handle, client.closeChan)
return client, nil
}
Expand Down Expand Up @@ -743,7 +747,7 @@ func (p *mcuJanusPublisher) NotifyReconnected() {
p.mcu.mu.Lock()
p.mcu.publisherRoomIds[p.id+"|"+p.streamType] = roomId
p.mcu.mu.Unlock()
log.Printf("Publisher %s reconnected\n", p.id)
log.Printf("Publisher %s reconnected on handle %d", p.id, p.handleId)
}

func (p *mcuJanusPublisher) Close(ctx context.Context) {
Expand Down Expand Up @@ -945,6 +949,7 @@ func (m *mcuJanus) NewSubscriber(ctx context.Context, listener McuListener, publ
mcu: m,
listener: listener,

id: atomic.AddUint64(&m.clientId, 1),
roomId: roomId,
streamType: streamType,

Expand Down Expand Up @@ -1024,7 +1029,7 @@ func (p *mcuJanusSubscriber) NotifyReconnected() {
p.handle = handle
p.handleId = handle.Id
p.roomId = roomId
log.Printf("Reconnected subscriber for publisher %s\n", p.publisher)
log.Printf("Subscriber %d for publisher %s reconnected on handle %d", p.id, p.publisher, p.handleId)
}

func (p *mcuJanusSubscriber) Close(ctx context.Context) {
Expand Down Expand Up @@ -1093,7 +1098,7 @@ retry:
p.roomId = roomId
p.closeChan = make(chan bool, 1)
go p.run(p.handle, p.closeChan)
log.Printf("Already connected as subscriber for %s, leaving and re-joining", p.streamType)
log.Printf("Already connected subscriber %d for %s, leaving and re-joining on handle %d", p.id, p.streamType, p.handleId)
goto retry
case JANUS_VIDEOROOM_ERROR_NO_SUCH_ROOM:
fallthrough
Expand Down

0 comments on commit 2119993

Please sign in to comment.