Skip to content

Commit

Permalink
allow teleport to start when some etcd nodes are unreachable (#32438)
Browse files Browse the repository at this point in the history
  • Loading branch information
fspmarshall authored Sep 28, 2023
1 parent 4564944 commit e3b574a
Showing 1 changed file with 60 additions and 16 deletions.
76 changes: 60 additions & 16 deletions lib/backend/etcdbk/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/coreos/go-semver/semver"
Expand Down Expand Up @@ -265,6 +266,7 @@ func New(ctx context.Context, params backend.Params, opts ...Option) (*EtcdBacke
CleanupInterval: utils.SeventhJitter(time.Minute * 2),
})
if err != nil {
cancel()
return nil, trace.Wrap(err)
}

Expand All @@ -285,28 +287,19 @@ func New(ctx context.Context, params backend.Params, opts ...Option) (*EtcdBacke
}

// Check that the etcd nodes are at least the minimum version supported
if err = b.reconnect(ctx); err != nil {
if err = b.reconnect(b.ctx); err != nil {
b.Close()
return nil, trace.Wrap(err)
}
timeout, cancel := context.WithTimeout(ctx, time.Second*3*time.Duration(len(cfg.Nodes)))
defer cancel()
for _, n := range cfg.Nodes {
status, err := b.clients.Next().Status(timeout, n)
if err != nil {
return nil, trace.Wrap(err)
}

ver := semver.New(status.Version)
min := semver.New(teleport.MinimumEtcdVersion)
if ver.LessThan(*min) {
return nil, trace.BadParameter("unsupported version of etcd %v for node %v, must be %v or greater",
status.Version, n, teleport.MinimumEtcdVersion)
}
if err := b.checkVersion(b.ctx); err != nil {
b.Close()
return nil, trace.Wrap(err)
}

// Reconnect the etcd client to work around a data race in their code.
// Upstream fix: https://github.com/etcd-io/etcd/pull/12992
if err = b.reconnect(ctx); err != nil {
if err = b.reconnect(b.ctx); err != nil {
b.Close()
return nil, trace.Wrap(err)
}
go b.asyncWatch()
Expand All @@ -315,6 +308,57 @@ func New(ctx context.Context, params backend.Params, opts ...Option) (*EtcdBacke
return b, nil
}

func (b *EtcdBackend) checkVersion(ctx context.Context) error {
// scope version check to one third the default I/O timeout since slowness that is
// anywhere near the default timeout is going to cause systemic issues.
ctx, cancel := context.WithTimeout(ctx, apidefaults.DefaultIOTimeout/3)

results := make(chan error, len(b.cfg.Nodes))

var wg sync.WaitGroup
for _, nn := range b.cfg.Nodes {
wg.Add(1)
go func(n string) (err error) {
defer func() {
results <- err
wg.Done()
}()
status, err := b.clients.Next().Status(ctx, n)
if err != nil {
return trace.Wrap(err)
}

ver, err := semver.NewVersion(status.Version)
if err != nil {
return trace.BadParameter("failed to parse etcd version %q: %v", status.Version, err)
}

min := semver.New(teleport.MinimumEtcdVersion)
if ver.LessThan(*min) {
return trace.BadParameter("unsupported version of etcd %v for node %v, must be %v or greater",
status.Version, n, teleport.MinimumEtcdVersion)
}

return nil
}(nn)
}

// wait for results
var err error
for range b.cfg.Nodes {
err = <-results
if err == nil {
// stop on first success, we don't care about all endpoints
// being healthy, just that at least one is.
break
}
}

cancel()
wg.Wait()
return trace.Wrap(err)
}

// Validate checks if all the parameters are present/valid
func (cfg *Config) Validate() error {
if len(cfg.Key) == 0 {
Expand Down

0 comments on commit e3b574a

Please sign in to comment.