diff --git a/server.conf.in b/server.conf.in index c3865983..55fb561a 100644 --- a/server.conf.in +++ b/server.conf.in @@ -99,12 +99,12 @@ connectionsperhost = 8 [mcu] # The type of the MCU to use. Currently only "janus" and "proxy" are supported. -type = janus +# Leave empty to disable MCU functionality. +#type = # For type "janus": the URL to the websocket endpoint of the MCU server. # For type "proxy": a space-separated list of proxy URLs to connect to. -# Leave empty to disable MCU functionality. -url = +#url = # For type "janus": the maximum bitrate per publishing stream (in bits per # second). @@ -116,6 +116,14 @@ url = # Default is 2 mbit/sec. #maxscreenbitrate = 2097152 +# For type "proxy": type of URL configuration for proxy servers. +# Defaults to "static". +# +# Possible values: +# - static: A space-separated list of proxy URLs is given in the "url" option. +# - etcd: Proxy URLs are retrieved from an etcd cluster (see below). +#urltype = "static" + # For type "proxy": the id of the token to use when connecting to proxy servers. #token_id = server1 @@ -123,6 +131,31 @@ url = # connecting to proxy servers. #token_key = privkey.pem +# For url type "etcd": Comma-separated list of static etcd endpoints to +# connect to. +#endpoints = 127.0.0.1:2379,127.0.0.1:22379,127.0.0.1:32379 + +# For url type "etcd": Options to perform endpoint discovery through DNS SRV. +# Only used if no endpoints are configured manually. +#discoverysrv = example.com +#discoveryservice = foo + +# For url type "etcd": Path to private key, client certificate and CA +# certificate if TLS authentication should be used. +#clientkey = /path/to/etcd-client.key +#clientcert = /path/to/etcd-client.crt +#cacert = /path/to/etcd-ca.crt + +# For url type "etcd": Key prefix of MCU proxy entries. All keys below will be +# watched and assumed to contain a JSON document. The entry "address" from this +# document will be used as proxy URL, other contents in the document will be +# ignored. +# +# Example: +# "/signaling/proxy/server/one" -> {"address": "https://proxy1.domain.invalid"} +# "/signaling/proxy/server/two" -> {"address": "https://proxy2.domain.invalid"} +#keyprefix = /signaling/proxy/server + [turn] # API key that the MCU will need to send when requesting TURN credentials. #apikey = the-api-key-for-the-rest-service diff --git a/src/server/main.go b/src/server/main.go index cd493458..05c4e632 100644 --- a/src/server/main.go +++ b/src/server/main.go @@ -91,7 +91,7 @@ func createTLSListener(addr string, certFile, keyFile string) (net.Listener, err } func main() { - log.SetFlags(log.Ldate | log.Ltime | log.Lmicroseconds | log.Lshortfile) + log.SetFlags(log.Lshortfile) flag.Parse() if *showVersion { @@ -155,12 +155,13 @@ func main() { } mcuUrl, _ := config.GetString("mcu", "url") - if mcuUrl != "" { - mcuType, _ := config.GetString("mcu", "type") - if mcuType == "" { - mcuType = signaling.McuTypeDefault - } + mcuType, _ := config.GetString("mcu", "type") + if mcuType == "" && mcuUrl != "" { + log.Printf("WARNING: Old-style MCU configuration detected with url but no type, defaulting to type %s", signaling.McuTypeJanus) + mcuType = signaling.McuTypeJanus + } + if mcuType != "" { var mcu signaling.Mcu mcuRetry := initialMcuRetry mcuRetryTimer := time.NewTimer(mcuRetry) @@ -169,21 +170,21 @@ func main() { case signaling.McuTypeJanus: mcu, err = signaling.NewMcuJanus(mcuUrl, config, nats) case signaling.McuTypeProxy: - mcu, err = signaling.NewMcuProxy(mcuUrl, config) + mcu, err = signaling.NewMcuProxy(config) default: log.Fatal("Unsupported MCU type: ", mcuType) } if err == nil { err = mcu.Start() if err != nil { - log.Printf("Could not create %s MCU at %s: %s", mcuType, mcuUrl, err) + log.Printf("Could not create %s MCU: %s", mcuType, err) } } if err == nil { break } - log.Printf("Could not initialize %s MCU at %s (%s) will retry in %s", mcuType, mcuUrl, err, mcuRetry) + log.Printf("Could not initialize %s MCU (%s) will retry in %s", mcuType, err, mcuRetry) mcuRetryTimer.Reset(mcuRetry) select { case sig := <-sigChan: @@ -197,8 +198,9 @@ func main() { } else { mcuUrl, _ = config.GetString("mcu", "url") mcuType, _ = config.GetString("mcu", "type") - if mcuType == "" { - mcuType = signaling.McuTypeDefault + if mcuType == "" && mcuUrl != "" { + log.Printf("WARNING: Old-style MCU configuration detected with url but no type, defaulting to type %s", signaling.McuTypeJanus) + mcuType = signaling.McuTypeJanus } } } @@ -212,7 +214,7 @@ func main() { } defer mcu.Stop() - log.Printf("Using MCU %s at %s\n", mcuType, mcuUrl) + log.Printf("Using %s MCU", mcuType) hub.SetMcu(mcu) } diff --git a/src/signaling/api_proxy.go b/src/signaling/api_proxy.go index ad78a4cc..8ba5b602 100644 --- a/src/signaling/api_proxy.go +++ b/src/signaling/api_proxy.go @@ -252,3 +252,19 @@ type EventProxyServerMessage struct { ClientId string `json:"clientId,omitempty"` Load int64 `json:"load,omitempty"` } + +// Information on a proxy in the etcd cluster. + +type ProxyInformationEtcd struct { + Address string `json:"address"` +} + +func (p *ProxyInformationEtcd) CheckValid() error { + if p.Address == "" { + return fmt.Errorf("address missing") + } + if p.Address[len(p.Address)-1] != '/' { + p.Address += "/" + } + return nil +} diff --git a/src/signaling/mcu_proxy.go b/src/signaling/mcu_proxy.go index 11fa113f..e9bb8a9a 100644 --- a/src/signaling/mcu_proxy.go +++ b/src/signaling/mcu_proxy.go @@ -39,6 +39,10 @@ import ( "github.com/dlintw/goconf" "github.com/gorilla/websocket" + "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/pkg/srv" + "go.etcd.io/etcd/pkg/transport" + "gopkg.in/dgrijalva/jwt-go.v3" ) @@ -53,6 +57,9 @@ const ( // Sort connections by load every 10 publishing requests or once per second. connectionSortRequests = 10 connectionSortInterval = time.Second + + proxyUrlTypeStatic = "static" + proxyUrlTypeEtcd = "etcd" ) type mcuProxyPubSubCommon struct { @@ -871,6 +878,12 @@ type mcuProxy struct { tokenId string tokenKey *rsa.PrivateKey + etcdMu sync.Mutex + client atomic.Value + keyPrefix atomic.Value + keyInfos map[string]*ProxyInformationEtcd + urlToKey map[string]string + connections []*mcuProxyConnection connectionsMap map[string]*mcuProxyConnection connectionsMu sync.RWMutex @@ -884,7 +897,9 @@ type mcuProxy struct { publisherWaiters map[uint64]chan bool } -func NewMcuProxy(baseUrl string, config *goconf.ConfigFile) (Mcu, error) { +func NewMcuProxy(config *goconf.ConfigFile) (Mcu, error) { + urlType, _ := config.GetString("mcu", "urltype") + tokenId, _ := config.GetString("mcu", "token_id") if tokenId == "" { return nil, fmt.Errorf("No token id configured") @@ -913,22 +928,43 @@ func NewMcuProxy(baseUrl string, config *goconf.ConfigFile) (Mcu, error) { publisherWaiters: make(map[uint64]chan bool), } - for _, u := range strings.Split(baseUrl, " ") { - conn, err := newMcuProxyConnection(mcu, u) - if err != nil { + switch urlType { + case proxyUrlTypeStatic: + mcuUrl, _ := config.GetString("mcu", "url") + for _, u := range strings.Split(mcuUrl, " ") { + conn, err := newMcuProxyConnection(mcu, u) + if err != nil { + return nil, err + } + + mcu.connections = append(mcu.connections, conn) + mcu.connectionsMap[u] = conn + } + if len(mcu.connections) == 0 { + return nil, fmt.Errorf("No MCU proxy connections configured") + } + case proxyUrlTypeEtcd: + mcu.keyInfos = make(map[string]*ProxyInformationEtcd) + mcu.urlToKey = make(map[string]string) + if err := mcu.configureEtcd(config, false); err != nil { return nil, err } - - mcu.connections = append(mcu.connections, conn) - mcu.connectionsMap[u] = conn - } - if len(mcu.connections) == 0 { - return nil, fmt.Errorf("No MCU proxy connections configured") + default: + return nil, fmt.Errorf("Unsupported proxy URL type %s", urlType) } return mcu, nil } +func (m *mcuProxy) getEtcdClient() *clientv3.Client { + c := m.client.Load() + if c == nil { + return nil + } + + return c.(*clientv3.Client) +} + func (m *mcuProxy) Start() error { m.connectionsMu.RLock() defer m.connectionsMu.RUnlock() @@ -952,6 +988,104 @@ func (m *mcuProxy) Stop() { } } +func (m *mcuProxy) configureEtcd(config *goconf.ConfigFile, ignoreErrors bool) error { + keyPrefix, _ := config.GetString("mcu", "keyprefix") + if keyPrefix == "" { + keyPrefix = "/%s" + } + + var endpoints []string + if endpointsString, _ := config.GetString("mcu", "endpoints"); endpointsString != "" { + for _, ep := range strings.Split(endpointsString, ",") { + ep := strings.TrimSpace(ep) + if ep != "" { + endpoints = append(endpoints, ep) + } + } + } else if discoverySrv, _ := config.GetString("mcu", "discoverysrv"); discoverySrv != "" { + discoveryService, _ := config.GetString("mcu", "discoveryservice") + clients, err := srv.GetClient("etcd-client", discoverySrv, discoveryService) + if err != nil { + if !ignoreErrors { + return fmt.Errorf("Could not discover endpoints for %s: %s", discoverySrv, err) + } + } else { + endpoints = clients.Endpoints + } + } + + if len(endpoints) == 0 { + if !ignoreErrors { + return fmt.Errorf("No proxy URL endpoints configured") + } + + log.Printf("No proxy URL endpoints configured, not changing client") + } else { + cfg := clientv3.Config{ + Endpoints: endpoints, + + // set timeout per request to fail fast when the target endpoint is unavailable + DialTimeout: time.Second, + } + + clientKey, _ := config.GetString("mcu", "clientkey") + clientCert, _ := config.GetString("mcu", "clientcert") + caCert, _ := config.GetString("mcu", "cacert") + if clientKey != "" && clientCert != "" && caCert != "" { + tlsInfo := transport.TLSInfo{ + CertFile: clientCert, + KeyFile: clientKey, + TrustedCAFile: caCert, + } + tlsConfig, err := tlsInfo.ClientConfig() + if err != nil { + if !ignoreErrors { + return fmt.Errorf("Could not setup TLS configuration: %s", err) + } + + log.Printf("Could not setup TLS configuration, will be disabled (%s)", err) + } else { + cfg.TLS = tlsConfig + } + } + + c, err := clientv3.New(cfg) + if err != nil { + if !ignoreErrors { + return err + } + + log.Printf("Could not create new client from proxy URL endpoints %+v: %s", endpoints, err) + } else { + prev := m.getEtcdClient() + if prev != nil { + prev.Close() + } + m.client.Store(c) + log.Printf("Using proxy URL endpoints %+v", endpoints) + + ch := c.Watch(clientv3.WithRequireLeader(context.Background()), keyPrefix, clientv3.WithPrefix()) + go m.processWatches(ch) + + go func() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + response, err := c.Get(ctx, keyPrefix, clientv3.WithPrefix()) + if err != nil { + log.Printf("Could not get initial list of proxy URLs: %s", err) + } else { + for _, ev := range response.Kvs { + m.addEtcdProxy(string(ev.Key), ev.Value) + } + } + }() + } + } + + return nil +} + func (m *mcuProxy) Reload(config *goconf.ConfigFile) { m.connectionsMu.Lock() defer m.connectionsMu.Unlock() @@ -1002,6 +1136,98 @@ func (m *mcuProxy) Reload(config *goconf.ConfigFile) { } } +func (m *mcuProxy) processWatches(ch clientv3.WatchChan) { + for response := range ch { + for _, ev := range response.Events { + switch ev.Type { + case clientv3.EventTypePut: + m.addEtcdProxy(string(ev.Kv.Key), ev.Kv.Value) + case clientv3.EventTypeDelete: + m.removeEtcdProxy(string(ev.Kv.Key)) + default: + log.Printf("Unsupported event %s %q -> %q", ev.Type, ev.Kv.Key, ev.Kv.Value) + } + } + } +} + +func (m *mcuProxy) addEtcdProxy(key string, data []byte) { + var info ProxyInformationEtcd + if err := json.Unmarshal(data, &info); err != nil { + log.Printf("Could not decode proxy information %s: %s", string(data), err) + return + } + if err := info.CheckValid(); err != nil { + log.Printf("Received invalid proxy information %s: %s", string(data), err) + return + } + + m.etcdMu.Lock() + defer m.etcdMu.Unlock() + + prev, found := m.keyInfos[key] + if found && info.Address != prev.Address { + // Address of a proxy has changed. + m.removeEtcdProxyLocked(key) + } + + if otherKey, found := m.urlToKey[info.Address]; found && otherKey != key { + log.Printf("Address %s is already registered for key %s, ignoring %s", info.Address, otherKey, key) + return + } + + m.connectionsMu.Lock() + defer m.connectionsMu.Unlock() + if conn, found := m.connectionsMap[info.Address]; found { + m.keyInfos[key] = &info + m.urlToKey[info.Address] = key + conn.stopCloseIfEmpty() + } else { + conn, err := newMcuProxyConnection(m, info.Address) + if err != nil { + log.Printf("Could not create proxy connection to %s: %s", info.Address, err) + return + } + + if err := conn.start(); err != nil { + log.Printf("Could not start new connection to %s: %s", info.Address, err) + return + } + + log.Printf("Adding new connection to %s (from %s)", info.Address, key) + m.keyInfos[key] = &info + m.urlToKey[info.Address] = key + m.connections = append(m.connections, conn) + m.connectionsMap[info.Address] = conn + atomic.StoreInt64(&m.nextSort, 0) + } +} + +func (m *mcuProxy) removeEtcdProxy(key string) { + m.etcdMu.Lock() + defer m.etcdMu.Unlock() + + m.removeEtcdProxyLocked(key) +} + +func (m *mcuProxy) removeEtcdProxyLocked(key string) { + info, found := m.keyInfos[key] + if !found { + return + } + + delete(m.keyInfos, key) + delete(m.urlToKey, info.Address) + + log.Printf("Removing connection to %s (from %s)", info.Address, key) + + m.connectionsMu.RLock() + defer m.connectionsMu.RUnlock() + if conn, found := m.connectionsMap[info.Address]; found { + go conn.closeIfEmpty() + } +} + func (m *mcuProxy) removeConnection(c *mcuProxyConnection) { m.connectionsMu.Lock() defer m.connectionsMu.Unlock()