diff --git a/lib/backend/etcdbk/etcd.go b/lib/backend/etcdbk/etcd.go index 36052d0310e3e..41aec19d3a497 100644 --- a/lib/backend/etcdbk/etcd.go +++ b/lib/backend/etcdbk/etcd.go @@ -28,6 +28,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" "github.com/coreos/go-semver/semver" @@ -265,6 +266,7 @@ func New(ctx context.Context, params backend.Params, opts ...Option) (*EtcdBacke CleanupInterval: utils.SeventhJitter(time.Minute * 2), }) if err != nil { + cancel() return nil, trace.Wrap(err) } @@ -285,28 +287,19 @@ func New(ctx context.Context, params backend.Params, opts ...Option) (*EtcdBacke } // Check that the etcd nodes are at least the minimum version supported - if err = b.reconnect(ctx); err != nil { + if err = b.reconnect(b.ctx); err != nil { + b.Close() return nil, trace.Wrap(err) } - timeout, cancel := context.WithTimeout(ctx, time.Second*3*time.Duration(len(cfg.Nodes))) - defer cancel() - for _, n := range cfg.Nodes { - status, err := b.clients.Next().Status(timeout, n) - if err != nil { - return nil, trace.Wrap(err) - } - - ver := semver.New(status.Version) - min := semver.New(teleport.MinimumEtcdVersion) - if ver.LessThan(*min) { - return nil, trace.BadParameter("unsupported version of etcd %v for node %v, must be %v or greater", - status.Version, n, teleport.MinimumEtcdVersion) - } + if err := b.checkVersion(b.ctx); err != nil { + b.Close() + return nil, trace.Wrap(err) } // Reconnect the etcd client to work around a data race in their code. // Upstream fix: https://github.com/etcd-io/etcd/pull/12992 - if err = b.reconnect(ctx); err != nil { + if err = b.reconnect(b.ctx); err != nil { + b.Close() return nil, trace.Wrap(err) } go b.asyncWatch() @@ -315,6 +308,57 @@ func New(ctx context.Context, params backend.Params, opts ...Option) (*EtcdBacke return b, nil } +func (b *EtcdBackend) checkVersion(ctx context.Context) error { + // scope version check to one third the default I/O timeout since slowness that is + // anywhere near the default timeout is going to cause systemic issues. + ctx, cancel := context.WithTimeout(ctx, apidefaults.DefaultIOTimeout/3) + + results := make(chan error, len(b.cfg.Nodes)) + + var wg sync.WaitGroup + for _, nn := range b.cfg.Nodes { + wg.Add(1) + go func(n string) (err error) { + defer func() { + results <- err + wg.Done() + }() + status, err := b.clients.Next().Status(ctx, n) + if err != nil { + return trace.Wrap(err) + } + + ver, err := semver.NewVersion(status.Version) + if err != nil { + return trace.BadParameter("failed to parse etcd version %q: %v", status.Version, err) + } + + min := semver.New(teleport.MinimumEtcdVersion) + if ver.LessThan(*min) { + return trace.BadParameter("unsupported version of etcd %v for node %v, must be %v or greater", + status.Version, n, teleport.MinimumEtcdVersion) + } + + return nil + }(nn) + } + + // wait for results + var err error + for range b.cfg.Nodes { + err = <-results + if err == nil { + // stop on first success, we don't care about all endpoints + // being healthy, just that at least one is. + break + } + } + + cancel() + wg.Wait() + return trace.Wrap(err) +} + // Validate checks if all the parameters are present/valid func (cfg *Config) Validate() error { if len(cfg.Key) == 0 {