Skip to content

Commit

Permalink
Merge pull request strukturag#56 from strukturag/proxy-client-cleanup
Browse files Browse the repository at this point in the history
Fix proxy client cleanup
  • Loading branch information
fancycode authored Oct 30, 2020
2 parents 4cd3370 + 2119993 commit 9206a56
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 16 deletions.
24 changes: 12 additions & 12 deletions src/proxy/proxy_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ type ProxySession struct {

publishersLock sync.Mutex
publishers map[string]signaling.McuPublisher
publisherIds map[string]string
publisherIds map[signaling.McuPublisher]string

subscribersLock sync.Mutex
subscribers map[string]signaling.McuSubscriber
subscriberIds map[string]string
subscriberIds map[signaling.McuSubscriber]string
}

func NewProxySession(proxy *ProxyServer, sid uint64, id string) *ProxySession {
Expand All @@ -63,10 +63,10 @@ func NewProxySession(proxy *ProxyServer, sid uint64, id string) *ProxySession {
lastUsed: time.Now().UnixNano(),

publishers: make(map[string]signaling.McuPublisher),
publisherIds: make(map[string]string),
publisherIds: make(map[signaling.McuPublisher]string),

subscribers: make(map[string]signaling.McuSubscriber),
subscriberIds: make(map[string]string),
subscriberIds: make(map[signaling.McuSubscriber]string),
}
}

Expand Down Expand Up @@ -200,20 +200,20 @@ func (s *ProxySession) StorePublisher(ctx context.Context, id string, publisher
defer s.publishersLock.Unlock()

s.publishers[id] = publisher
s.publisherIds[publisher.Id()] = id
s.publisherIds[publisher] = id
}

func (s *ProxySession) DeletePublisher(publisher signaling.McuPublisher) string {
s.publishersLock.Lock()
defer s.publishersLock.Unlock()

id, found := s.publisherIds[publisher.Id()]
id, found := s.publisherIds[publisher]
if !found {
return ""
}

delete(s.publishers, id)
delete(s.publisherIds, publisher.Id())
delete(s.publisherIds, publisher)
return id
}

Expand All @@ -222,20 +222,20 @@ func (s *ProxySession) StoreSubscriber(ctx context.Context, id string, subscribe
defer s.subscribersLock.Unlock()

s.subscribers[id] = subscriber
s.subscriberIds[subscriber.Id()] = id
s.subscriberIds[subscriber] = id
}

func (s *ProxySession) DeleteSubscriber(subscriber signaling.McuSubscriber) string {
s.subscribersLock.Lock()
defer s.subscribersLock.Unlock()

id, found := s.subscriberIds[subscriber.Id()]
id, found := s.subscriberIds[subscriber]
if !found {
return ""
}

delete(s.subscribers, id)
delete(s.subscriberIds, subscriber.Id())
delete(s.subscriberIds, subscriber)
return id
}

Expand All @@ -249,7 +249,7 @@ func (s *ProxySession) clearPublishers() {
}
}(s.publishers)
s.publishers = make(map[string]signaling.McuPublisher)
s.publisherIds = make(map[string]string)
s.publisherIds = make(map[signaling.McuPublisher]string)
}

func (s *ProxySession) clearSubscribers() {
Expand All @@ -262,7 +262,7 @@ func (s *ProxySession) clearSubscribers() {
}
}(s.subscribers)
s.subscribers = make(map[string]signaling.McuSubscriber)
s.subscriberIds = make(map[string]string)
s.subscriberIds = make(map[signaling.McuSubscriber]string)
}

func (s *ProxySession) NotifyDisconnected() {
Expand Down
13 changes: 9 additions & 4 deletions src/signaling/mcu_janus.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ type mcuJanus struct {

closeChan chan bool

clientId uint64
muClients sync.Mutex
clients map[clientInterface]bool

Expand Down Expand Up @@ -430,6 +431,7 @@ type mcuJanusClient struct {
listener McuListener
mu sync.Mutex

id uint64
session uint64
roomId uint64
streamType string
Expand All @@ -447,7 +449,7 @@ type mcuJanusClient struct {
}

func (c *mcuJanusClient) Id() string {
return strconv.FormatUint(c.handleId, 10)
return strconv.FormatUint(c.id, 10)
}

func (c *mcuJanusClient) StreamType() string {
Expand Down Expand Up @@ -652,6 +654,7 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st
mcu: m,
listener: listener,

id: atomic.AddUint64(&m.clientId, 1),
session: session,
roomId: roomId,
streamType: streamType,
Expand All @@ -676,6 +679,7 @@ func (m *mcuJanus) NewPublisher(ctx context.Context, listener McuListener, id st
if err := client.publishNats("created"); err != nil {
log.Printf("Could not publish \"created\" event for publisher %s: %s\n", id, err)
}
log.Printf("Publisher %s is using handle %d", client.id, client.handleId)
go client.run(handle, client.closeChan)
return client, nil
}
Expand Down Expand Up @@ -743,7 +747,7 @@ func (p *mcuJanusPublisher) NotifyReconnected() {
p.mcu.mu.Lock()
p.mcu.publisherRoomIds[p.id+"|"+p.streamType] = roomId
p.mcu.mu.Unlock()
log.Printf("Publisher %s reconnected\n", p.id)
log.Printf("Publisher %s reconnected on handle %d", p.id, p.handleId)
}

func (p *mcuJanusPublisher) Close(ctx context.Context) {
Expand Down Expand Up @@ -945,6 +949,7 @@ func (m *mcuJanus) NewSubscriber(ctx context.Context, listener McuListener, publ
mcu: m,
listener: listener,

id: atomic.AddUint64(&m.clientId, 1),
roomId: roomId,
streamType: streamType,

Expand Down Expand Up @@ -1024,7 +1029,7 @@ func (p *mcuJanusSubscriber) NotifyReconnected() {
p.handle = handle
p.handleId = handle.Id
p.roomId = roomId
log.Printf("Reconnected subscriber for publisher %s\n", p.publisher)
log.Printf("Subscriber %d for publisher %s reconnected on handle %d", p.id, p.publisher, p.handleId)
}

func (p *mcuJanusSubscriber) Close(ctx context.Context) {
Expand Down Expand Up @@ -1093,7 +1098,7 @@ retry:
p.roomId = roomId
p.closeChan = make(chan bool, 1)
go p.run(p.handle, p.closeChan)
log.Printf("Already connected as subscriber for %s, leaving and re-joining", p.streamType)
log.Printf("Already connected subscriber %d for %s, leaving and re-joining on handle %d", p.id, p.streamType, p.handleId)
goto retry
case JANUS_VIDEOROOM_ERROR_NO_SUCH_ROOM:
fallthrough
Expand Down

0 comments on commit 9206a56

Please sign in to comment.