Skip to content

Commit

Permalink
DiscoveryConfig: Update status after each run (#40267)
Browse files Browse the repository at this point in the history
* DiscoveryConfig: Update status after each run

Signed-off-by: Tiago Silva <[email protected]>

* add unit tests

* drop method by reducing interface area

---------

Signed-off-by: Tiago Silva <[email protected]>
  • Loading branch information
tigrato authored Apr 10, 2024
1 parent a90dc6f commit 502879a
Show file tree
Hide file tree
Showing 12 changed files with 414 additions and 28 deletions.
13 changes: 13 additions & 0 deletions api/client/discoveryconfig/discoveryconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,16 @@ func (c *Client) DeleteAllDiscoveryConfigs(ctx context.Context) error {
_, err := c.grpcClient.DeleteAllDiscoveryConfigs(ctx, &discoveryconfigv1.DeleteAllDiscoveryConfigsRequest{})
return trace.Wrap(err)
}

// UpdateDiscoveryConfigStatus updates the DiscoveryConfig Status field.
func (c *Client) UpdateDiscoveryConfigStatus(ctx context.Context, name string, status discoveryconfig.Status) (*discoveryconfig.DiscoveryConfig, error) {
resp, err := c.grpcClient.UpdateDiscoveryConfigStatus(ctx, &discoveryconfigv1.UpdateDiscoveryConfigStatusRequest{
Name: name,
Status: conv.StatusToProto(status),
})
if err != nil {
return nil, trace.Wrap(err)
}
dc, err := conv.FromProto(resp)
return dc, trace.Wrap(err)
}
27 changes: 16 additions & 11 deletions api/types/discoveryconfig/convert/v1/discoveryconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,16 +119,6 @@ func ToProto(discoveryConfig *discoveryconfig.DiscoveryConfig) *discoveryconfigv
kubeMatchers = append(kubeMatchers, &m)
}

var lastSyncTime *timestamppb.Timestamp
if !discoveryConfig.Status.LastSyncTime.IsZero() {
lastSyncTime = timestamppb.New(discoveryConfig.Status.LastSyncTime)
}
status := &discoveryconfigv1.DiscoveryConfigStatus{
State: discoveryconfigv1.DiscoveryConfigState(discoveryconfigv1.DiscoveryConfigState_value[discoveryConfig.Status.State]),
ErrorMessage: discoveryConfig.Status.ErrorMessage,
DiscoveredResources: discoveryConfig.Status.DiscoveredResources,
LastSyncTime: lastSyncTime,
}
return &discoveryconfigv1.DiscoveryConfig{
Header: headerv1.ToResourceHeaderProto(discoveryConfig.ResourceHeader),
Spec: &discoveryconfigv1.DiscoveryConfigSpec{
Expand All @@ -139,6 +129,21 @@ func ToProto(discoveryConfig *discoveryconfig.DiscoveryConfig) *discoveryconfigv
Kube: kubeMatchers,
AccessGraph: discoveryConfig.Spec.AccessGraph,
},
Status: status,
Status: StatusToProto(discoveryConfig.Status),
}
}

// StatusToProto converts a discovery config status into the protobuf discovery config status object.
func StatusToProto(status discoveryconfig.Status) *discoveryconfigv1.DiscoveryConfigStatus {
var lastSyncTime *timestamppb.Timestamp
if !status.LastSyncTime.IsZero() {
lastSyncTime = timestamppb.New(status.LastSyncTime)
}

return &discoveryconfigv1.DiscoveryConfigStatus{
State: discoveryconfigv1.DiscoveryConfigState(discoveryconfigv1.DiscoveryConfigState_value[status.State]),
ErrorMessage: status.ErrorMessage,
DiscoveredResources: status.DiscoveredResources,
LastSyncTime: lastSyncTime,
}
}
8 changes: 8 additions & 0 deletions lib/auth/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,9 @@ type DiscoveryAccessPoint interface {

// Ping gets basic info about the auth server.
Ping(context.Context) (proto.PingResponse, error)

// UpdateDiscoveryConfigStatus updates the status of a discovery config.
UpdateDiscoveryConfigStatus(ctx context.Context, name string, status discoveryconfig.Status) (*discoveryconfig.DiscoveryConfig, error)
}

// ReadOktaAccessPoint is a read only API interface to be
Expand Down Expand Up @@ -1381,6 +1384,11 @@ func (w *DiscoveryWrapper) Ping(ctx context.Context) (proto.PingResponse, error)
return w.NoCache.Ping(ctx)
}

// UpdateDiscoveryConfigStatus updates the status of a discovery config.
func (w *DiscoveryWrapper) UpdateDiscoveryConfigStatus(ctx context.Context, name string, status discoveryconfig.Status) (*discoveryconfig.DiscoveryConfig, error) {
return w.NoCache.UpdateDiscoveryConfigStatus(ctx, name, status)
}

