diff --git a/api/types/usertasks/object.go b/api/types/usertasks/object.go index 278e956ea755d..c606ebbbf28af 100644 --- a/api/types/usertasks/object.go +++ b/api/types/usertasks/object.go @@ -21,6 +21,7 @@ package usertasks import ( "encoding/binary" "slices" + "strconv" "time" "github.com/google/uuid" @@ -72,6 +73,35 @@ func NewDiscoverEC2UserTask(spec *usertasksv1.UserTaskSpec, opts ...UserTaskOpti return ut, nil } +// NewDiscoverEKSUserTask creates a new DiscoverEKS User Task. +func NewDiscoverEKSUserTask(spec *usertasksv1.UserTaskSpec, opts ...UserTaskOption) (*usertasksv1.UserTask, error) { + taskName := TaskNameForDiscoverEKS(TaskNameForDiscoverEKSParts{ + Integration: spec.GetIntegration(), + IssueType: spec.GetIssueType(), + AccountID: spec.GetDiscoverEks().GetAccountId(), + Region: spec.GetDiscoverEks().GetRegion(), + AppAutoDiscover: spec.GetDiscoverEks().GetAppAutoDiscover(), + }) + + ut := &usertasksv1.UserTask{ + Kind: types.KindUserTask, + Version: types.V1, + Metadata: &headerv1.Metadata{ + Name: taskName, + }, + Spec: spec, + } + for _, o := range opts { + o(ut) + } + + if err := ValidateUserTask(ut); err != nil { + return nil, trace.Wrap(err) + } + + return ut, nil +} + const ( // TaskStateOpen identifies an issue with an instance that is not yet resolved. TaskStateOpen = "OPEN" @@ -86,6 +116,11 @@ const ( // when an auto-enrollment of an EC2 instance fails. // UserTasks that have this Task Type must include the DiscoverEC2 field. TaskTypeDiscoverEC2 = "discover-ec2" + + // TaskTypeDiscoverEKS identifies a User Task that is created + // when an auto-enrollment of an EKS cluster fails. + // UserTasks that have this Task Type must include the DiscoverEKS field. + TaskTypeDiscoverEKS = "discover-eks" ) // List of Auto Discover EC2 issues identifiers. @@ -128,6 +163,40 @@ var discoverEC2IssueTypes = []string{ AutoDiscoverEC2IssueSSMInvocationFailure, } +// List of Auto Discover EKS issue identifiers. +// These values are used to populate the UserTasks.Spec.IssueType for Discover EKS tasks. +const ( + // AutoDiscoverEKSIssueStatusNotActive is used to identify clusters that failed to auto-enroll + // because their Status is not Active. + AutoDiscoverEKSIssueStatusNotActive = "eks-status-not-active" + // AutoDiscoverEKSIssueMissingEndpoingPublicAccess is used to identify clusters that failed to auto-enroll + // because they don't have a public endpoint and this Teleport Cluster is running in Teleport Cloud. + AutoDiscoverEKSIssueMissingEndpoingPublicAccess = "eks-missing-endpoint-public-access" + // AutoDiscoverEKSIssueAuthenticationModeUnsupported is used to identify clusters that failed to auto-enroll + // because their Authentication Mode is not supported. + // Accepted values are API and API_AND_CONFIG_MAP. + AutoDiscoverEKSIssueAuthenticationModeUnsupported = "eks-authentication-mode-unsupported" + // AutoDiscoverEKSIssueClusterUnreachable is used to identify clusters that failed to auto-enroll + // because the Teleport Cluster is not able to reach the cluster's API. + // It is similar to AutoDiscoverEKSIssueMissingEndpoingPublicAccess, which is only used when Teleport is running in Teleport Cloud. + AutoDiscoverEKSIssueClusterUnreachable = "eks-cluster-unreachable" + // AutoDiscoverEKSIssueAgentNotConnecting is used to identify clusters where Teleport installed the + // Helm chart but the Kube Agent is not connecting to Teleport. + // This can be a transient issue (e.g., the kube agent is still in the process of joining) or a non-recoverable one.
+ // To get more information, users can follow the following link: + // https://.console.aws.amazon.com/eks/home?#/clusters//statefulsets/teleport-kube-agent?namespace=teleport-agent + AutoDiscoverEKSIssueAgentNotConnecting = "eks-agent-not-connecting" +) + +// DiscoverEKSIssueTypes is a list of issue types that can occur when trying to auto enroll EKS clusters. +var DiscoverEKSIssueTypes = []string{ + AutoDiscoverEKSIssueStatusNotActive, + AutoDiscoverEKSIssueMissingEndpoingPublicAccess, + AutoDiscoverEKSIssueAuthenticationModeUnsupported, + AutoDiscoverEKSIssueClusterUnreachable, + AutoDiscoverEKSIssueAgentNotConnecting, +} + // ValidateUserTask validates the UserTask object without modifying it. func ValidateUserTask(ut *usertasksv1.UserTask) error { switch { @@ -152,6 +221,10 @@ func ValidateUserTask(ut *usertasksv1.UserTask) error { if err := validateDiscoverEC2TaskType(ut); err != nil { return trace.Wrap(err) } + case TaskTypeDiscoverEKS: + if err := validateDiscoverEKSTaskType(ut); err != nil { + return trace.Wrap(err) + } default: return trace.BadParameter("task type %q is not valid", ut.Spec.TaskType) } @@ -216,6 +289,62 @@ func validateDiscoverEC2TaskType(ut *usertasksv1.UserTask) error { return nil } +func validateDiscoverEKSTaskType(ut *usertasksv1.UserTask) error { + if ut.GetSpec().Integration == "" { + return trace.BadParameter("integration is required") + } + if ut.GetSpec().DiscoverEks == nil { + return trace.BadParameter("%s requires the discover_eks field", TaskTypeDiscoverEKS) + } + if ut.GetSpec().DiscoverEks.AccountId == "" { + return trace.BadParameter("%s requires the discover_eks.account_id field", TaskTypeDiscoverEKS) + } + if ut.GetSpec().DiscoverEks.Region == "" { + return trace.BadParameter("%s requires the discover_eks.region field", TaskTypeDiscoverEKS) + } + + expectedTaskName := TaskNameForDiscoverEKS(TaskNameForDiscoverEKSParts{ + Integration: ut.Spec.Integration, + IssueType: ut.Spec.IssueType, + AccountID: ut.Spec.DiscoverEks.AccountId, + Region: ut.Spec.DiscoverEks.Region, + AppAutoDiscover: ut.Spec.DiscoverEks.AppAutoDiscover, + }) + if ut.Metadata.GetName() != expectedTaskName { + return trace.BadParameter("task name is pre-defined for discover-eks types, expected %s, got %s", + expectedTaskName, + ut.Metadata.GetName(), + ) + } + + if !slices.Contains(DiscoverEKSIssueTypes, ut.GetSpec().IssueType) { + return trace.BadParameter("invalid issue type state, allowed values: %v", DiscoverEKSIssueTypes) + } + + if len(ut.Spec.DiscoverEks.Clusters) == 0 { + return trace.BadParameter("at least one cluster is required") + } + for clusterName, clusterIssue := range ut.Spec.DiscoverEks.Clusters { + if clusterName == "" { + return trace.BadParameter("cluster name in discover_eks.clusters map is required") + } + if clusterIssue.Name == "" { + return trace.BadParameter("cluster name in discover_eks.clusters field is required") + } + if clusterName != clusterIssue.Name { + return trace.BadParameter("cluster name in discover_eks.clusters map and field are different") + } + if clusterIssue.DiscoveryConfig == "" { + return trace.BadParameter("discovery config in discover_eks.clusters field is required") + } + if clusterIssue.DiscoveryGroup == "" { + return trace.BadParameter("discovery group in discover_eks.clusters field is required") + } + } + + return nil +} + // TaskNameForDiscoverEC2Parts are the fields that deterministically compute a Discover EC2 task name. // To be used with TaskNameForDiscoverEC2 function. 
type TaskNameForDiscoverEC2Parts struct { @@ -248,3 +377,34 @@ func TaskNameForDiscoverEC2(parts TaskNameForDiscoverEC2Parts) string { // discoverEC2Namespace is an UUID that represents the name space to be used for generating UUIDs for DiscoverEC2 User Task names. var discoverEC2Namespace = uuid.Must(uuid.Parse("6ba7b815-9dad-11d1-80b4-00c04fd430c8")) + +// TaskNameForDiscoverEKSParts are the fields that deterministically compute a Discover EKS task name. +// To be used with TaskNameForDiscoverEKS function. +type TaskNameForDiscoverEKSParts struct { + Integration string + IssueType string + AccountID string + Region string + AppAutoDiscover bool +} + +// TaskNameForDiscoverEKS returns a deterministic name for the DiscoverEKS task type. +// This method is used to ensure a single UserTask is created to report issues in enrolling EKS clusters for a given integration, issue type, account id and region. +func TaskNameForDiscoverEKS(parts TaskNameForDiscoverEKSParts) string { + var bs []byte + bs = append(bs, binary.LittleEndian.AppendUint64(nil, uint64(len(parts.Integration)))...) + bs = append(bs, []byte(parts.Integration)...) + bs = append(bs, binary.LittleEndian.AppendUint64(nil, uint64(len(parts.IssueType)))...) + bs = append(bs, []byte(parts.IssueType)...) + bs = append(bs, binary.LittleEndian.AppendUint64(nil, uint64(len(parts.AccountID)))...) + bs = append(bs, []byte(parts.AccountID)...) + bs = append(bs, binary.LittleEndian.AppendUint64(nil, uint64(len(parts.Region)))...) + bs = append(bs, []byte(parts.Region)...) + appAutoDiscoverString := strconv.FormatBool(parts.AppAutoDiscover) + bs = append(bs, binary.LittleEndian.AppendUint64(nil, uint64(len(appAutoDiscoverString)))...) + bs = append(bs, []byte(appAutoDiscoverString)...) + return uuid.NewSHA1(discoverEKSNamespace, bs).String() +} + +// discoverEKSNamespace is an UUID that represents the name space to be used for generating UUIDs for DiscoverEKS User Task names. 
+var discoverEKSNamespace = uuid.NewSHA1(uuid.Nil, []byte("discover-eks")) diff --git a/api/types/usertasks/object_test.go b/api/types/usertasks/object_test.go index f2298f2132829..396a18613502d 100644 --- a/api/types/usertasks/object_test.go +++ b/api/types/usertasks/object_test.go @@ -58,6 +58,30 @@ func TestValidateUserTask(t *testing.T) { return userTask } + exampleClusterName := "MyCluster" + baseEKSDiscoverTask := func(t *testing.T) *usertasksv1.UserTask { + userTask, err := usertasks.NewDiscoverEKSUserTask(&usertasksv1.UserTaskSpec{ + Integration: "my-integration", + TaskType: "discover-eks", + IssueType: "eks-agent-not-connecting", + State: "OPEN", + DiscoverEks: &usertasksv1.DiscoverEKS{ + AccountId: "123456789012", + Region: "us-east-1", + Clusters: map[string]*usertasksv1.DiscoverEKSCluster{ + exampleClusterName: { + Name: exampleClusterName, + DiscoveryConfig: "dc01", + DiscoveryGroup: "dg01", + SyncTime: timestamppb.Now(), + }, + }, + }, + }) + require.NoError(t, err) + return userTask + } + tests := []struct { name string task func(t *testing.T) *usertasksv1.UserTask @@ -201,6 +225,119 @@ func TestValidateUserTask(t *testing.T) { }, wantErr: require.Error, }, + { + name: "DiscoverEKS: valid", + task: baseEKSDiscoverTask, + wantErr: require.NoError, + }, + { + name: "DiscoverEKS: invalid issue type", + task: func(t *testing.T) *usertasksv1.UserTask { + ut := baseEKSDiscoverTask(t) + ut.Spec.IssueType = "unknown error" + return ut + }, + wantErr: require.Error, + }, + { + name: "DiscoverEKS: missing integration", + task: func(t *testing.T) *usertasksv1.UserTask { + ut := baseEKSDiscoverTask(t) + ut.Spec.Integration = "" + return ut + }, + wantErr: require.Error, + }, + { + name: "DiscoverEKS: missing discover eks field", + task: func(t *testing.T) *usertasksv1.UserTask { + ut := baseEKSDiscoverTask(t) + ut.Spec.DiscoverEks = nil + return ut + }, + wantErr: require.Error, + }, + { + name: "DiscoverEKS: wrong task name", + task: func(t *testing.T) *usertasksv1.UserTask { + ut := baseEKSDiscoverTask(t) + ut.Metadata.Name = "another-name" + return ut + }, + wantErr: require.Error, + }, + { + name: "DiscoverEKS: missing account id", + task: func(t *testing.T) *usertasksv1.UserTask { + ut := baseEKSDiscoverTask(t) + ut.Spec.DiscoverEks.AccountId = "" + return ut + }, + wantErr: require.Error, + }, + { + name: "DiscoverEKS: missing region", + task: func(t *testing.T) *usertasksv1.UserTask { + ut := baseEKSDiscoverTask(t) + ut.Spec.DiscoverEks.Region = "" + return ut + }, + wantErr: require.Error, + }, + { + name: "DiscoverEKS: clusters - missing cluster name in map key", + task: func(t *testing.T) *usertasksv1.UserTask { + ut := baseEKSDiscoverTask(t) + origClusterMetadata := ut.Spec.DiscoverEks.Clusters[exampleClusterName] + ut.Spec.DiscoverEks.Clusters[""] = origClusterMetadata + return ut + }, + wantErr: require.Error, + }, + { + name: "DiscoverEKS: clusters - missing cluster name in cluster metadata", + task: func(t *testing.T) *usertasksv1.UserTask { + ut := baseEKSDiscoverTask(t) + origClusterMetadata := ut.Spec.DiscoverEks.Clusters[exampleClusterName] + origClusterMetadata.Name = "" + ut.Spec.DiscoverEks.Clusters[exampleClusterName] = origClusterMetadata + return ut + }, + wantErr: require.Error, + }, + { + name: "DiscoverEKS: clusters - different cluster name", + task: func(t *testing.T) *usertasksv1.UserTask { + ut := baseEKSDiscoverTask(t) + origClusterMetadata := ut.Spec.DiscoverEks.Clusters[exampleClusterName] + origClusterMetadata.Name = "another-cluster" + 
ut.Spec.DiscoverEks.Clusters[exampleClusterName] = origClusterMetadata + return ut + }, + wantErr: require.Error, + }, + { + name: "DiscoverEKS: clusters - missing discovery config", + task: func(t *testing.T) *usertasksv1.UserTask { + ut := baseEKSDiscoverTask(t) + origClusterMetadata := ut.Spec.DiscoverEks.Clusters[exampleClusterName] + origClusterMetadata.DiscoveryConfig = "" + ut.Spec.DiscoverEks.Clusters[exampleClusterName] = origClusterMetadata + return ut + }, + wantErr: require.Error, + }, + { + name: "DiscoverEKS: clusters - missing discovery group", + task: func(t *testing.T) *usertasksv1.UserTask { + ut := baseEKSDiscoverTask(t) + origClusterMetadata := ut.Spec.DiscoverEks.Clusters[exampleClusterName] + origClusterMetadata.DiscoveryGroup = "" + ut.Spec.DiscoverEks.Clusters[exampleClusterName] = origClusterMetadata + return ut + }, + wantErr: require.Error, + }, } for _, tt := range tests { @@ -269,3 +406,62 @@ func TestNewDiscoverEC2UserTask(t *testing.T) { }) } } + +func TestNewDiscoverEKSUserTask(t *testing.T) { + t.Parallel() + + userTaskExpirationTime := time.Now() + userTaskExpirationTimestamp := timestamppb.New(userTaskExpirationTime) + clusterSyncTimestamp := userTaskExpirationTimestamp + + baseEKSDiscoverTaskSpec := &usertasksv1.UserTaskSpec{ + Integration: "my-integration", + TaskType: "discover-eks", + IssueType: "eks-agent-not-connecting", + State: "OPEN", + DiscoverEks: &usertasksv1.DiscoverEKS{ + AccountId: "123456789012", + Region: "us-east-1", + Clusters: map[string]*usertasksv1.DiscoverEKSCluster{ + "MyKubeCluster": { + Name: "MyKubeCluster", + DiscoveryConfig: "dc01", + DiscoveryGroup: "dg01", + SyncTime: clusterSyncTimestamp, + }, + }, + }, + } + + tests := []struct { + name string + taskSpec *usertasksv1.UserTaskSpec + taskOption []usertasks.UserTaskOption + expectedTask *usertasksv1.UserTask + }{ + { + name: "options are applied task type", + taskSpec: baseEKSDiscoverTaskSpec, + expectedTask: &usertasksv1.UserTask{ + Kind: "user_task", + Version: "v1", + Metadata: &headerv1.Metadata{ + Name: "09b7d37e-3570-531a-b326-1860cafc23fb", + Expires: userTaskExpirationTimestamp, + }, + Spec: baseEKSDiscoverTaskSpec, + }, + taskOption: []usertasks.UserTaskOption{ + usertasks.WithExpiration(userTaskExpirationTime), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotTask, err := usertasks.NewDiscoverEKSUserTask(tt.taskSpec, tt.taskOption...) 
+ require.NoError(t, err) + require.Equal(t, tt.expectedTask, gotTask) + }) + } +} diff --git a/lib/auth/integration/integrationv1/awsoidc.go b/lib/auth/integration/integrationv1/awsoidc.go index d903dd22914c6..00b28332bc857 100644 --- a/lib/auth/integration/integrationv1/awsoidc.go +++ b/lib/auth/integration/integrationv1/awsoidc.go @@ -548,6 +548,7 @@ func (s *AWSOIDCService) EnrollEKSClusters(ctx context.Context, req *integration EksClusterName: r.ClusterName, ResourceId: r.ResourceId, Error: trace.UserMessage(r.Error), + IssueType: r.IssueType, }) } diff --git a/lib/auth/usertasks/usertasksv1/service.go b/lib/auth/usertasks/usertasksv1/service.go index cf94dd57ac447..a383e55a70135 100644 --- a/lib/auth/usertasks/usertasksv1/service.go +++ b/lib/auth/usertasks/usertasksv1/service.go @@ -174,8 +174,11 @@ func userTaskToUserTaskStateEvent(ut *usertasksv1.UserTask) *usagereporter.UserT IssueType: ut.GetSpec().GetIssueType(), State: ut.GetSpec().GetState(), } - if ut.GetSpec().GetTaskType() == usertasks.TaskTypeDiscoverEC2 { + switch ut.GetSpec().GetTaskType() { + case usertasks.TaskTypeDiscoverEC2: ret.InstancesCount = int32(len(ut.GetSpec().GetDiscoverEc2().GetInstances())) + case usertasks.TaskTypeDiscoverEKS: + ret.InstancesCount = int32(len(ut.GetSpec().GetDiscoverEks().GetClusters())) } return ret } diff --git a/lib/integrations/awsoidc/eks_enroll_clusters.go b/lib/integrations/awsoidc/eks_enroll_clusters.go index 621837336f273..dbeb6f2385484 100644 --- a/lib/integrations/awsoidc/eks_enroll_clusters.go +++ b/lib/integrations/awsoidc/eks_enroll_clusters.go @@ -23,6 +23,7 @@ import ( "encoding/base64" "fmt" "log/slog" + "maps" "net/url" "slices" "strings" @@ -50,6 +51,7 @@ import ( "k8s.io/client-go/rest" "github.com/gravitational/teleport/api/types" + "github.com/gravitational/teleport/api/types/usertasks" apiutils "github.com/gravitational/teleport/api/utils" awslib "github.com/gravitational/teleport/lib/cloud/aws" "github.com/gravitational/teleport/lib/defaults" @@ -84,6 +86,8 @@ type EnrollEKSClusterResult struct { ResourceId string // Error contains an error that happened during enrollment, if there was one. Error error + // IssueType contains the UserTask issue type for well-known errors. + IssueType string } // EnrollEKSClusterResponse contains result for enrollment . @@ -92,8 +96,8 @@ type EnrollEKSClusterResponse struct { Results []EnrollEKSClusterResult } -// EnrollEKSCLusterClient defines functions required for EKS cluster enrollment. -type EnrollEKSCLusterClient interface { +// EnrollEKSClusterClient defines functions required for EKS cluster enrollment. +type EnrollEKSClusterClient interface { // CreateAccessEntry creates an access entry. An access entry allows an IAM principal to access an EKS cluster. CreateAccessEntry(ctx context.Context, params *eks.CreateAccessEntryInput, optFns ...func(*eks.Options)) (*eks.CreateAccessEntryOutput, error) @@ -208,7 +212,7 @@ func (d *defaultEnrollEKSClustersClient) CreateToken(ctx context.Context, token type TokenCreator func(ctx context.Context, token types.ProvisionToken) error // NewEnrollEKSClustersClient returns new client that can be used to enroll EKS clusters into Teleport. 
-func NewEnrollEKSClustersClient(ctx context.Context, req *AWSClientRequest, tokenCreator TokenCreator) (EnrollEKSCLusterClient, error) { +func NewEnrollEKSClustersClient(ctx context.Context, req *AWSClientRequest, tokenCreator TokenCreator) (EnrollEKSClusterClient, error) { eksClient, err := newEKSClient(ctx, req) if err != nil { return nil, trace.Wrap(err) } @@ -253,6 +257,9 @@ type EnrollEKSClustersRequest struct { // AgentVersion specifies version of the Helm chart that will be installed during enrollment. AgentVersion string + + // ExtraLabels are added to the enrolled clusters. + ExtraLabels map[string]string } // CheckAndSetDefaults checks if the required fields are present. @@ -289,7 +296,7 @@ func (e *EnrollEKSClustersRequest) CheckAndSetDefaults() error { // During enrollment we create access entry for an EKS cluster if needed and cluster admin policy is associated with that entry, // so our AWS integration can access the target EKS cluster during the chart installation. After enrollment is done we remove // the access entry (if it was created by us), since we don't need it anymore. -func EnrollEKSClusters(ctx context.Context, log *slog.Logger, clock clockwork.Clock, proxyAddr string, clt EnrollEKSCLusterClient, req EnrollEKSClustersRequest) (*EnrollEKSClusterResponse, error) { +func EnrollEKSClusters(ctx context.Context, log *slog.Logger, clock clockwork.Clock, proxyAddr string, clt EnrollEKSClusterClient, req EnrollEKSClustersRequest) (*EnrollEKSClusterResponse, error) { var mu sync.Mutex var results []EnrollEKSClusterResult @@ -304,17 +311,23 @@ func EnrollEKSClusters(ctx context.Context, log *slog.Logger, clock clockwork.Cl eksClusterName := eksClusterName group.Go(func() error { - resourceId, err := enrollEKSCluster(ctx, log, clock, clt, proxyAddr, eksClusterName, req) + resourceId, issueType, err := enrollEKSCluster(ctx, log, clock, clt, proxyAddr, eksClusterName, req) if err != nil { log.WarnContext(ctx, "Failed to enroll EKS cluster", "error", err, "cluster", eksClusterName, + "issue_type", issueType, ) } mu.Lock() defer mu.Unlock() - results = append(results, EnrollEKSClusterResult{ClusterName: eksClusterName, ResourceId: resourceId, Error: trace.Wrap(err)}) + results = append(results, EnrollEKSClusterResult{ + ClusterName: eksClusterName, + ResourceId: resourceId, + Error: trace.Wrap(err), + IssueType: issueType, + }) return nil }) @@ -358,22 +371,28 @@ func presignCallerIdentityURL(ctx context.Context, stsClient *sts.Client, cluste return presigned.URL, nil } -func enrollEKSCluster(ctx context.Context, log *slog.Logger, clock clockwork.Clock, clt EnrollEKSCLusterClient, proxyAddr, clusterName string, req EnrollEKSClustersRequest) (string, error) { +// enrollEKSCluster tries to enroll a single EKS cluster using the EnrollEKSClusterClient. +// It returns the resource ID on success; on failure, it returns an error together with an issue type that identifies the class of error that occurred.
+func enrollEKSCluster(ctx context.Context, log *slog.Logger, clock clockwork.Clock, clt EnrollEKSClusterClient, proxyAddr, clusterName string, req EnrollEKSClustersRequest) (string, string, error) { eksClusterInfo, err := clt.DescribeCluster(ctx, &eks.DescribeClusterInput{ Name: aws.String(clusterName), }) if err != nil { - return "", trace.Wrap(err, "unable to describe EKS cluster") + return "", "", trace.Wrap(err, "unable to describe EKS cluster") } eksCluster := eksClusterInfo.Cluster if eksCluster.Status != eksTypes.ClusterStatusActive { - return "", trace.BadParameter(`can't enroll EKS cluster %q - expected "ACTIVE" state, got %q.`, clusterName, eksCluster.Status) + return "", + usertasks.AutoDiscoverEKSIssueStatusNotActive, + trace.BadParameter(`can't enroll EKS cluster %q - expected "ACTIVE" state, got %q.`, clusterName, eksCluster.Status) } // We can't discover private EKS clusters for cloud clients, since we know that auth server is running in our VPC. if req.IsCloud && !eksCluster.ResourcesVpcConfig.EndpointPublicAccess { - return "", trace.AccessDenied(`can't enroll %q because it is not accessible from Teleport Cloud, please enable endpoint public access in your EKS cluster and try again.`, clusterName) + return "", + usertasks.AutoDiscoverEKSIssueMissingEndpoingPublicAccess, + trace.AccessDenied(`can't enroll %q because it is not accessible from Teleport Cloud, please enable endpoint public access in your EKS cluster and try again.`, clusterName) } // When clusters are using CONFIG_MAP, API is not acessible and thus Teleport can't install the Teleport's Helm chart. @@ -383,19 +402,21 @@ func enrollEKSCluster(ctx context.Context, log *slog.Logger, clock clockwork.Clo eksTypes.AuthenticationModeApiAndConfigMap, } if !slices.Contains(allowedAuthModes, eksCluster.AccessConfig.AuthenticationMode) { - return "", trace.BadParameter("can't enroll %q because its access config's authentication mode is %q, only %v are supported", clusterName, eksCluster.AccessConfig.AuthenticationMode, allowedAuthModes) + return "", + usertasks.AutoDiscoverEKSIssueAuthenticationModeUnsupported, + trace.BadParameter("can't enroll %q because its access config's authentication mode is %q, only %v are supported", clusterName, eksCluster.AccessConfig.AuthenticationMode, allowedAuthModes) } principalArn, err := getAccessEntryPrincipalArn(ctx, clt.GetCallerIdentity) if err != nil { - return "", trace.Wrap(err) + return "", "", trace.Wrap(err) } ownershipTags := tags.DefaultResourceCreationTags(req.TeleportClusterName, req.IntegrationName) wasAdded, err := maybeAddAccessEntry(ctx, log, clusterName, principalArn, clt, ownershipTags) if err != nil { - return "", trace.Wrap(err) + return "", "", trace.Wrap(err) } if wasAdded { // If we added access entry, we'll clean it up when function stops executing. 
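Not part of the change, but for reviewers: each early return above now pairs the error with one of the well-known issue types defined in api/types/usertasks. A minimal sketch of how a consumer of EnrollEKSClusterResult could branch on that classification; describeEnrollmentFailure is a hypothetical helper and not something this PR adds:

```go
package main

import (
	"fmt"

	"github.com/gravitational/teleport/api/types/usertasks"
	"github.com/gravitational/teleport/lib/integrations/awsoidc"
)

// describeEnrollmentFailure is an illustrative helper (not part of this PR) that
// maps a well-known issue type to a short, user-facing hint.
func describeEnrollmentFailure(r awsoidc.EnrollEKSClusterResult) string {
	switch r.IssueType {
	case usertasks.AutoDiscoverEKSIssueStatusNotActive:
		return fmt.Sprintf("cluster %q is not in the ACTIVE state", r.ClusterName)
	case usertasks.AutoDiscoverEKSIssueMissingEndpoingPublicAccess:
		return fmt.Sprintf("cluster %q has no public endpoint access", r.ClusterName)
	case usertasks.AutoDiscoverEKSIssueAuthenticationModeUnsupported:
		return fmt.Sprintf("cluster %q must use API or API_AND_CONFIG_MAP authentication", r.ClusterName)
	case usertasks.AutoDiscoverEKSIssueClusterUnreachable:
		return fmt.Sprintf("cluster %q API endpoint is not reachable from this Teleport cluster", r.ClusterName)
	case usertasks.AutoDiscoverEKSIssueAgentNotConnecting:
		return fmt.Sprintf("teleport-kube-agent is installed on %q but has not connected back", r.ClusterName)
	default:
		// No well-known classification; fall back to the raw enrollment error.
		return fmt.Sprintf("cluster %q failed to enroll: %v", r.ClusterName, r.Error)
	}
}
```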
@@ -423,37 +444,57 @@ func enrollEKSCluster(ctx context.Context, log *slog.Logger, clock clockwork.Clo PrincipalArn: aws.String(principalArn), }) if err != nil { - return "", trace.Wrap(err, "unable to associate EKS Access Policy to cluster %q", clusterName) + return "", "", trace.Wrap(err, "unable to associate EKS Access Policy to cluster %q", clusterName) } presignedURL, err := clt.PresignGetCallerIdentityURL(ctx, clusterName) if err != nil { - return "", trace.Wrap(err) + return "", "", trace.Wrap(err) } kubeClientGetter, err := getKubeClientGetter(presignedURL, aws.ToString(eksCluster.CertificateAuthority.Data), aws.ToString(eksCluster.Endpoint)) if err != nil { - return "", trace.Wrap(err, "unable to build kubernetes client for EKS cluster %q", clusterName) + return "", "", trace.Wrap(err, "unable to build kubernetes client for EKS cluster %q", clusterName) } if alreadyInstalled, err := clt.CheckAgentAlreadyInstalled(ctx, kubeClientGetter, log); err != nil { - return "", trace.Wrap(err, "could not check if teleport-kube-agent is already installed.") + return "", + issueTypeFromCheckAgentInstalledError(err), + trace.Wrap(err, "could not check if teleport-kube-agent is already installed.") + } else if alreadyInstalled { - // Web UI relies on the text of this error message. If changed, sync with EnrollEksCluster.tsx - return "", trace.AlreadyExists("teleport-kube-agent is already installed on the cluster %q", clusterName) + return "", + // When using EKS Auto Discovery, after the Kube Agent connects to the Teleport cluster, it is ignored in subsequent discovery iterations. + // Since this iteration is still matching this EKS Cluster, the agent either can't connect to the Teleport Cluster or is taking too long to join. + usertasks.AutoDiscoverEKSIssueAgentNotConnecting, + // Web UI relies on the text of this error message. If changed, sync with EnrollEksCluster.tsx + trace.AlreadyExists("teleport-kube-agent is already installed on the cluster %q", clusterName) } joinToken, resourceId, err := getToken(ctx, clock, clt.CreateToken) if err != nil { - return "", trace.Wrap(err) + return "", "", trace.Wrap(err) } if err := clt.InstallKubeAgent(ctx, eksCluster, proxyAddr, joinToken, resourceId, kubeClientGetter, log, req); err != nil { - return "", trace.Wrap(err) + return "", "", trace.Wrap(err) } - return resourceId, nil + return resourceId, "", nil +} + +func issueTypeFromCheckAgentInstalledError(checkErr error) string { + // When the Auth Service fails to reach the EKS Cluster, it usually means that either: + // - EKS does not have EndpointPublicAccess + // - EKS is not reachable by the Teleport Auth Service + // The first case should be caught by a pre-install check; in the second case, we'll get the following message: + // > Kubernetes cluster unreachable: Get \"https://.gr7..eks.amazonaws.com/version\": dial tcp: lookup .gr7..eks.amazonaws.com: no such host" + if strings.Contains(checkErr.Error(), "Kubernetes cluster unreachable: Get") && strings.Contains(checkErr.Error(), "eks.amazonaws.com: no such host") { + return usertasks.AutoDiscoverEKSIssueClusterUnreachable + } + + return "" } // IdentityGetter returns AWS identity of the caller. @@ -475,7 +516,7 @@ func getAccessEntryPrincipalArn(ctx context.Context, identityGetter IdentityGett // maybeAddAccessEntry checks list of access entries for the EKS cluster and adds one for Teleport if it's missing. // If access entry was added by this function it will return true as a first value.
-func maybeAddAccessEntry(ctx context.Context, log *slog.Logger, clusterName, roleArn string, clt EnrollEKSCLusterClient, ownershipTags tags.AWSTags) (bool, error) { +func maybeAddAccessEntry(ctx context.Context, log *slog.Logger, clusterName, roleArn string, clt EnrollEKSClusterClient, ownershipTags tags.AWSTags) (bool, error) { entries, err := clt.ListAccessEntries(ctx, &eks.ListAccessEntriesInput{ ClusterName: aws.String(clusterName), }) @@ -687,9 +728,7 @@ func installKubeAgent(ctx context.Context, cfg installKubeAgentParams) error { common.ApplyEKSNameSuffix(kubeCluster) vals["kubeClusterName"] = kubeCluster.GetName() - labels := kubeCluster.GetStaticLabels() - labels[types.InternalResourceIDLabel] = cfg.resourceID - vals["labels"] = labels + vals["labels"] = kubeAgentLabels(kubeCluster, cfg.resourceID, cfg.req.ExtraLabels) if _, err := installCmd.RunWithContext(ctx, agentChart, vals); err != nil { return trace.Wrap(err, "could not install Helm chart.") @@ -697,3 +736,12 @@ func installKubeAgent(ctx context.Context, cfg installKubeAgentParams) error { return nil } + +func kubeAgentLabels(kubeCluster types.KubeCluster, resourceID string, extraLabels map[string]string) map[string]string { + labels := make(map[string]string) + maps.Copy(labels, extraLabels) + maps.Copy(labels, kubeCluster.GetStaticLabels()) + labels[types.InternalResourceIDLabel] = resourceID + + return labels +} diff --git a/lib/integrations/awsoidc/eks_enroll_clusters_test.go b/lib/integrations/awsoidc/eks_enroll_clusters_test.go index ab05516d4cbd0..46c51351930aa 100644 --- a/lib/integrations/awsoidc/eks_enroll_clusters_test.go +++ b/lib/integrations/awsoidc/eks_enroll_clusters_test.go @@ -117,7 +117,7 @@ func TestEnrollEKSClusters(t *testing.T) { }, } - baseClient := func(t *testing.T, clusters []eksTypes.Cluster) EnrollEKSCLusterClient { + baseClient := func(t *testing.T, clusters []eksTypes.Cluster) EnrollEKSClusterClient { clt := &mockEnrollEKSClusterClient{} clt.describeCluster = func(ctx context.Context, params *eks.DescribeClusterInput, optFns ...func(*eks.Options)) (*eks.DescribeClusterOutput, error) { for _, cluster := range clusters { @@ -154,7 +154,7 @@ func TestEnrollEKSClusters(t *testing.T) { testCases := []struct { name string - enrollClient func(*testing.T, []eksTypes.Cluster) EnrollEKSCLusterClient + enrollClient func(*testing.T, []eksTypes.Cluster) EnrollEKSClusterClient eksClusters []eksTypes.Cluster request EnrollEKSClustersRequest requestClusterNames []string @@ -170,6 +170,7 @@ func TestEnrollEKSClusters(t *testing.T) { require.Len(t, response.Results, 1) require.Equal(t, "EKS1", response.Results[0].ClusterName) require.Empty(t, response.Results[0].Error) + require.Empty(t, response.Results[0].IssueType) require.NotEmpty(t, response.Results[0].ResourceId) }, }, @@ -186,9 +187,11 @@ func TestEnrollEKSClusters(t *testing.T) { }) require.Equal(t, "EKS1", response.Results[0].ClusterName) require.Empty(t, response.Results[0].Error) + require.Empty(t, response.Results[0].IssueType) require.NotEmpty(t, response.Results[0].ResourceId) require.Equal(t, "EKS2", response.Results[1].ClusterName) - require.Empty(t, response.Results[1].Error) + require.NoError(t, response.Results[1].Error) + require.Empty(t, response.Results[1].IssueType) require.NotEmpty(t, response.Results[1].ResourceId) }, }, @@ -241,6 +244,7 @@ func TestEnrollEKSClusters(t *testing.T) { require.Len(t, response.Results, 1) require.ErrorContains(t, response.Results[0].Error, `can't enroll EKS cluster "EKS1" - expected "ACTIVE" state, got 
"PENDING".`) + require.Equal(t, "eks-status-not-active", response.Results[0].IssueType) }, }, { @@ -264,6 +268,7 @@ func TestEnrollEKSClusters(t *testing.T) { require.Len(t, response.Results, 1) require.ErrorContains(t, response.Results[0].Error, `can't enroll "EKS1" because its access config's authentication mode is "CONFIG_MAP", only [API API_AND_CONFIG_MAP] are supported`) + require.Equal(t, "eks-authentication-mode-unsupported", response.Results[0].IssueType) }, }, { @@ -296,6 +301,7 @@ func TestEnrollEKSClusters(t *testing.T) { require.Len(t, response.Results, 1) require.ErrorContains(t, response.Results[0].Error, `can't enroll "EKS3" because it is not accessible from Teleport Cloud, please enable endpoint public access in your EKS cluster and try again.`) + require.Equal(t, "eks-missing-endpoint-public-access", response.Results[0].IssueType) }, }, { @@ -330,7 +336,7 @@ func TestEnrollEKSClusters(t *testing.T) { }, { name: "cluster with already present agent is not enrolled", - enrollClient: func(t *testing.T, clusters []eksTypes.Cluster) EnrollEKSCLusterClient { + enrollClient: func(t *testing.T, clusters []eksTypes.Cluster) EnrollEKSClusterClient { clt := baseClient(t, clusters) mockClt, ok := clt.(*mockEnrollEKSClusterClient) require.True(t, ok) @@ -346,11 +352,12 @@ func TestEnrollEKSClusters(t *testing.T) { require.Len(t, response.Results, 1) require.ErrorContains(t, response.Results[0].Error, `teleport-kube-agent is already installed on the cluster "EKS1"`) + require.Equal(t, "eks-agent-not-connecting", response.Results[0].IssueType) }, }, { name: "if access entry is already present we don't create another one and don't delete it", - enrollClient: func(t *testing.T, clusters []eksTypes.Cluster) EnrollEKSCLusterClient { + enrollClient: func(t *testing.T, clusters []eksTypes.Cluster) EnrollEKSClusterClient { clt := baseClient(t, clusters) mockClt, ok := clt.(*mockEnrollEKSClusterClient) require.True(t, ok) @@ -620,4 +627,4 @@ func (m *mockEnrollEKSClusterClient) PresignGetCallerIdentityURL(ctx context.Con return "", nil } -var _ EnrollEKSCLusterClient = &mockEnrollEKSClusterClient{} +var _ EnrollEKSClusterClient = &mockEnrollEKSClusterClient{} diff --git a/lib/srv/discovery/discovery.go b/lib/srv/discovery/discovery.go index 9c8104157d5c8..24d50a7f86a28 100644 --- a/lib/srv/discovery/discovery.go +++ b/lib/srv/discovery/discovery.go @@ -347,6 +347,7 @@ type Server struct { awsRDSResourcesStatus awsResourcesStatus awsEKSResourcesStatus awsResourcesStatus awsEC2Tasks awsEC2Tasks + awsEKSTasks awsEKSTasks // caRotationCh receives nodes that need to have their CAs rotated. 
caRotationCh chan []types.Server diff --git a/lib/srv/discovery/discovery_test.go b/lib/srv/discovery/discovery_test.go index 6b7ca3f4ff94d..208ddf662d272 100644 --- a/lib/srv/discovery/discovery_test.go +++ b/lib/srv/discovery/discovery_test.go @@ -277,9 +277,9 @@ func TestDiscoveryServer(t *testing.T) { ) require.NoError(t, err) - discoveryConfigForUserTaskTestName := uuid.NewString() - discoveryConfigForUserTaskTest, err := discoveryconfig.NewDiscoveryConfig( - header.Metadata{Name: discoveryConfigForUserTaskTestName}, + discoveryConfigForUserTaskEC2TestName := uuid.NewString() + discoveryConfigForUserTaskEC2Test, err := discoveryconfig.NewDiscoveryConfig( + header.Metadata{Name: discoveryConfigForUserTaskEC2TestName}, discoveryconfig.Spec{ DiscoveryGroup: defaultDiscoveryGroup, AWS: []types.AWSMatcher{{ @@ -297,6 +297,21 @@ func TestDiscoveryServer(t *testing.T) { ) require.NoError(t, err) + discoveryConfigForUserTaskEKSTestName := uuid.NewString() + discoveryConfigForUserTaskEKSTest, err := discoveryconfig.NewDiscoveryConfig( + header.Metadata{Name: discoveryConfigForUserTaskEKSTestName}, + discoveryconfig.Spec{ + DiscoveryGroup: defaultDiscoveryGroup, + AWS: []types.AWSMatcher{{ + Types: []string{"eks"}, + Regions: []string{"eu-west-2"}, + Tags: map[string]utils.Strings{"RunDiscover": {"Please"}}, + Integration: "my-integration", + }}, + }, + ) + require.NoError(t, err) + tcs := []struct { name string // presentInstances is a list of servers already present in teleport @@ -304,11 +319,13 @@ func TestDiscoveryServer(t *testing.T) { foundEC2Instances []*ec2.Instance ssm *mockSSMClient emitter *mockEmitter + eksEnroller eksClustersEnroller discoveryConfig *discoveryconfig.DiscoveryConfig staticMatchers Matchers wantInstalledInstances []string wantDiscoveryConfigStatus *discoveryconfig.Status - userTasksDiscoverEC2Check require.ValueAssertionFunc + userTasksDiscoverCheck require.ValueAssertionFunc + cloudClients cloud.Clients ssmRunError error }{ { @@ -613,9 +630,9 @@ func TestDiscoveryServer(t *testing.T) { }, }, staticMatchers: Matchers{}, - discoveryConfig: discoveryConfigForUserTaskTest, + discoveryConfig: discoveryConfigForUserTaskEC2Test, wantInstalledInstances: []string{}, - userTasksDiscoverEC2Check: func(tt require.TestingT, i1 interface{}, i2 ...interface{}) { + userTasksDiscoverCheck: func(tt require.TestingT, i1 interface{}, i2 ...interface{}) { existingTasks, ok := i1.([]*usertasksv1.UserTask) require.True(t, ok, "failed to get existing tasks: %T", i1) require.Len(t, existingTasks, 1) @@ -632,10 +649,80 @@ func TestDiscoveryServer(t *testing.T) { taskInstance := taskInstances["instance-id-1"] require.Equal(t, "instance-id-1", taskInstance.InstanceId) - require.Equal(t, discoveryConfigForUserTaskTestName, taskInstance.DiscoveryConfig) + require.Equal(t, discoveryConfigForUserTaskEC2TestName, taskInstance.DiscoveryConfig) require.Equal(t, defaultDiscoveryGroup, taskInstance.DiscoveryGroup) }, }, + { + name: "multiple EKS clusters failed to autoenroll and user tasks are created", + presentInstances: []types.Server{}, + foundEC2Instances: []*ec2.Instance{}, + ssm: &mockSSMClient{}, + cloudClients: &cloud.TestCloudClients{ + STS: &mocks.STSMock{}, + EKS: &mocks.EKSMock{ + Clusters: []*eks.Cluster{ + { + Name: aws.String("cluster01"), + Arn: aws.String("arn:aws:eks:us-west-2:123456789012:cluster/cluster01"), + Status: aws.String(eks.ClusterStatusActive), + Tags: map[string]*string{ + "RunDiscover": aws.String("Please"), + }, + }, + { + Name: aws.String("cluster02"), + Arn: 
aws.String("arn:aws:eks:us-west-2:123456789012:cluster/cluster02"), + Status: aws.String(eks.ClusterStatusActive), + Tags: map[string]*string{ + "RunDiscover": aws.String("Please"), + }, + }, + }, + }, + }, + eksEnroller: &mockEKSClusterEnroller{ + resp: &integrationpb.EnrollEKSClustersResponse{ + Results: []*integrationpb.EnrollEKSClusterResult{ + { + EksClusterName: "cluster01", + Error: "access endpoint is not reachable", + IssueType: "eks-cluster-unreachable", + }, + { + EksClusterName: "cluster02", + Error: "access endpoint is not reachable", + IssueType: "eks-cluster-unreachable", + }, + }, + }, + err: nil, + }, + emitter: &mockEmitter{}, + staticMatchers: Matchers{}, + discoveryConfig: discoveryConfigForUserTaskEKSTest, + wantInstalledInstances: []string{}, + userTasksDiscoverCheck: func(tt require.TestingT, i1 interface{}, i2 ...interface{}) { + existingTasks, ok := i1.([]*usertasksv1.UserTask) + require.True(t, ok, "failed to get existing tasks: %T", i1) + require.Len(t, existingTasks, 1) + existingTask := existingTasks[0] + + require.Equal(t, "OPEN", existingTask.GetSpec().State) + require.Equal(t, "my-integration", existingTask.GetSpec().Integration) + require.Equal(t, "eks-cluster-unreachable", existingTask.GetSpec().IssueType) + require.Equal(t, "123456789012", existingTask.GetSpec().GetDiscoverEks().GetAccountId()) + require.Equal(t, "us-west-2", existingTask.GetSpec().GetDiscoverEks().GetRegion()) + + taskClusters := existingTask.GetSpec().GetDiscoverEks().Clusters + require.Contains(t, taskClusters, "cluster01") + taskCluster := taskClusters["cluster01"] + + require.Equal(t, "cluster01", taskCluster.Name) + require.Equal(t, discoveryConfigForUserTaskEKSTestName, taskCluster.DiscoveryConfig) + require.Equal(t, defaultDiscoveryGroup, taskCluster.DiscoveryGroup) + }, + }, } for _, tc := range tcs { @@ -643,7 +730,7 @@ func TestDiscoveryServer(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Parallel() - testCloudClients := &cloud.TestCloudClients{ + var testCloudClients cloud.Clients = &cloud.TestCloudClients{ EC2: &mockEC2Client{ output: &ec2.DescribeInstancesOutput{ Reservations: []*ec2.Reservation{ @@ -695,11 +782,19 @@ func TestDiscoveryServer(t *testing.T) { require.NoError(t, err) } + var eksEnroller eksClustersEnroller = authClient.IntegrationAWSOIDCClient() + if tc.eksEnroller != nil { + eksEnroller = tc.eksEnroller + } + if tc.cloudClients != nil { + testCloudClients = tc.cloudClients + } + server, err := New(authz.ContextWithUser(context.Background(), identity.I), &Config{ CloudClients: testCloudClients, ClusterFeatures: func() proto.Features { return proto.Features{} }, KubernetesClient: fake.NewSimpleClientset(), - AccessPoint: getDiscoveryAccessPoint(tlsServer.Auth(), authClient), + AccessPoint: getDiscoveryAccessPointWithEKSEnroller(tlsServer.Auth(), authClient, eksEnroller), Matchers: tc.staticMatchers, Emitter: tc.emitter, Log: logger, @@ -742,7 +837,7 @@ func TestDiscoveryServer(t *testing.T) { return true }, 500*time.Millisecond, 50*time.Millisecond) } - if tc.userTasksDiscoverEC2Check != nil { + if tc.userTasksDiscoverCheck != nil { var allUserTasks []*usertasksv1.UserTask var nextToken string for { @@ -754,7 +849,7 @@ func TestDiscoveryServer(t *testing.T) { break } } - tc.userTasksDiscoverEC2Check(t, allUserTasks) + tc.userTasksDiscoverCheck(t, allUserTasks) } }) } @@ -3275,6 +3370,15 @@ type eksClustersEnroller interface { EnrollEKSClusters(context.Context, *integrationpb.EnrollEKSClustersRequest, ...grpc.CallOption) 
(*integrationpb.EnrollEKSClustersResponse, error) } +type mockEKSClusterEnroller struct { + resp *integrationpb.EnrollEKSClustersResponse + err error +} + +func (m *mockEKSClusterEnroller) EnrollEKSClusters(context.Context, *integrationpb.EnrollEKSClustersRequest, ...grpc.CallOption) (*integrationpb.EnrollEKSClustersResponse, error) { + return m.resp, m.err +} + type combinedDiscoveryClient struct { *auth.Server eksClustersEnroller @@ -3297,9 +3401,12 @@ func (d *combinedDiscoveryClient) UpdateDiscoveryConfigStatus(ctx context.Contex return nil, trace.BadParameter("not implemented.") } +func getDiscoveryAccessPointWithEKSEnroller(authServer *auth.Server, authClient authclient.ClientI, eksEnroller eksClustersEnroller) authclient.DiscoveryAccessPoint { + return &combinedDiscoveryClient{Server: authServer, eksClustersEnroller: eksEnroller, discoveryConfigStatusUpdater: authClient.DiscoveryConfigClient()} +} + func getDiscoveryAccessPoint(authServer *auth.Server, authClient authclient.ClientI) authclient.DiscoveryAccessPoint { return &combinedDiscoveryClient{Server: authServer, eksClustersEnroller: authClient.IntegrationAWSOIDCClient(), discoveryConfigStatusUpdater: authClient.DiscoveryConfigClient()} - } type fakeAccessPoint struct { diff --git a/lib/srv/discovery/kube_integration_watcher.go b/lib/srv/discovery/kube_integration_watcher.go index 444565bb6a299..f16cc549d6727 100644 --- a/lib/srv/discovery/kube_integration_watcher.go +++ b/lib/srv/discovery/kube_integration_watcher.go @@ -27,8 +27,10 @@ import ( "time" "github.com/gravitational/trace" + "google.golang.org/protobuf/types/known/timestamppb" integrationv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/integration/v1" + usertasksv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/usertasks/v1" "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/lib/automaticupgrades" kubeutils "github.com/gravitational/teleport/lib/kube/utils" @@ -75,6 +77,7 @@ func (s *Server) startKubeIntegrationWatchers() error { TriggerFetchC: s.newDiscoveryConfigChangedSub(), PreFetchHookFn: func() { s.awsEKSResourcesStatus.reset() + s.awsEKSTasks.reset() }, }) if err != nil { @@ -199,6 +202,7 @@ func (s *Server) enrollEKSClusters(region, integration, discoveryConfig string, mu.Unlock() s.updateDiscoveryConfigStatus(discoveryConfig) + s.upsertTasksForAWSEKSFailedEnrollments() }() // We sort input clusters into two batches - one that has Kubernetes App Discovery @@ -212,8 +216,10 @@ func (s *Server) enrollEKSClusters(region, integration, discoveryConfig string, var clusterNames []string for _, kubeAppDiscovery := range []bool{true, false} { + clustersByName := make(map[string]types.DiscoveredEKSCluster) for _, c := range batchedClusters[kubeAppDiscovery] { clusterNames = append(clusterNames, c.GetAWSConfig().Name) + clustersByName[c.GetAWSConfig().Name] = c } if len(clusterNames) == 0 { continue @@ -242,10 +248,28 @@ func (s *Server) enrollEKSClusters(region, integration, discoveryConfig string, integration: integration, }, 1) if !strings.Contains(r.Error, "teleport-kube-agent is already installed on the cluster") { - s.Log.ErrorContext(ctx, "Failed to enroll EKS cluster", "cluster_name", r.EksClusterName, "error", err) + s.Log.ErrorContext(ctx, "Failed to enroll EKS cluster", "cluster_name", r.EksClusterName, "issue_type", r.IssueType, "error", r.Error) } else { s.Log.DebugContext(ctx, "EKS cluster already has installed kube agent", "cluster_name", r.EksClusterName) } + + cluster := 
clustersByName[r.EksClusterName] + s.awsEKSTasks.addFailedEnrollment( + awsEKSTaskKey{ + integration: integration, + issueType: r.IssueType, + accountID: cluster.GetAWSConfig().AccountID, + region: cluster.GetAWSConfig().Region, + appAutoDiscover: kubeAppDiscovery, + }, + &usertasksv1.DiscoverEKSCluster{ + DiscoveryConfig: discoveryConfig, + DiscoveryGroup: s.DiscoveryGroup, + SyncTime: timestamppb.New(s.clock.Now()), + Name: cluster.GetAWSConfig().Name, + }, + ) + s.upsertTasksForAWSEKSFailedEnrollments() } else { s.Log.InfoContext(ctx, "Successfully enrolled EKS cluster", "cluster_name", r.EksClusterName) } diff --git a/lib/srv/discovery/kube_integration_watcher_test.go b/lib/srv/discovery/kube_integration_watcher_test.go index 197032884e13c..566d587635ff6 100644 --- a/lib/srv/discovery/kube_integration_watcher_test.go +++ b/lib/srv/discovery/kube_integration_watcher_test.go @@ -586,4 +586,4 @@ func (m *mockEnrollEKSClusterClient) PresignGetCallerIdentityURL(ctx context.Con return "", nil } -var _ awsoidc.EnrollEKSCLusterClient = &mockEnrollEKSClusterClient{} +var _ awsoidc.EnrollEKSClusterClient = &mockEnrollEKSClusterClient{} diff --git a/lib/srv/discovery/status.go b/lib/srv/discovery/status.go index ebf1c0b32ee92..e2f6841843bf8 100644 --- a/lib/srv/discovery/status.go +++ b/lib/srv/discovery/status.go @@ -363,7 +363,7 @@ func (s *Server) ReportEC2SSMInstallationResult(ctx context.Context, result *ser type awsEC2Tasks struct { mu sync.RWMutex // instancesIssues maps the Discover EC2 User Task grouping parts to a set of instances metadata. - instancesIssues map[awsEC2TaskKey]map[string]*usertasksv1.DiscoverEC2Instance + instancesIssues map[awsEC2TaskKey]*usertasksv1.DiscoverEC2 // issuesSyncQueue is used to register which groups were changed in memory but were not yet sent to the cluster. // When upserting User Tasks, if the group is not in issuesSyncQueue, // then the cluster already has the latest version of this particular group. @@ -380,13 +380,13 @@ type awsEC2TaskKey struct { installerScript string } -// iterationStarted clears out any in memory issues that were recorded. +// reset clears out any in memory issues that were recorded. // This is used when starting a new Auto Discover EC2 watcher iteration. 
func (d *awsEC2Tasks) reset() { d.mu.Lock() defer d.mu.Unlock() - d.instancesIssues = make(map[awsEC2TaskKey]map[string]*usertasksv1.DiscoverEC2Instance) + d.instancesIssues = make(map[awsEC2TaskKey]*usertasksv1.DiscoverEC2) d.issuesSyncQueue = make(map[awsEC2TaskKey]struct{}) } @@ -397,16 +397,25 @@ func (d *awsEC2Tasks) addFailedEnrollment(g awsEC2TaskKey, instance *usertasksv1 if g.integration == "" { return } + if g.issueType == "" { + return + } d.mu.Lock() defer d.mu.Unlock() if d.instancesIssues == nil { - d.instancesIssues = make(map[awsEC2TaskKey]map[string]*usertasksv1.DiscoverEC2Instance) + d.instancesIssues = make(map[awsEC2TaskKey]*usertasksv1.DiscoverEC2) } if _, ok := d.instancesIssues[g]; !ok { - d.instancesIssues[g] = make(map[string]*usertasksv1.DiscoverEC2Instance) + d.instancesIssues[g] = &usertasksv1.DiscoverEC2{ + Instances: make(map[string]*usertasksv1.DiscoverEC2Instance), + AccountId: g.accountID, + Region: g.region, + SsmDocument: g.ssmDocument, + InstallerScript: g.installerScript, + } } - d.instancesIssues[g][instance.InstanceId] = instance + d.instancesIssues[g].Instances[instance.InstanceId] = instance if d.issuesSyncQueue == nil { d.issuesSyncQueue = make(map[awsEC2TaskKey]struct{}) @@ -414,6 +423,70 @@ func (d *awsEC2Tasks) addFailedEnrollment(g awsEC2TaskKey, instance *usertasksv1 d.issuesSyncQueue[g] = struct{}{} } +// awsEKSTasks contains the Discover EKS User Tasks that must be reported to the user. +type awsEKSTasks struct { + mu sync.RWMutex + // clusterIssues maps the EKS Task Key to a set of clusters. + // Each Task Key represents a single User Task that is going to be created for a set of EKS Clusters that suffer from the same issue. + clusterIssues map[awsEKSTaskKey]*usertasksv1.DiscoverEKS + // issuesSyncQueue is used to register which groups were changed in memory but were not yet sent to the cluster. + // When upserting User Tasks, if the group is not in issuesSyncQueue, + // then the cluster already has the latest version of this particular group. + issuesSyncQueue map[awsEKSTaskKey]struct{} +} + +// awsEKSTaskKey identifies a UserTask group. +type awsEKSTaskKey struct { + integration string + issueType string + accountID string + region string + appAutoDiscover bool +} + +// reset clears out any in memory issues that were recorded. +// This is used when starting a new Auto Discover EKS watcher iteration. +func (d *awsEKSTasks) reset() { + d.mu.Lock() + defer d.mu.Unlock() + + d.clusterIssues = make(map[awsEKSTaskKey]*usertasksv1.DiscoverEKS) + d.issuesSyncQueue = make(map[awsEKSTaskKey]struct{}) +} + +// addFailedEnrollment adds an enrollment failure of a given cluster. +func (d *awsEKSTasks) addFailedEnrollment(g awsEKSTaskKey, cluster *usertasksv1.DiscoverEKSCluster) { + // Only failures associated with an Integration are reported. + // There's no major blocking for showing non-integration User Tasks, but this keeps scope smaller. 
+ if g.integration == "" { + return + } + + if g.issueType == "" { + return + } + + d.mu.Lock() + defer d.mu.Unlock() + if d.clusterIssues == nil { + d.clusterIssues = make(map[awsEKSTaskKey]*usertasksv1.DiscoverEKS) + } + if _, ok := d.clusterIssues[g]; !ok { + d.clusterIssues[g] = &usertasksv1.DiscoverEKS{ + Clusters: make(map[string]*usertasksv1.DiscoverEKSCluster), + AccountId: g.accountID, + Region: g.region, + AppAutoDiscover: g.appAutoDiscover, + } + } + d.clusterIssues[g].Clusters[cluster.Name] = cluster + + if d.issuesSyncQueue == nil { + d.issuesSyncQueue = make(map[awsEKSTaskKey]struct{}) + } + d.issuesSyncQueue[g] = struct{}{} +} + // acquireSemaphoreForUserTask tries to acquire a semaphore lock for this user task. // It returns a func which must be called to release the lock. // It also returns a context which is tied to the lease and will be canceled if the lease ends. @@ -470,7 +543,11 @@ func (s *Server) acquireSemaphoreForUserTask(userTaskName string) (releaseFn fun // merges them against the ones that exist in the cluster. // // All of this flow is protected by a lock to ensure there's no race between this and other DiscoveryServices. -func (s *Server) mergeUpsertDiscoverEC2Task(taskGroup awsEC2TaskKey, failedInstances map[string]*usertasksv1.DiscoverEC2Instance) error { +func (s *Server) mergeUpsertDiscoverEC2Task(taskGroup awsEC2TaskKey, failedInstances *usertasksv1.DiscoverEC2) error { + if len(failedInstances.Instances) == 0 { + return nil + } + userTaskName := usertasks.TaskNameForDiscoverEC2(usertasks.TaskNameForDiscoverEC2Parts{ Integration: taskGroup.integration, IssueType: taskGroup.issueType, @@ -506,11 +583,7 @@ func (s *Server) mergeUpsertDiscoverEC2Task(taskGroup awsEC2TaskKey, failedInsta TaskType: usertasks.TaskTypeDiscoverEC2, IssueType: taskGroup.issueType, State: usertasks.TaskStateOpen, - DiscoverEc2: &usertasksv1.DiscoverEC2{ - AccountId: taskGroup.accountID, - Region: taskGroup.region, - Instances: failedInstances, - }, + DiscoverEc2: failedInstances, }, usertasks.WithExpiration(taskExpiration), ) @@ -526,7 +599,7 @@ func (s *Server) mergeUpsertDiscoverEC2Task(taskGroup awsEC2TaskKey, failedInsta } // discoverEC2UserTaskAddExistingInstances takes the UserTask stored in the cluster and merges it into the existing map of failed instances. -func (s *Server) discoverEC2UserTaskAddExistingInstances(currentUserTask *usertasksv1.UserTask, failedInstances map[string]*usertasksv1.DiscoverEC2Instance) map[string]*usertasksv1.DiscoverEC2Instance { +func (s *Server) discoverEC2UserTaskAddExistingInstances(currentUserTask *usertasksv1.UserTask, failedInstances *usertasksv1.DiscoverEC2) *usertasksv1.DiscoverEC2 { for existingInstanceID, existingInstance := range currentUserTask.Spec.DiscoverEc2.Instances { // Each DiscoveryService works on all the DiscoveryConfigs assigned to a given DiscoveryGroup. // So, it's safe to say that current DiscoveryService has the last state for a given DiscoveryGroup. @@ -544,7 +617,7 @@ func (s *Server) discoverEC2UserTaskAddExistingInstances(currentUserTask *userta } // Merge existing cluster state into in-memory object. 
- failedInstances[existingInstanceID] = existingInstance + failedInstances.Instances[existingInstanceID] = existingInstance } return failedInstances } @@ -553,13 +626,27 @@ func (s *Server) upsertTasksForAWSEC2FailedEnrollments() { s.awsEC2Tasks.mu.Lock() defer s.awsEC2Tasks.mu.Unlock() for g := range s.awsEC2Tasks.issuesSyncQueue { - instancesIssueByID := s.awsEC2Tasks.instancesIssues[g] - if len(instancesIssueByID) == 0 { + if err := s.mergeUpsertDiscoverEC2Task(g, s.awsEC2Tasks.instancesIssues[g]); err != nil { + s.Log.WarnContext(s.ctx, "Failed to create discover ec2 user task", + "integration", g.integration, + "issue_type", g.issueType, + "aws_account_id", g.accountID, + "aws_region", g.region, + "error", err, + ) continue } - if err := s.mergeUpsertDiscoverEC2Task(g, instancesIssueByID); err != nil { - s.Log.WarnContext(s.ctx, "Failed to create discover ec2 user task", + delete(s.awsEC2Tasks.issuesSyncQueue, g) + } +} + +func (s *Server) upsertTasksForAWSEKSFailedEnrollments() { + s.awsEKSTasks.mu.Lock() + defer s.awsEKSTasks.mu.Unlock() + for g := range s.awsEKSTasks.issuesSyncQueue { + if err := s.mergeUpsertDiscoverEKSTask(g, s.awsEKSTasks.clusterIssues[g]); err != nil { + s.Log.WarnContext(s.ctx, "Failed to create discover eks user task", + "integration", g.integration, + "issue_type", g.issueType, + "aws_account_id", g.accountID, + "aws_region", g.region, + "error", err, + ) continue } - delete(s.awsEC2Tasks.issuesSyncQueue, g) + delete(s.awsEKSTasks.issuesSyncQueue, g) + } +} + +// mergeUpsertDiscoverEKSTask takes the current DiscoverEKS User Task issues stored in memory and +// merges them against the ones that exist in the cluster. +// +// All of this flow is protected by a lock to ensure there's no race between this and other DiscoveryServices. +func (s *Server) mergeUpsertDiscoverEKSTask(taskGroup awsEKSTaskKey, failedClusters *usertasksv1.DiscoverEKS) error { + if len(failedClusters.Clusters) == 0 { + return nil + } + + userTaskName := usertasks.TaskNameForDiscoverEKS(usertasks.TaskNameForDiscoverEKSParts{ + Integration: taskGroup.integration, + IssueType: taskGroup.issueType, + AccountID: taskGroup.accountID, + Region: taskGroup.region, + AppAutoDiscover: taskGroup.appAutoDiscover, + }) + + releaseFn, ctxWithLease, err := s.acquireSemaphoreForUserTask(userTaskName) + if err != nil { + return trace.Wrap(err) + } + defer releaseFn() + + // Fetch the current task because it might have clusters discovered by another group of DiscoveryServices. + currentUserTask, err := s.AccessPoint.GetUserTask(ctxWithLease, userTaskName) + switch { + case trace.IsNotFound(err): + case err != nil: + return trace.Wrap(err) + default: + failedClusters = s.discoverEKSUserTaskAddExistingClusters(currentUserTask, failedClusters) + } + + // If the DiscoveryService is stopped, or the issue does not happen again, + // the task is removed to prevent users from working on issues that are no longer happening.
+ taskExpiration := s.clock.Now().Add(2 * s.PollInterval) + + task, err := usertasks.NewDiscoverEKSUserTask( + &usertasksv1.UserTaskSpec{ + Integration: taskGroup.integration, + TaskType: usertasks.TaskTypeDiscoverEKS, + IssueType: taskGroup.issueType, + State: usertasks.TaskStateOpen, + DiscoverEks: failedClusters, + }, + usertasks.WithExpiration(taskExpiration), + ) + if err != nil { + return trace.Wrap(err) + } + + if _, err := s.AccessPoint.UpsertUserTask(ctxWithLease, task); err != nil { + return trace.Wrap(err) + } + + return nil +} + +// discoverEKSUserTaskAddExistingClusters takes the UserTask stored in the cluster and merges it into the existing map of failed clusters. +func (s *Server) discoverEKSUserTaskAddExistingClusters(currentUserTask *usertasksv1.UserTask, failedClusters *usertasksv1.DiscoverEKS) *usertasksv1.DiscoverEKS { + for existingClusterName, existingCluster := range currentUserTask.Spec.DiscoverEks.Clusters { + // Each DiscoveryService works on all the DiscoveryConfigs assigned to a given DiscoveryGroup. + // So, it's safe to say that current DiscoveryService has the last state for a given DiscoveryGroup. + // If other clusters exist for this DiscoveryGroup, they can be discarded because, as said before, the current DiscoveryService has the last state for a given DiscoveryGroup. + if existingCluster.DiscoveryGroup == s.DiscoveryGroup { + continue + } + + // For existing clusters whose sync time is too far in the past, just drop them. + // This ensures that if a cluster is removed from AWS, it will eventually disappear from the User Tasks' cluster list. + // It might also be the case that the DiscoveryConfig was changed and the cluster is no longer matched (because of labels/regions or other matchers). + clusterIssueExpiration := s.clock.Now().Add(-2 * s.PollInterval) + if existingCluster.SyncTime.AsTime().Before(clusterIssueExpiration) { + continue + } + + // Merge existing cluster state into in-memory object. + failedClusters.Clusters[existingClusterName] = existingCluster } + return failedClusters }
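Not part of the diff, but a quick end-to-end sketch for reviewers of how the new constructor, validation, and deterministic naming fit together. All field values below are made up; the import paths are the ones already used by the tests in this PR:

```go
package main

import (
	"fmt"

	usertasksv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/usertasks/v1"
	"github.com/gravitational/teleport/api/types/usertasks"
	"google.golang.org/protobuf/types/known/timestamppb"
)

func main() {
	spec := &usertasksv1.UserTaskSpec{
		Integration: "my-integration",
		TaskType:    usertasks.TaskTypeDiscoverEKS,
		IssueType:   usertasks.AutoDiscoverEKSIssueClusterUnreachable,
		State:       usertasks.TaskStateOpen,
		DiscoverEks: &usertasksv1.DiscoverEKS{
			AccountId: "123456789012",
			Region:    "us-west-2",
			Clusters: map[string]*usertasksv1.DiscoverEKSCluster{
				"cluster01": {
					Name:            "cluster01",
					DiscoveryConfig: "dc01",
					DiscoveryGroup:  "dg01",
					SyncTime:        timestamppb.Now(),
				},
			},
		},
	}

	// The constructor validates the spec and derives the task name
	// deterministically from integration, issue type, account, region, and
	// the app auto-discover flag, so repeated failures for the same group
	// are reported under a single UserTask instead of piling up new ones.
	task, err := usertasks.NewDiscoverEKSUserTask(spec)
	if err != nil {
		panic(err)
	}
	fmt.Println(task.GetMetadata().GetName())
}
```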