Skip to content

Commit

Permalink
Add static host user cache (#45573)
Browse files Browse the repository at this point in the history
This change adds cache support for static host users.
  • Loading branch information
atburke committed Sep 10, 2024
1 parent f9127a4 commit 7fffcd9
Show file tree
Hide file tree
Showing 16 changed files with 562 additions and 296 deletions.
8 changes: 8 additions & 0 deletions api/client/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
kubewaitingcontainerpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/kubewaitingcontainer/v1"
machineidv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/machineid/v1"
notificationsv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/notifications/v1"
userprovisioningpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/userprovisioning/v1"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/types/accesslist"
accesslistv1conv "github.com/gravitational/teleport/api/types/accesslist/convert/v1"
Expand Down Expand Up @@ -94,6 +95,10 @@ func EventToGRPC(in types.Event) (*proto.Event, error) {
out.Resource = &proto.Event_AccessGraphSettings{
AccessGraphSettings: r,
}
case *userprovisioningpb.StaticHostUser:
out.Resource = &proto.Event_StaticHostUser{
StaticHostUser: r,
}
default:
return nil, trace.BadParameter("resource type %T is not supported", r)
}
Expand Down Expand Up @@ -534,6 +539,9 @@ func EventFromGRPC(in *proto.Event) (*types.Event, error) {
} else if r := in.GetAccessGraphSettings(); r != nil {
out.Resource = types.Resource153ToLegacy(r)
return &out, nil
} else if r := in.GetStaticHostUser(); r != nil {
out.Resource = types.Resource153ToLegacy(r)
return &out, nil
} else {
return nil, trace.BadParameter("received unsupported resource %T", in.Resource)
}
Expand Down
608 changes: 318 additions & 290 deletions api/client/proto/event.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions api/proto/teleport/legacy/client/proto/event.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import "teleport/machineid/v1/federation.proto";
import "teleport/notifications/v1/notifications.proto";
import "teleport/secreports/v1/secreports.proto";
import "teleport/userloginstate/v1/userloginstate.proto";
import "teleport/userprovisioning/v1/statichostuser.proto";

option go_package = "github.com/gravitational/teleport/api/client/proto";

Expand Down Expand Up @@ -175,6 +176,8 @@ message Event {
teleport.clusterconfig.v1.AccessGraphSettings AccessGraphSettings = 61;
// SPIFFEFederation is a resource for SPIFFE federation.
teleport.machineid.v1.SPIFFEFederation SPIFFEFederation = 62;
// StaticHostUser is a resource for static host users.
teleport.userprovisioning.v1.StaticHostUser StaticHostUser = 63;
// AutoUpdateConfig is a resource for autoupdate config.
teleport.autoupdate.v1.AutoUpdateConfig AutoUpdateConfig = 64;
// AutoUpdateVersion is a resource for autoupdate version.
Expand Down
2 changes: 2 additions & 0 deletions lib/auth/accesspoint/accesspoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type Config struct {
SecReports services.SecReports
SnowflakeSession services.SnowflakeSession
SPIFFEFederations cache.SPIFFEFederationReader
StaticHostUsers services.StaticHostUser
Trust services.Trust
UserGroups services.UserGroups
UserLoginStates services.UserLoginStates
Expand Down Expand Up @@ -186,6 +187,7 @@ func NewCache(cfg Config) (*cache.Cache, error) {
SecReports: cfg.SecReports,
SnowflakeSession: cfg.SnowflakeSession,
SPIFFEFederations: cfg.SPIFFEFederations,
StaticHostUsers: cfg.StaticHostUsers,
Trust: cfg.Trust,
UserGroups: cfg.UserGroups,
UserLoginStates: cfg.UserLoginStates,
Expand Down
6 changes: 6 additions & 0 deletions lib/auth/authclient/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
integrationpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/integration/v1"
kubewaitingcontainerpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/kubewaitingcontainer/v1"
machineidv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/machineid/v1"
userprovisioningpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/userprovisioning/v1"
userspb "github.com/gravitational/teleport/api/gen/proto/go/teleport/users/v1"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/types/accesslist"
Expand Down Expand Up @@ -1182,6 +1183,11 @@ type Cache interface {
ListSPIFFEFederations(ctx context.Context, pageSize int, lastToken string) ([]*machineidv1.SPIFFEFederation, string, error)
// GetAccessGraphSettings returns the access graph settings.
GetAccessGraphSettings(context.Context) (*clusterconfigpb.AccessGraphSettings, error)

// ListStaticHostUsers lists static host users.
ListStaticHostUsers(ctx context.Context, pageSize int, startKey string) ([]*userprovisioningpb.StaticHostUser, string, error)
// GetStaticHostUser returns a static host user by name.
GetStaticHostUser(ctx context.Context, name string) (*userprovisioningpb.StaticHostUser, error)
}

type NodeWrapper struct {
Expand Down
10 changes: 10 additions & 0 deletions lib/auth/authclient/clt.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,11 @@ func (c *Client) CrownJewelsClient() services.CrownJewels {
return c.APIClient.CrownJewelServiceClient()
}

// StaticHostUserClient returns a client for managing static host user resources.
func (c *Client) StaticHostUserClient() services.StaticHostUser {
return c.APIClient.StaticHostUserClient()
}

// DeleteStaticTokens deletes static tokens
func (c *Client) DeleteStaticTokens() error {
return trace.NotImplemented(notImplementedMessage)
Expand Down Expand Up @@ -1713,6 +1718,11 @@ type ClientI interface {
// will return "not implemented" errors (as per the default gRPC behavior).
VnetConfigServiceClient() vnet.VnetConfigServiceClient

// StaticHostUserClient returns a StaticHostUser client.
// Clients connecting to older Teleport versions still get a client when calling this method, but all RPCs
// will return "not implemented" errors (as per the default gRPC behavior).
StaticHostUserClient() services.StaticHostUser

// CloneHTTPClient creates a new HTTP client with the same configuration.
CloneHTTPClient(params ...roundtrip.ClientParam) (*HTTPClient, error)

Expand Down
1 change: 1 addition & 0 deletions lib/auth/grpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5427,6 +5427,7 @@ func NewGRPCServer(cfg GRPCServerConfig) (*GRPCServer, error) {
staticHostUserServer, err := statichostuserv1.NewService(statichostuserv1.ServiceConfig{
Authorizer: cfg.Authorizer,
Backend: cfg.AuthServer.Services,
Cache: cfg.AuthServer.Cache,
})
if err != nil {
return nil, trace.Wrap(err)
Expand Down
1 change: 1 addition & 0 deletions lib/auth/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ func NewTestAuthServer(cfg TestAuthServerConfig) (*TestAuthServer, error) {
SecReports: svces.SecReports,
SnowflakeSession: svces.Identity,
SPIFFEFederations: svces.SPIFFEFederations,
StaticHostUsers: svces.StaticHostUser,
Trust: svces.TrustInternal,
UserGroups: svces.UserGroups,
UserLoginStates: svces.UserLoginStates,
Expand Down
21 changes: 16 additions & 5 deletions lib/auth/statichostuser/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,16 @@ type ServiceConfig struct {
Authorizer authz.Authorizer
// Backend is the backend used to store static host users.
Backend services.StaticHostUser
// TODO(atburke): add cache
// Cache is the cache used to store static host users.
Cache Cache
}

// Cache is a subset of the service interface for reading items from the cache.
type Cache interface {
// ListStaticHostUsers lists static host users.
ListStaticHostUsers(ctx context.Context, pageSize int, pageToken string) ([]*userprovisioningpb.StaticHostUser, string, error)
// GetStaticHostUser returns a static host user by name.
GetStaticHostUser(ctx context.Context, name string) (*userprovisioningpb.StaticHostUser, error)
}

// Service implements the static host user RPC service.
Expand All @@ -43,6 +52,7 @@ type Service struct {

authorizer authz.Authorizer
backend services.StaticHostUser
cache Cache
}

// NewService creates a new static host user gRPC service.
Expand All @@ -52,11 +62,14 @@ func NewService(cfg ServiceConfig) (*Service, error) {
return nil, trace.BadParameter("backend is required")
case cfg.Authorizer == nil:
return nil, trace.BadParameter("authorizer is required")
case cfg.Cache == nil:
return nil, trace.BadParameter("cache is required")
}

return &Service{
authorizer: cfg.Authorizer,
backend: cfg.Backend,
cache: cfg.Cache,
}, nil
}

Expand All @@ -73,8 +86,7 @@ func (s *Service) ListStaticHostUsers(ctx context.Context, req *userprovisioning
return nil, trace.Wrap(err)
}

// TODO(atburke): Switch to using the cache after static host users have been added to the cache.
users, nextToken, err := s.backend.ListStaticHostUsers(ctx, int(req.PageSize), req.PageToken)
users, nextToken, err := s.cache.ListStaticHostUsers(ctx, int(req.PageSize), req.PageToken)
if err != nil {
return nil, trace.Wrap(err)
}
Expand All @@ -100,8 +112,7 @@ func (s *Service) GetStaticHostUser(ctx context.Context, req *userprovisioningpb
return nil, trace.Wrap(err)
}

// TODO(atburke): Switch to using the cache after static host users have been added to the cache.
out, err := s.backend.GetStaticHostUser(ctx, req.Name)
out, err := s.cache.GetStaticHostUser(ctx, req.Name)
return out, trace.Wrap(err)
}

Expand Down
1 change: 1 addition & 0 deletions lib/auth/statichostuser/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@ func initSvc(t *testing.T, authorizerFn func(t *testing.T, client localClient) a
resourceSvc, err := NewService(ServiceConfig{
Authorizer: authorizerFn(t, client),
Backend: localResourceService,
Cache: localResourceService,
})
require.NoError(t, err)

Expand Down
39 changes: 39 additions & 0 deletions lib/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
dbobjectv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/dbobject/v1"
kubewaitingcontainerpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/kubewaitingcontainer/v1"
notificationsv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/notifications/v1"
userprovisioningpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/userprovisioning/v1"
userspb "github.com/gravitational/teleport/api/gen/proto/go/teleport/users/v1"
"github.com/gravitational/teleport/api/internalutils/stream"
apitracing "github.com/gravitational/teleport/api/observability/tracing"
Expand Down Expand Up @@ -183,6 +184,7 @@ func ForAuth(cfg Config) Config {
{Kind: types.KindDatabaseObject},
{Kind: types.KindSPIFFEFederation},
{Kind: types.KindAccessGraphSettings},
{Kind: types.KindStaticHostUser},
}
cfg.QueueSize = defaults.AuthQueueSize
// We don't want to enable partial health for auth cache because auth uses an event stream
Expand Down Expand Up @@ -294,6 +296,7 @@ func ForNode(cfg Config) Config {
// data about other namespaces or node events
{Kind: types.KindNamespace, Name: apidefaults.Namespace},
{Kind: types.KindNetworkRestrictions},
{Kind: types.KindStaticHostUser},
}

cfg.QueueSize = defaults.NodeQueueSize
Expand Down Expand Up @@ -521,6 +524,7 @@ type Cache struct {
notificationsCache services.Notifications
accessMontoringRuleCache services.AccessMonitoringRules
spiffeFederationCache spiffeFederationCacher
staticHostUsersCache *local.StaticHostUserService

// closed indicates that the cache has been closed
closed atomic.Bool
Expand Down Expand Up @@ -695,6 +699,8 @@ type Config struct {
AccessMonitoringRules services.AccessMonitoringRules
// SPIFFEFederations is the SPIFFE federations service.
SPIFFEFederations SPIFFEFederationReader
// StaticHostUsers is the static host user service.
StaticHostUsers services.StaticHostUser
// Backend is a backend for local cache
Backend backend.Backend
// MaxRetryPeriod is the maximum period between cache retries on failures
Expand Down Expand Up @@ -936,6 +942,12 @@ func New(config Config) (*Cache, error) {
return nil, trace.Wrap(err)
}

staticHostUserCache, err := local.NewStaticHostUserService(config.Backend)
if err != nil {
cancel()
return nil, trace.Wrap(err)
}

cs := &Cache{
ctx: ctx,
cancel: cancel,
Expand Down Expand Up @@ -977,6 +989,7 @@ func New(config Config) (*Cache, error) {
lowVolumeEventsFanout: utils.NewRoundRobin(lowVolumeFanouts),
kubeWaitingContsCache: kubeWaitingContsCache,
spiffeFederationCache: spiffeFederationCache,
staticHostUsersCache: staticHostUserCache,
Logger: log.WithFields(log.Fields{
teleport.ComponentKey: config.Component,
}),
Expand Down Expand Up @@ -2296,6 +2309,32 @@ func (c *Cache) GetKubernetesWaitingContainer(ctx context.Context, req *kubewait
return rg.reader.GetKubernetesWaitingContainer(ctx, req)
}

// ListStaticHostUsers lists static host users.
func (c *Cache) ListStaticHostUsers(ctx context.Context, pageSize int, pageToken string) ([]*userprovisioningpb.StaticHostUser, string, error) {
ctx, span := c.Tracer.Start(ctx, "cache/ListStaticHostUsers")
defer span.End()

rg, err := readCollectionCache(c, c.collections.staticHostUsers)
if err != nil {
return nil, "", trace.Wrap(err)
}
defer rg.Release()
return rg.reader.ListStaticHostUsers(ctx, pageSize, pageToken)
}

// GetStaticHostUser returns a static host user by name.
func (c *Cache) GetStaticHostUser(ctx context.Context, name string) (*userprovisioningpb.StaticHostUser, error) {
ctx, span := c.Tracer.Start(ctx, "cache/GetStaticHostUser")
defer span.End()

rg, err := readCollectionCache(c, c.collections.staticHostUsers)
if err != nil {
return nil, trace.Wrap(err)
}
defer rg.Release()
return rg.reader.GetStaticHostUser(ctx, name)
}

// GetApplicationServers returns all registered application servers.
func (c *Cache) GetApplicationServers(ctx context.Context, namespace string) ([]types.AppServer, error) {
ctx, span := c.Tracer.Start(ctx, "cache/GetApplicationServers")
Expand Down
Loading

0 comments on commit 7fffcd9

Please sign in to comment.