// Close closes all associated resources
func (w *DiscoveryWrapper) Close() error {
err := w.NoCache.Close()
Expand Down
4 changes: 2 additions & 2 deletions lib/auth/clt.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ func (c *Client) UpsertUser(ctx context.Context, user types.User) (types.User, e
}

// DiscoveryConfigClient returns a client for managing the DiscoveryConfig resource.
func (c *Client) DiscoveryConfigClient() services.DiscoveryConfigs {
func (c *Client) DiscoveryConfigClient() services.DiscoveryConfigWithStatusUpdater {
return c.APIClient.DiscoveryConfigClient()
}

Expand Down Expand Up @@ -1108,7 +1108,7 @@ type ClientI interface {
// Clients connecting to older Teleport versions, still get an DiscoveryConfig client
// when calling this method, but all RPCs will return "not implemented" errors
// (as per the default gRPC behavior).
DiscoveryConfigClient() services.DiscoveryConfigs
DiscoveryConfigClient() services.DiscoveryConfigWithStatusUpdater

// ResourceUsageClient returns a resource usage service client.
// Clients connecting to non-Enterprise clusters, or older Teleport versions,
Expand Down
28 changes: 23 additions & 5 deletions lib/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ import (
kubeproto "github.com/gravitational/teleport/api/gen/proto/go/teleport/kube/v1"
transportpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/transport/v1"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/types/discoveryconfig"
apievents "github.com/gravitational/teleport/api/types/events"
apiutils "github.com/gravitational/teleport/api/utils"
"github.com/gravitational/teleport/api/utils/aws"
Expand Down Expand Up @@ -2360,19 +2361,24 @@ type eksClustersEnroller interface {
EnrollEKSClusters(context.Context, *integrationpb.EnrollEKSClustersRequest, ...grpc.CallOption) (*integrationpb.EnrollEKSClustersResponse, error)
}

type discoveryConfigClient interface {
UpdateDiscoveryConfigStatus(ctx context.Context, name string, status discoveryconfig.Status) (*discoveryconfig.DiscoveryConfig, error)
services.DiscoveryConfigsGetter
}

// combinedDiscoveryClient is an auth.Client client with other, specific, services added to it.
type combinedDiscoveryClient struct {
auth.ClientI
services.DiscoveryConfigsGetter
discoveryConfigClient
eksClustersEnroller
}

// newLocalCacheForDiscovery returns a new instance of access point for a discovery service.
func (process *TeleportProcess) newLocalCacheForDiscovery(clt auth.ClientI, cacheName []string) (auth.DiscoveryAccessPoint, error) {
client := combinedDiscoveryClient{
ClientI: clt,
DiscoveryConfigsGetter: clt.DiscoveryConfigClient(),
eksClustersEnroller: clt.IntegrationAWSOIDCClient(),
ClientI: clt,
discoveryConfigClient: clt.DiscoveryConfigClient(),
eksClustersEnroller: clt.IntegrationAWSOIDCClient(),
}

// if caching is disabled, return access point
Expand Down Expand Up @@ -2464,10 +2470,22 @@ func (process *TeleportProcess) newLocalCacheForWindowsDesktop(clt auth.ClientI,
return auth.NewWindowsDesktopWrapper(clt, cache), nil
}

// accessPointWrapper is a wrapper around auth.ClientI that reduces the surface area of the
// auth.ClientI.DiscoveryConfigClient interface to services.DiscoveryConfigs.
// Cache doesn't implement the full auth.ClientI interface, so we need to wrap auth.ClientI to
// to make it compatible with the services.DiscoveryConfigs interface.
type accessPointWrapper struct {
auth.ClientI
}

func (a accessPointWrapper) DiscoveryConfigClient() services.DiscoveryConfigs {
return a.ClientI.DiscoveryConfigClient()
}

// NewLocalCache returns new instance of access point
func (process *TeleportProcess) NewLocalCache(clt auth.ClientI, setupConfig cache.SetupConfigFn, cacheName []string) (*cache.Cache, error) {
return process.newAccessCache(accesspoint.AccessCacheConfig{
Services: clt,
Services: &accessPointWrapper{ClientI: clt},
Setup: setupConfig,
CacheName: cacheName,
})
Expand Down
7 changes: 7 additions & 0 deletions lib/services/discoveryconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ type DiscoveryConfigs interface {
DeleteAllDiscoveryConfigs(context.Context) error
}

// DiscoveryConfigWithStatusUpdater defines an interface for managing DiscoveryConfig resources including updating their status.
type DiscoveryConfigWithStatusUpdater interface {
DiscoveryConfigs
// UpdateDiscoveryConfigStatus updates the status of the specified DiscoveryConfig resource.
UpdateDiscoveryConfigStatus(context.Context, string, discoveryconfig.Status) (*discoveryconfig.DiscoveryConfig, error)
}

// DiscoveryConfigsGetter defines methods for List/Read operations on DiscoveryConfig Resources.
type DiscoveryConfigsGetter interface {
// ListDiscoveryConfigs returns a paginated list of all DiscoveryConfig resources.
Expand Down
60 changes: 54 additions & 6 deletions lib/srv/discovery/access_graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"

discoveryconfigv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/discoveryconfig/v1"
"github.com/gravitational/teleport/api/metadata"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/types/discoveryconfig"
accessgraphv1alpha "github.com/gravitational/teleport/gen/proto/go/accessgraph/v1alpha"
"github.com/gravitational/teleport/lib/services"
aws_sync "github.com/gravitational/teleport/lib/srv/discovery/fetchers/aws-sync"
Expand Down Expand Up @@ -65,6 +67,7 @@ func (s *Server) reconcileAccessGraph(ctx context.Context, currentTAGResources *
}
return trace.Wrap(errNoAccessGraphFetchers)
}
s.updateDiscoveryConfigStatus(allFetchers, nil, true /* preRun */)
resultsC := make(chan fetcherResult, len(allFetchers))
// Use a channel to limit the number of concurrent fetchers.
tokens := make(chan struct{}, 3)
Expand Down Expand Up @@ -102,6 +105,7 @@ func (s *Server) reconcileAccessGraph(ctx context.Context, currentTAGResources *
// Merge all results into a single result
upsert, toDel := aws_sync.ReconcileResults(currentTAGResources, result)
err = push(stream, upsert, toDel)
s.updateDiscoveryConfigStatus(allFetchers, err, false /* preRun */)
if err != nil {
s.Log.WithError(err).Error("Error pushing TAGs")
return nil
Expand Down Expand Up @@ -358,7 +362,7 @@ func grpcCredentials(config AccessGraphConfig, certs []tls.Certificate) (grpc.Di
}

func (s *Server) initAccessGraphWatchers(ctx context.Context, cfg *Config) error {
fetchers, err := s.accessGraphFetchersFromMatchers(ctx, cfg.Matchers)
fetchers, err := s.accessGraphFetchersFromMatchers(ctx, cfg.Matchers, "" /* discoveryConfigName */)
if err != nil {
s.Log.WithError(err).Error("Error initializing access graph fetchers")
}
Expand Down Expand Up @@ -402,7 +406,7 @@ func (s *Server) initAccessGraphWatchers(ctx context.Context, cfg *Config) error
}

// accessGraphFetchersFromMatchers converts Matchers into a set of AWS Sync Fetchers.
func (s *Server) accessGraphFetchersFromMatchers(ctx context.Context, matchers Matchers) ([]aws_sync.AWSSync, error) {
func (s *Server) accessGraphFetchersFromMatchers(ctx context.Context, matchers Matchers, discoveryConfigName string) ([]aws_sync.AWSSync, error) {
var fetchers []aws_sync.AWSSync
var errs []error
if matchers.AccessGraph == nil {
Expand All @@ -420,10 +424,11 @@ func (s *Server) accessGraphFetchersFromMatchers(ctx context.Context, matchers M
fetcher, err := aws_sync.NewAWSFetcher(
ctx,
aws_sync.Config{
CloudClients: s.CloudClients,
AssumeRole: assumeRole,
Regions: awsFetcher.Regions,
Integration: awsFetcher.Integration,
CloudClients: s.CloudClients,
AssumeRole: assumeRole,
Regions: awsFetcher.Regions,
Integration: awsFetcher.Integration,
DiscoveryConfigName: discoveryConfigName,
},
)
if err != nil {
Expand All @@ -435,3 +440,46 @@ func (s *Server) accessGraphFetchersFromMatchers(ctx context.Context, matchers M

return fetchers, trace.NewAggregate(errs...)
}

func (s *Server) updateDiscoveryConfigStatus(fetchers []aws_sync.AWSSync, pushErr error, preRun bool) {
lastUpdate := s.clock.Now()
for _, fetcher := range fetchers {
// Only update the status for fetchers that are from the discovery config.
if !fetcher.IsFromDiscoveryConfig() {
continue
}

status := buildFetcherStatus(fetcher, pushErr, lastUpdate)
if preRun {
// If this is a pre-run, the status is syncing.
status.State = discoveryconfigv1.DiscoveryConfigState_DISCOVERY_CONFIG_STATE_SYNCING.String()
}
ctx, cancel := context.WithTimeout(s.ctx, 5*time.Second)
defer cancel()
_, err := s.AccessPoint.UpdateDiscoveryConfigStatus(ctx, fetcher.DiscoveryConfigName(), status)
switch {
case trace.IsNotImplemented(err):
s.Log.Warn("UpdateDiscoveryConfigStatus method is not implemented in Auth Server. Please upgrade it to a recent version.")
case err != nil:
s.Log.WithError(err).Infof("Error updating discovery config %q status", fetcher.DiscoveryConfigName())
}
}
}

func buildFetcherStatus(fetcher aws_sync.AWSSync, pushErr error, lastUpdate time.Time) discoveryconfig.Status {
count, err := fetcher.Status()
err = trace.NewAggregate(err, pushErr)
var errStr *string
state := discoveryconfigv1.DiscoveryConfigState_DISCOVERY_CONFIG_STATE_RUNNING
if err != nil {
errStr = new(string)
*errStr = err.Error()
state = discoveryconfigv1.DiscoveryConfigState_DISCOVERY_CONFIG_STATE_ERROR
}
return discoveryconfig.Status{
State: state.String(),
ErrorMessage: errStr,
LastSyncTime: lastUpdate,
DiscoveredResources: count,
}
}
Loading

0 comments on commit 502879a

Please sign in to comment.