Skip to content

Commit

Permalink
Handle case where etcd cluster is not available during startup.
Browse files Browse the repository at this point in the history
  • Loading branch information
fancycode committed Sep 9, 2020
1 parent adce451 commit 767d283
Showing 1 changed file with 66 additions and 8 deletions.
74 changes: 66 additions & 8 deletions src/signaling/mcu_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ const (

proxyUrlTypeStatic = "static"
proxyUrlTypeEtcd = "etcd"

initialWaitDelay = time.Second
maxWaitDelay = 8 * time.Second
)

type mcuProxyPubSubCommon struct {
Expand Down Expand Up @@ -1084,20 +1087,38 @@ func (m *mcuProxy) configureEtcd(config *goconf.ConfigFile, ignoreErrors bool) e
m.client.Store(c)
log.Printf("Using proxy URL endpoints %+v", endpoints)

ch := c.Watch(clientv3.WithRequireLeader(context.Background()), keyPrefix, clientv3.WithPrefix())
go m.processWatches(ch)
go func(client *clientv3.Client) {
log.Printf("Wait for leader and start watching on %s", keyPrefix)
ch := client.Watch(clientv3.WithRequireLeader(context.Background()), keyPrefix, clientv3.WithPrefix())
log.Printf("Watch created for %s", keyPrefix)
m.processWatches(ch)
}(c)

go func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
m.waitForConnection()

waitDelay := initialWaitDelay
for {
response, err := m.getProxyUrls(keyPrefix)
if err != nil {
if err == context.DeadlineExceeded {
log.Printf("Timeout getting initial list of proxy URLs, retry in %s", waitDelay)
} else {
log.Printf("Could not get initial list of proxy URLs, retry in %s: %s", waitDelay, err)
}

time.Sleep(waitDelay)
waitDelay = waitDelay * 2
if waitDelay > maxWaitDelay {
waitDelay = maxWaitDelay
}
continue
}

response, err := c.Get(ctx, keyPrefix, clientv3.WithPrefix())
if err != nil {
log.Printf("Could not get initial list of proxy URLs: %s", err)
} else {
for _, ev := range response.Kvs {
m.addEtcdProxy(string(ev.Key), ev.Value)
}
return
}
}()
}
Expand All @@ -1106,6 +1127,43 @@ func (m *mcuProxy) configureEtcd(config *goconf.ConfigFile, ignoreErrors bool) e
return nil
}

// getProxyUrls performs a single prefix lookup of keyPrefix against the
// currently configured etcd client. The request is bounded by a one second
// timeout; the response and error of the lookup are returned unmodified.
func (m *mcuProxy) getProxyUrls(keyPrefix string) (*clientv3.GetResponse, error) {
	timeoutCtx, release := context.WithTimeout(context.Background(), time.Second)
	// Always free the timer associated with the timeout context.
	defer release()

	client := m.getEtcdClient()
	return client.Get(timeoutCtx, keyPrefix, clientv3.WithPrefix())
}

// waitForConnection blocks until syncClient succeeds, i.e. until the etcd
// client could be synced with the cluster.
//
// After each failed attempt the error is logged and the function sleeps
// before retrying. The delay starts at initialWaitDelay and is doubled after
// every failure, but never exceeds maxWaitDelay. On success the endpoints of
// the client are logged and the function returns.
func (m *mcuProxy) waitForConnection() {
	delay := initialWaitDelay
	for {
		err := m.syncClient()
		if err == nil {
			log.Printf("Client using endpoints %+v", m.getEtcdClient().Endpoints())
			return
		}

		// Timeouts get a dedicated message, all other errors are logged verbatim.
		switch err {
		case context.DeadlineExceeded:
			log.Printf("Timeout waiting for etcd client to connect to the cluster, retry in %s", delay)
		default:
			log.Printf("Could not sync etcd client with the cluster, retry in %s: %s", delay, err)
		}

		time.Sleep(delay)

		// Exponential backoff, capped at maxWaitDelay.
		delay *= 2
		if delay > maxWaitDelay {
			delay = maxWaitDelay
		}
	}
}

// syncClient calls Sync on the current etcd client, giving the operation at
// most one second to complete. The error returned by Sync (nil on success)
// is passed through to the caller.
func (m *mcuProxy) syncClient() error {
	timeoutCtx, release := context.WithTimeout(context.Background(), time.Second)
	// Always free the timer associated with the timeout context.
	defer release()

	client := m.getEtcdClient()
	return client.Sync(timeoutCtx)
}

func (m *mcuProxy) Reload(config *goconf.ConfigFile) {
m.connectionsMu.Lock()
defer m.connectionsMu.Unlock()
Expand Down

0 comments on commit 767d283

Please sign in to comment.