diff --git a/src/server/main.go b/src/server/main.go index f052aa77..cd493458 100644 --- a/src/server/main.go +++ b/src/server/main.go @@ -34,6 +34,7 @@ import ( "runtime" runtimepprof "runtime/pprof" "strings" + "syscall" "time" "github.com/dlintw/goconf" @@ -47,7 +48,7 @@ import ( var ( version = "unreleased" - config = flag.String("config", "server.conf", "config file to use") + configFlag = flag.String("config", "server.conf", "config file to use") cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") @@ -98,8 +99,9 @@ func main() { os.Exit(0) } - interrupt := make(chan os.Signal, 1) - signal.Notify(interrupt, os.Interrupt) + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, os.Interrupt) + signal.Notify(sigChan, syscall.SIGHUP) if *cpuprofile != "" { f, err := os.Create(*cpuprofile) @@ -127,7 +129,7 @@ func main() { log.Printf("Starting up version %s/%s as pid %d", version, runtime.Version(), os.Getpid()) - config, err := goconf.ReadConfigFile(*config) + config, err := goconf.ReadConfigFile(*configFlag) if err != nil { log.Fatal("Could not read configuration: ", err) } @@ -184,8 +186,22 @@ func main() { log.Printf("Could not initialize %s MCU at %s (%s) will retry in %s", mcuType, mcuUrl, err, mcuRetry) mcuRetryTimer.Reset(mcuRetry) select { - case <-interrupt: - log.Fatalf("Cancelled") + case sig := <-sigChan: + switch sig { + case os.Interrupt: + log.Fatalf("Cancelled") + case syscall.SIGHUP: + log.Printf("Received SIGHUP, reloading %s", *configFlag) + if config, err = goconf.ReadConfigFile(*configFlag); err != nil { + log.Printf("Could not read configuration from %s: %s", *configFlag, err) + } else { + mcuUrl, _ = config.GetString("mcu", "url") + mcuType, _ = config.GetString("mcu", "type") + if mcuType == "" { + mcuType = signaling.McuTypeDefault + } + } + } case <-mcuRetryTimer.C: // Retry connection mcuRetry = mcuRetry * 2 @@ -290,6 +306,22 @@ func main() { } } - <-interrupt - log.Println("Interrupted") +loop: + for { + select { + case sig := <-sigChan: + switch sig { + case os.Interrupt: + log.Println("Interrupted") + break loop + case syscall.SIGHUP: + log.Printf("Received SIGHUP, reloading %s", *configFlag) + if config, err := goconf.ReadConfigFile(*configFlag); err != nil { + log.Printf("Could not read configuration from %s: %s", *configFlag, err) + } else { + hub.Reload(config) + } + } + } + } } diff --git a/src/signaling/hub.go b/src/signaling/hub.go index d69453ea..a284900d 100644 --- a/src/signaling/hub.go +++ b/src/signaling/hub.go @@ -371,6 +371,12 @@ func (h *Hub) Stop() { } } +func (h *Hub) Reload(config *goconf.ConfigFile) { + if h.mcu != nil { + h.mcu.Reload(config) + } +} + func reverseSessionId(s string) (string, error) { // Note that we are assuming base64 encoded strings here. decoded, err := base64.URLEncoding.DecodeString(s) diff --git a/src/signaling/mcu_common.go b/src/signaling/mcu_common.go index c821ff0b..33c4d587 100644 --- a/src/signaling/mcu_common.go +++ b/src/signaling/mcu_common.go @@ -24,6 +24,8 @@ package signaling import ( "fmt" + "github.com/dlintw/goconf" + "golang.org/x/net/context" ) @@ -55,6 +57,7 @@ type McuInitiator interface { type Mcu interface { Start() error Stop() + Reload(config *goconf.ConfigFile) SetOnConnected(func()) SetOnDisconnected(func()) diff --git a/src/signaling/mcu_janus.go b/src/signaling/mcu_janus.go index 11ba02e1..8338fb42 100644 --- a/src/signaling/mcu_janus.go +++ b/src/signaling/mcu_janus.go @@ -359,6 +359,9 @@ func (m *mcuJanus) Stop() { m.reconnectTimer.Stop() } +func (m *mcuJanus) Reload(config *goconf.ConfigFile) { +} + func (m *mcuJanus) SetOnConnected(f func()) { if f == nil { f = emptyOnConnected diff --git a/src/signaling/mcu_proxy.go b/src/signaling/mcu_proxy.go index 145c4f96..1a9e39c3 100644 --- a/src/signaling/mcu_proxy.go +++ b/src/signaling/mcu_proxy.go @@ -242,8 +242,9 @@ func (s *mcuProxySubscriber) ProcessEvent(msg *EventProxyServerMessage) { } type mcuProxyConnection struct { - proxy *mcuProxy - url *url.URL + proxy *mcuProxy + rawUrl string + url *url.URL mu sync.Mutex closeChan chan bool @@ -255,6 +256,7 @@ type mcuProxyConnection struct { reconnectInterval int64 reconnectTimer *time.Timer shutdownScheduled uint32 + closeScheduled uint32 msgId int64 helloMsgId string @@ -280,6 +282,7 @@ func newMcuProxyConnection(proxy *mcuProxy, baseUrl string) (*mcuProxyConnection conn := &mcuProxyConnection{ proxy: proxy, + rawUrl: baseUrl, url: parsed, closeChan: make(chan bool, 1), closedChan: make(chan bool, 1), @@ -331,7 +334,7 @@ func (c *mcuProxyConnection) Country() string { } func (c *mcuProxyConnection) IsShutdownScheduled() bool { - return atomic.LoadUint32(&c.shutdownScheduled) != 0 + return atomic.LoadUint32(&c.shutdownScheduled) != 0 || atomic.LoadUint32(&c.closeScheduled) != 0 } func (c *mcuProxyConnection) readPump() { @@ -433,6 +436,38 @@ func (c *mcuProxyConnection) close() { } } +func (c *mcuProxyConnection) stopCloseIfEmpty() { + atomic.StoreUint32(&c.closeScheduled, 0) +} + +func (c *mcuProxyConnection) closeIfEmpty() bool { + atomic.StoreUint32(&c.closeScheduled, 1) + + var total int64 + c.publishersLock.RLock() + total += int64(len(c.publishers)) + c.publishersLock.RUnlock() + c.subscribersLock.RLock() + total += int64(len(c.subscribers)) + c.subscribersLock.RUnlock() + if total > 0 { + // Connection will be closed once all clients have disconnected. + log.Printf("Connection to %s is still used by %d clients, defer closing", c.url, total) + return false + } + + go func() { + ctx, cancel := context.WithTimeout(context.Background(), closeTimeout) + defer cancel() + + log.Printf("All clients disconnected, closing connection to %s", c.url) + c.stop(ctx) + + c.proxy.removeConnection(c) + }() + return true +} + func (c *mcuProxyConnection) scheduleReconnect() { if err := c.sendClose(); err != nil && err != ErrNotConnected { log.Printf("Could not send close message to %s: %s", c.url, err) @@ -496,6 +531,10 @@ func (c *mcuProxyConnection) removePublisher(publisher *mcuProxyPublisher) { delete(c.publishers, publisher.proxyId) delete(c.publisherIds, publisher.id+"|"+publisher.StreamType()) + + if len(c.publishers) == 0 && atomic.LoadUint32(&c.closeScheduled) != 0 { + go c.closeIfEmpty() + } } func (c *mcuProxyConnection) clearPublishers() { @@ -509,6 +548,10 @@ func (c *mcuProxyConnection) clearPublishers() { }(c.publishers) c.publishers = make(map[string]*mcuProxyPublisher) c.publisherIds = make(map[string]string) + + if atomic.LoadUint32(&c.closeScheduled) != 0 { + go c.closeIfEmpty() + } } func (c *mcuProxyConnection) removeSubscriber(subscriber *mcuProxySubscriber) { @@ -516,6 +559,10 @@ func (c *mcuProxyConnection) removeSubscriber(subscriber *mcuProxySubscriber) { defer c.subscribersLock.Unlock() delete(c.subscribers, subscriber.proxyId) + + if len(c.subscribers) == 0 && atomic.LoadUint32(&c.closeScheduled) != 0 { + go c.closeIfEmpty() + } } func (c *mcuProxyConnection) clearSubscribers() { @@ -528,6 +575,10 @@ func (c *mcuProxyConnection) clearSubscribers() { } }(c.subscribers) c.subscribers = make(map[string]*mcuProxySubscriber) + + if atomic.LoadUint32(&c.closeScheduled) != 0 { + go c.closeIfEmpty() + } } func (c *mcuProxyConnection) clearCallbacks() { @@ -821,9 +872,11 @@ type mcuProxy struct { tokenId string tokenKey *rsa.PrivateKey - connections atomic.Value - connRequests int64 - nextSort int64 + connections []*mcuProxyConnection + connectionsMap map[string]*mcuProxyConnection + connectionsMu sync.RWMutex + connRequests int64 + nextSort int64 mu sync.RWMutex publishers map[string]*mcuProxyConnection @@ -833,8 +886,6 @@ type mcuProxy struct { } func NewMcuProxy(baseUrl string, config *goconf.ConfigFile) (Mcu, error) { - var connections []*mcuProxyConnection - tokenId, _ := config.GetString("mcu", "token_id") if tokenId == "" { return nil, fmt.Errorf("No token id configured") @@ -856,6 +907,8 @@ func NewMcuProxy(baseUrl string, config *goconf.ConfigFile) (Mcu, error) { tokenId: tokenId, tokenKey: tokenKey, + connectionsMap: make(map[string]*mcuProxyConnection), + publishers: make(map[string]*mcuProxyConnection), publisherWaiters: make(map[uint64]chan bool), @@ -867,26 +920,21 @@ func NewMcuProxy(baseUrl string, config *goconf.ConfigFile) (Mcu, error) { return nil, err } - connections = append(connections, conn) + mcu.connections = append(mcu.connections, conn) + mcu.connectionsMap[u] = conn } - if len(connections) == 0 { + if len(mcu.connections) == 0 { return nil, fmt.Errorf("No MCU proxy connections configured") } - mcu.setConnections(connections) return mcu, nil } -func (m *mcuProxy) setConnections(connections []*mcuProxyConnection) { - m.connections.Store(connections) -} - -func (m *mcuProxy) getConnections() []*mcuProxyConnection { - return m.connections.Load().([]*mcuProxyConnection) -} - func (m *mcuProxy) Start() error { - for _, c := range m.getConnections() { + m.connectionsMu.RLock() + defer m.connectionsMu.RUnlock() + + for _, c := range m.connections { if err := c.start(); err != nil { return err } @@ -895,13 +943,81 @@ func (m *mcuProxy) Start() error { } func (m *mcuProxy) Stop() { - for _, c := range m.getConnections() { + m.connectionsMu.RLock() + defer m.connectionsMu.RUnlock() + + for _, c := range m.connections { ctx, cancel := context.WithTimeout(context.Background(), closeTimeout) defer cancel() c.stop(ctx) } } +func (m *mcuProxy) Reload(config *goconf.ConfigFile) { + m.connectionsMu.Lock() + defer m.connectionsMu.Unlock() + + remove := make(map[string]*mcuProxyConnection) + for u, conn := range m.connectionsMap { + remove[u] = conn + } + created := make(map[string]*mcuProxyConnection) + changed := false + + mcuUrl, _ := config.GetString("mcu", "url") + for _, u := range strings.Split(mcuUrl, " ") { + if existing, found := remove[u]; found { + // Proxy connection still exists in new configuration + delete(remove, u) + existing.stopCloseIfEmpty() + continue + } + + conn, err := newMcuProxyConnection(m, u) + if err != nil { + log.Printf("Could not create proxy connection to %s: %s", u, err) + continue + } + + created[u] = conn + } + + for _, conn := range remove { + go conn.closeIfEmpty() + } + + for u, conn := range created { + if err := conn.start(); err != nil { + log.Printf("Could not start new connection to %s: %s", u, err) + continue + } + + log.Printf("Adding new connection to %s", u) + m.connections = append(m.connections, conn) + m.connectionsMap[u] = conn + changed = true + } + + if changed { + atomic.StoreInt64(&m.nextSort, 0) + } +} + +func (m *mcuProxy) removeConnection(c *mcuProxyConnection) { + m.connectionsMu.Lock() + defer m.connectionsMu.Unlock() + + if _, found := m.connectionsMap[c.rawUrl]; found { + delete(m.connectionsMap, c.rawUrl) + m.connections = nil + for _, conn := range m.connectionsMap { + m.connections = append(m.connections, conn) + } + + atomic.StoreInt64(&m.nextSort, 0) + } +} + func (m *mcuProxy) SetOnConnected(f func()) { // Not supported. } @@ -921,7 +1037,11 @@ func (m *mcuProxy) GetStats() interface{} { result := &mcuProxyStats{ Details: details, } - for _, conn := range m.getConnections() { + + m.connectionsMu.RLock() + defer m.connectionsMu.RUnlock() + + for _, conn := range m.connections { stats := conn.GetStats() result.Publishers += stats.Publishers result.Clients += stats.Clients @@ -998,7 +1118,9 @@ func sortConnectionsForCountry(connections []*mcuProxyConnection, country string } func (m *mcuProxy) getSortedConnections(initiator McuInitiator) []*mcuProxyConnection { - connections := m.getConnections() + m.connectionsMu.RLock() + connections := m.connections + m.connectionsMu.RUnlock() if len(connections) < 2 { return connections } @@ -1014,7 +1136,9 @@ func (m *mcuProxy) getSortedConnections(initiator McuInitiator) []*mcuProxyConne sorted.Sort() - m.setConnections(sorted) + m.connectionsMu.Lock() + m.connections = sorted + m.connectionsMu.Unlock() connections = sorted } diff --git a/src/signaling/mcu_test.go b/src/signaling/mcu_test.go index dbfe4856..84b4ed67 100644 --- a/src/signaling/mcu_test.go +++ b/src/signaling/mcu_test.go @@ -24,6 +24,8 @@ package signaling import ( "fmt" + "github.com/dlintw/goconf" + "golang.org/x/net/context" ) @@ -41,6 +43,9 @@ func (m *TestMCU) Start() error { func (m *TestMCU) Stop() { } +func (m *TestMCU) Reload(config *goconf.ConfigFile) { +} + func (m *TestMCU) SetOnConnected(f func()) { }