Skip to content

Commit

Permalink
Unify getting agent version for EKS enrollment during manual and auto…
Browse files Browse the repository at this point in the history
…-discovery flows (#45661)
  • Loading branch information
AntonAM authored Aug 21, 2024
1 parent 410207b commit ea04605
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 98 deletions.
28 changes: 28 additions & 0 deletions lib/kube/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"encoding/hex"
"errors"
"slices"
"strings"

"github.com/gravitational/trace"
"k8s.io/client-go/kubernetes"
Expand All @@ -32,6 +33,8 @@ import (
"github.com/gravitational/teleport/api/client"
"github.com/gravitational/teleport/api/client/proto"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/automaticupgrades"
"github.com/gravitational/teleport/lib/automaticupgrades/version"
)

// GetKubeClient returns instance of client to the kubernetes cluster
Expand Down Expand Up @@ -218,6 +221,31 @@ func extractAndSortKubeClusters(kss []types.KubeServer) []types.KubeCluster {
return []types.KubeCluster(sorted)
}

type Pinger interface {
Ping(context.Context) (proto.PingResponse, error)
}

// GetKubeAgentVersion returns a version of the Kube agent appropriate for this Teleport cluster. Used for example when deciding version
// for enrolling EKS clusters.
func GetKubeAgentVersion(ctx context.Context, pinger Pinger, clusterFeatures proto.Features, releaseChannels automaticupgrades.Channels) (string, error) {
pingResponse, err := pinger.Ping(ctx)
if err != nil {
return "", trace.Wrap(err)
}
agentVersion := pingResponse.ServerVersion

if clusterFeatures.GetAutomaticUpgrades() && clusterFeatures.GetCloud() {
defaultVersion, err := releaseChannels.DefaultVersion(ctx)
if err == nil {
agentVersion = defaultVersion
} else if !errors.Is(err, &version.NoNewVersionError{}) {
return "", trace.Wrap(err)
}
}

return strings.TrimPrefix(agentVersion, "v"), nil
}

// CheckKubeCluster validates kubeClusterName is registered with this Teleport cluster.
func CheckKubeCluster(ctx context.Context, p KubeServicesPresence, kubeClusterName string) error {
if kubeClusterName == "" {
Expand Down
72 changes: 72 additions & 0 deletions lib/kube/utils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@ import (
"context"
"testing"

"github.com/gravitational/trace"
"github.com/stretchr/testify/require"

"github.com/gravitational/teleport/api/client/proto"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/automaticupgrades"
)

func TestCheckKubeCluster(t *testing.T) {
Expand Down Expand Up @@ -91,6 +94,75 @@ func kubeServer(t *testing.T, kubeCluster, hostname, hostID string) types.KubeSe
return server
}

func TestGetAgentVersion(t *testing.T) {
t.Parallel()

ctx := context.Background()

testCases := []struct {
desc string
ping func(ctx context.Context) (proto.PingResponse, error)
clusterFeatures proto.Features
channelVersion string
expectedVersion string
errorAssert require.ErrorAssertionFunc
}{
{
desc: "ping error",
ping: func(ctx context.Context) (proto.PingResponse, error) {
return proto.PingResponse{}, trace.BadParameter("ping error")
},
expectedVersion: "",
errorAssert: require.Error,
},
{
desc: "no automatic upgrades",
ping: func(ctx context.Context) (proto.PingResponse, error) {
return proto.PingResponse{ServerVersion: "1.2.3"}, nil
},
expectedVersion: "1.2.3",
errorAssert: require.NoError,
},
{
desc: "automatic upgrades",
ping: func(ctx context.Context) (proto.PingResponse, error) {
return proto.PingResponse{ServerVersion: "10"}, nil
},
clusterFeatures: proto.Features{AutomaticUpgrades: true, Cloud: true},
channelVersion: "v1.2.3",
expectedVersion: "1.2.3",
errorAssert: require.NoError,
},
}

for _, tt := range testCases {
t.Run(tt.desc, func(t *testing.T) {
p := &pinger{pingFn: tt.ping}

var channel *automaticupgrades.Channel
if tt.channelVersion != "" {
channel = &automaticupgrades.Channel{StaticVersion: tt.channelVersion}
err := channel.CheckAndSetDefaults()
require.NoError(t, err)
}
releaseChannels := automaticupgrades.Channels{automaticupgrades.DefaultChannelName: channel}

version, err := GetKubeAgentVersion(ctx, p, tt.clusterFeatures, releaseChannels)

tt.errorAssert(t, err)
require.Equal(t, tt.expectedVersion, version)
})
}
}

type pinger struct {
pingFn func(ctx context.Context) (proto.PingResponse, error)
}

func (p *pinger) Ping(ctx context.Context) (proto.PingResponse, error) {
return p.pingFn(ctx)
}

func TestExtractAndSortKubeClusterNames(t *testing.T) {
t.Parallel()

Expand Down
21 changes: 2 additions & 19 deletions lib/srv/discovery/kube_integration_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package discovery

import (
"context"
"errors"
"fmt"
"slices"
"strings"
Expand All @@ -32,7 +31,7 @@ import (
integrationv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/integration/v1"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/automaticupgrades"
"github.com/gravitational/teleport/lib/automaticupgrades/version"
kubeutils "github.com/gravitational/teleport/lib/kube/utils"
"github.com/gravitational/teleport/lib/srv/discovery/common"
)

Expand Down Expand Up @@ -215,23 +214,7 @@ func (s *Server) enrollEKSClusters(region, integration string, clusters []types.
}

func (s *Server) getKubeAgentVersion(releaseChannels automaticupgrades.Channels) (string, error) {
pingResponse, err := s.AccessPoint.Ping(s.ctx)
if err != nil {
return "", trace.Wrap(err)
}
agentVersion := pingResponse.ServerVersion

clusterFeatures := s.ClusterFeatures()
if clusterFeatures.GetAutomaticUpgrades() && clusterFeatures.GetCloud() {
defaultVersion, err := releaseChannels.DefaultVersion(s.ctx)
if err == nil {
agentVersion = defaultVersion
} else if !errors.Is(err, &version.NoNewVersionError{}) {
return "", trace.Wrap(err)
}
}

return strings.TrimPrefix(agentVersion, "v"), nil
return kubeutils.GetKubeAgentVersion(s.ctx, s.AccessPoint, s.ClusterFeatures(), releaseChannels)
}

type IntegrationFetcher interface {
Expand Down
70 changes: 0 additions & 70 deletions lib/srv/discovery/kube_integration_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ import (
"github.com/gravitational/teleport/lib/auth"
"github.com/gravitational/teleport/lib/auth/authclient"
"github.com/gravitational/teleport/lib/authz"
"github.com/gravitational/teleport/lib/automaticupgrades"
"github.com/gravitational/teleport/lib/cloud"
"github.com/gravitational/teleport/lib/cloud/mocks"
"github.com/gravitational/teleport/lib/integrations/awsoidc"
Expand All @@ -55,75 +54,6 @@ import (
"github.com/gravitational/teleport/lib/srv/discovery/fetchers"
)

func TestGetAgentVersion(t *testing.T) {
t.Parallel()

ctx := context.Background()

testCases := []struct {
desc string
ping func(ctx context.Context) (proto.PingResponse, error)
clusterFeatures proto.Features
channelVersion string
expectedVersion string
errorAssert require.ErrorAssertionFunc
}{
{
desc: "ping error",
ping: func(ctx context.Context) (proto.PingResponse, error) {
return proto.PingResponse{}, trace.BadParameter("ping error")
},
expectedVersion: "",
errorAssert: require.Error,
},
{
desc: "no automatic upgrades",
ping: func(ctx context.Context) (proto.PingResponse, error) {
return proto.PingResponse{ServerVersion: "1.2.3"}, nil
},
expectedVersion: "1.2.3",
errorAssert: require.NoError,
},
{
desc: "automatic upgrades",
ping: func(ctx context.Context) (proto.PingResponse, error) {
return proto.PingResponse{ServerVersion: "10"}, nil
},
clusterFeatures: proto.Features{AutomaticUpgrades: true, Cloud: true},
channelVersion: "v1.2.3",
expectedVersion: "1.2.3",
errorAssert: require.NoError,
},
}

for _, tt := range testCases {
t.Run(tt.desc, func(t *testing.T) {
server := Server{
ctx: ctx,
Config: &Config{
AccessPoint: &fakeAccessPoint{ping: tt.ping},
ClusterFeatures: func() proto.Features {
return tt.clusterFeatures
},
},
}

var channel *automaticupgrades.Channel
if tt.channelVersion != "" {
channel = &automaticupgrades.Channel{StaticVersion: tt.channelVersion}
err := channel.CheckAndSetDefaults()
require.NoError(t, err)
}
releaseChannels := automaticupgrades.Channels{automaticupgrades.DefaultChannelName: channel}

version, err := server.getKubeAgentVersion(releaseChannels)

tt.errorAssert(t, err)
require.Equal(t, tt.expectedVersion, version)
})
}
}

func TestServer_getKubeFetchers(t *testing.T) {
eks1, err := fetchers.NewEKSFetcher(fetchers.EKSFetcherConfig{
ClientGetter: &cloud.TestCloudClients{STS: &mocks.STSMock{}},
Expand Down
13 changes: 4 additions & 9 deletions lib/web/integrations_awsoidc.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/gravitational/teleport/lib/httplib"
"github.com/gravitational/teleport/lib/integrations/awsoidc"
"github.com/gravitational/teleport/lib/integrations/awsoidc/deployserviceconfig"
kubeutils "github.com/gravitational/teleport/lib/kube/utils"
"github.com/gravitational/teleport/lib/reversetunnelclient"
"github.com/gravitational/teleport/lib/services"
libutils "github.com/gravitational/teleport/lib/utils"
Expand Down Expand Up @@ -480,15 +481,9 @@ func (h *Handler) awsOIDCEnrollEKSClusters(w http.ResponseWriter, r *http.Reques
return nil, trace.BadParameter("an integration name is required")
}

// todo(anton): get auth server version and use it instead of this proxy teleport.version.
agentVersion := teleport.Version
if h.ClusterFeatures.GetAutomaticUpgrades() {
upgradesVersion, err := h.cfg.AutomaticUpgradesChannels.DefaultVersion(ctx)
if err != nil {
return "", trace.Wrap(err)
}

agentVersion = strings.TrimPrefix(upgradesVersion, "v")
agentVersion, err := kubeutils.GetKubeAgentVersion(ctx, h.cfg.ProxyClient, h.ClusterFeatures, h.cfg.AutomaticUpgradesChannels)
if err != nil {
return nil, trace.Wrap(err)
}

response, err := clt.IntegrationAWSOIDCClient().EnrollEKSClusters(ctx, &integrationv1.EnrollEKSClustersRequest{
Expand Down

0 comments on commit ea04605

Please sign in to comment.