Skip to content

Commit

Permalink
port core CA ops to use revisions/atomics (#39845) (#40440)
Browse files Browse the repository at this point in the history
* port core CA ops to use revisions/atomics

* allow atomicwrite in mirror mode
  • Loading branch information
fspmarshall authored Apr 11, 2024
1 parent 769dc36 commit 6555e2a
Show file tree
Hide file tree
Showing 13 changed files with 387 additions and 215 deletions.
50 changes: 18 additions & 32 deletions lib/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ func NewServer(cfg *InitConfig, opts ...ServerOption) (*Server, error) {

closeCtx, cancelFunc := context.WithCancel(context.TODO())
services := &Services{
Trust: cfg.Trust,
TrustInternal: cfg.Trust,
PresenceInternal: cfg.Presence,
Provisioner: cfg.Provisioner,
Identity: cfg.Identity,
Expand Down Expand Up @@ -521,7 +521,7 @@ func NewServer(cfg *InitConfig, opts ...ServerOption) (*Server, error) {
}

type Services struct {
services.Trust
services.TrustInternal
services.PresenceInternal
services.Provisioner
services.Identity
Expand Down Expand Up @@ -6364,50 +6364,36 @@ func mergeKeySets(a, b types.CAKeySet) types.CAKeySet {
return newKeySet
}

// addAdditionalTrustedKeysAtomic performs an atomic CompareAndSwap to update
// addAdditionalTrustedKeysAtomic performs an atomic update to
// the given CA with newKeys added to the AdditionalTrustedKeys
func (a *Server) addAdditionalTrustedKeysAtomic(
ctx context.Context,
currentCA types.CertAuthority,
newKeys types.CAKeySet,
needsUpdate func(types.CertAuthority) (bool, error),
) error {
for {
select {
case <-ctx.Done():
return trace.Wrap(ctx.Err())
default:
}
updateRequired, err := needsUpdate(currentCA)
if err != nil {
return trace.Wrap(err)
}
if !updateRequired {
return nil
}
func (a *Server) addAdditionalTrustedKeysAtomic(ctx context.Context, ca types.CertAuthority, newKeys types.CAKeySet, needsUpdate func(types.CertAuthority) (bool, error)) error {
const maxIterations = 64

newCA := currentCA.Clone()
currentKeySet := newCA.GetAdditionalTrustedKeys()
mergedKeySet := mergeKeySets(currentKeySet, newKeys)
if err := newCA.SetAdditionalTrustedKeys(mergedKeySet); err != nil {
for i := 0; i < maxIterations; i++ {
if update, err := needsUpdate(ca); err != nil || !update {
return trace.Wrap(err)
}

err = a.CompareAndSwapCertAuthority(newCA, currentCA)
if err != nil && !trace.IsCompareFailed(err) {
err := ca.SetAdditionalTrustedKeys(mergeKeySets(
ca.GetAdditionalTrustedKeys(),
newKeys,
))
if err != nil {
return trace.Wrap(err)
}
if err == nil {
// success!

if _, err := a.UpdateCertAuthority(ctx, ca); err == nil {
return nil
} else if !errors.Is(err, backend.ErrIncorrectRevision) {
return trace.Wrap(err)
}
// else trace.IsCompareFailed(err) == true (CA was concurrently updated)

currentCA, err = a.Services.GetCertAuthority(ctx, currentCA.GetID(), true)
ca, err = a.Services.GetCertAuthority(ctx, ca.GetID(), true)
if err != nil {
return trace.Wrap(err)
}
}
return trace.Errorf("too many conflicts attempting to set additional trusted keys for ca %q of type %q", ca.GetClusterName(), ca.GetType())
}

// newKeySet generates a new sets of keys for a given CA type.
Expand Down
2 changes: 1 addition & 1 deletion lib/auth/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3451,7 +3451,7 @@ func newTestServices(t *testing.T) Services {
require.NoError(t, err)

return Services{
Trust: local.NewCAService(bk),
TrustInternal: local.NewCAService(bk),
PresenceInternal: local.NewPresenceService(bk),
Provisioner: local.NewProvisioningService(bk),
Identity: local.NewTestIdentityService(bk),
Expand Down
4 changes: 2 additions & 2 deletions lib/auth/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ type InitConfig struct {
OIDCConnectors []types.OIDCConnector

// Trust is a service that manages users and credentials
Trust services.Trust
Trust services.TrustInternal

// Presence service is a discovery and heartbeat tracker
Presence services.PresenceInternal
Expand Down Expand Up @@ -496,7 +496,7 @@ func initCluster(ctx context.Context, cfg InitConfig, asrv *Server) error {
return trace.Wrap(err, "applying migrations")
}
span.AddEvent("migrating db_client_authority")
err = migrateDBClientAuthority(ctx, asrv.Trust, cfg.ClusterName.GetClusterName())
err = migrateDBClientAuthority(ctx, asrv.Services, cfg.ClusterName.GetClusterName())
if err != nil {
return trace.Wrap(err)
}
Expand Down
14 changes: 6 additions & 8 deletions lib/auth/rotate.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (a *Server) RotateCertAuthority(ctx context.Context, req types.RotateReques
if err != nil {
return trace.Wrap(err)
}
if err := a.CompareAndSwapCertAuthority(rotated, existing); err != nil {
if _, err := a.UpdateCertAuthority(ctx, rotated); err != nil {
return trace.Wrap(err)
}
rotation := rotated.GetRotation()
Expand Down Expand Up @@ -213,22 +213,20 @@ func (a *Server) RotateExternalCertAuthority(ctx context.Context, ca types.CertA
// a rotation state of "" gets stored as "standby" after
// CheckAndSetDefaults, so if `ca` came in with a zeroed rotation we must do
// this before checking if `updated` is the same as `existing` or the check
// will fail for no reason (CheckAndSetDefaults is idempotent, so it's fine
// to call it both here and in CompareAndSwapCertAuthority)
// will fail for no reason.
updated.SetRotation(ca.GetRotation())
if err := services.CheckAndSetDefaults(updated); err != nil {
return trace.Wrap(err)
}

// CASing `updated` over `existing` if they're equivalent will only cause
// writing `updated` over `existing` if they're equivalent will only cause
// backend and watcher spam for no gain, so we exit early if that's the case
if services.CertAuthoritiesEquivalent(existing, updated) {
return nil
}

// use compare and swap to protect from concurrent updates
// by trusted cluster API
if err := a.CompareAndSwapCertAuthority(updated, existing); err != nil {
// use update rather than upsert to ensure we are protected from concurrent writes.
if _, err := a.UpdateCertAuthority(ctx, updated); err != nil {
return trace.Wrap(err)
}

Expand Down Expand Up @@ -329,7 +327,7 @@ func (a *Server) autoRotate(ctx context.Context, ca types.CertAuthority) error {
if err != nil {
return trace.Wrap(err)
}
if err := a.CompareAndSwapCertAuthority(rotated, ca); err != nil {
if _, err := a.UpdateCertAuthority(ctx, rotated); err != nil {
return trace.Wrap(err)
}
logger.Infof("Cert authority rotation request is completed")
Expand Down
2 changes: 1 addition & 1 deletion lib/auth/tls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4025,7 +4025,7 @@ func TestEvents(t *testing.T) {
LocalConfigS: testSrv.Auth(),
EventsS: clt,
PresenceS: clt,
CAS: clt,
CAS: testSrv.Auth(),
ProvisioningS: clt,
Access: clt,
UsersS: clt,
Expand Down
6 changes: 3 additions & 3 deletions lib/auth/trust/trustv1/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type authServer interface {
type ServiceConfig struct {
Authorizer authz.Authorizer
Cache services.AuthorityGetter
Backend services.Trust
Backend services.TrustInternal
Logger *logrus.Entry
AuthServer authServer
}
Expand All @@ -61,7 +61,7 @@ type Service struct {
trustpb.UnimplementedTrustServiceServer
authorizer authz.Authorizer
cache services.AuthorityGetter
backend services.Trust
backend services.TrustInternal
authServer authServer
logger *logrus.Entry
}
Expand Down Expand Up @@ -349,7 +349,7 @@ func (s *Service) RotateExternalCertAuthority(ctx context.Context, req *trustpb.

// use compare and swap to protect from concurrent updates
// by trusted cluster API
if err := s.backend.CompareAndSwapCertAuthority(updated, existing); err != nil {
if _, err := s.backend.UpdateCertAuthority(ctx, updated); err != nil {
return nil, trace.Wrap(err)
}

Expand Down
7 changes: 6 additions & 1 deletion lib/auth/trust/trustv1/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,12 @@ func TestDeleteCertAuthority(t *testing.T) {
Domain: "unknown",
},
assertion: func(t *testing.T, err error) {
require.True(t, trace.IsNotFound(err))
// ca deletion doesn't generate not found errors. this is a quirk of
// the fact that deleting active and inactive CAs simultanesouly
// is difficult to do conditionally without introducing odd edge
// cases (e.g. having a delete fail while appearing to succeed if it
// races with a concurrent activation/deactivation).
require.NoError(t, err)
},
},
{
Expand Down
113 changes: 56 additions & 57 deletions lib/auth/trustedcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (a *Server) UpsertTrustedCluster(ctx context.Context, trustedCluster types.
}
log.Debugf("Enabling existing Trusted Cluster relationship.")

if err := a.activateCertAuthority(trustedCluster); err != nil {
if err := a.activateCertAuthority(ctx, trustedCluster); err != nil {
if trace.IsNotFound(err) {
return nil, trace.BadParameter("enable only supported for Trusted Clusters created with Teleport 2.3 and above")
}
Expand All @@ -116,7 +116,7 @@ func (a *Server) UpsertTrustedCluster(ctx context.Context, trustedCluster types.
}
log.Debugf("Disabling existing Trusted Cluster relationship.")

if err := a.deactivateCertAuthority(trustedCluster); err != nil {
if err := a.deactivateCertAuthority(ctx, trustedCluster); err != nil {
if trace.IsNotFound(err) {
return nil, trace.BadParameter("enable only supported for Trusted Clusters created with Teleport 2.3 and above")
}
Expand Down Expand Up @@ -161,7 +161,7 @@ func (a *Server) UpsertTrustedCluster(ctx context.Context, trustedCluster types.
return nil, trace.Wrap(err)
}

if err := a.deactivateCertAuthority(trustedCluster); err != nil {
if err := a.deactivateCertAuthority(ctx, trustedCluster); err != nil {
return nil, trace.Wrap(err)
}
}
Expand Down Expand Up @@ -221,13 +221,17 @@ func (a *Server) DeleteTrustedCluster(ctx context.Context, name string) error {
return trace.BadParameter("trusted cluster %q is the name of this root cluster and cannot be removed.", name)
}

// Remove all CAs
for _, caType := range []types.CertAuthType{types.HostCA, types.UserCA, types.DatabaseCA, types.OpenSSHCA} {
if err := a.DeleteCertAuthority(ctx, types.CertAuthID{Type: caType, DomainName: name}); err != nil {
if !trace.IsNotFound(err) {
return trace.Wrap(err)
}
}
// err on the safe side and delete all possible CA types.
var ids []types.CertAuthID
for _, caType := range types.CertAuthTypes {
ids = append(ids, types.CertAuthID{
Type: caType,
DomainName: name,
})
}

if err := a.DeleteCertAuthorities(ctx, ids...); err != nil {
return trace.Wrap(err)
}

if err := a.DeleteReverseTunnel(name); err != nil {
Expand Down Expand Up @@ -336,52 +340,39 @@ func (a *Server) addCertAuthorities(ctx context.Context, trustedCluster types.Tr
}
remoteCertAuthority.SetRoleMap(trustedCluster.GetRoleMap())
}

// we use create here instead of upsert to prevent people from wiping out
// their own ca if it has the same name as the remote ca
err := a.CreateCertAuthority(ctx, remoteCertAuthority)
if err != nil {
return trace.Wrap(err)
}
}

return nil
// we use create here instead of upsert to prevent people from wiping out
// their own ca if it has the same name as the remote ca
_, err := a.CreateCertAuthorities(ctx, remoteCAs...)
return trace.Wrap(err)
}

// DeleteRemoteCluster deletes remote cluster resource, all certificate authorities
// associated with it
func (a *Server) DeleteRemoteCluster(ctx context.Context, clusterName string) error {
// To make sure remote cluster exists - to protect against random
// clusterName requests (e.g. when clusterName is set to local cluster name)
_, err := a.GetRemoteCluster(clusterName)
if err != nil {
if _, err := a.GetRemoteCluster(clusterName); err != nil {
return trace.Wrap(err)
}
// delete cert authorities associated with the cluster
err = a.DeleteCertAuthority(ctx, types.CertAuthID{
Type: types.HostCA,
DomainName: clusterName,
})
if err != nil {
// this method could have succeeded on the first call,
// but then if the remote cluster resource could not be deleted
// it would be impossible to delete the cluster after then
if !trace.IsNotFound(err) {
return trace.Wrap(err)
}

// we only expect host CAs to be present for remote clusters, but it doesn't hurt
// to err on the side of paranoia and delete all CA types.
var ids []types.CertAuthID
for _, caType := range types.CertAuthTypes {
ids = append(ids, types.CertAuthID{
Type: caType,
DomainName: clusterName,
})
}
// there should be no User CA in trusted clusters on the main cluster side
// per standard automation but clean up just in case
err = a.DeleteCertAuthority(ctx, types.CertAuthID{
Type: types.UserCA,
DomainName: clusterName,
})
if err != nil {
if !trace.IsNotFound(err) {
return trace.Wrap(err)
}

// delete cert authorities associated with the cluster
if err := a.DeleteCertAuthorities(ctx, ids...); err != nil {
return trace.Wrap(err)
}
return a.Services.DeleteRemoteCluster(ctx, clusterName)

return trace.Wrap(a.Services.DeleteRemoteCluster(ctx, clusterName))
}

// GetRemoteCluster returns remote cluster by name
Expand Down Expand Up @@ -766,24 +757,32 @@ func (v *ValidateTrustedClusterResponseRaw) ToNative() (*ValidateTrustedClusterR

// activateCertAuthority will activate both the user and host certificate
// authority given in the services.TrustedCluster resource.
func (a *Server) activateCertAuthority(t types.TrustedCluster) error {
err := a.ActivateCertAuthority(types.CertAuthID{Type: types.UserCA, DomainName: t.GetName()})
if err != nil {
return trace.Wrap(err)
}

return trace.Wrap(a.ActivateCertAuthority(types.CertAuthID{Type: types.HostCA, DomainName: t.GetName()}))
func (a *Server) activateCertAuthority(ctx context.Context, t types.TrustedCluster) error {
return trace.Wrap(a.ActivateCertAuthorities(ctx, []types.CertAuthID{
{
Type: types.UserCA,
DomainName: t.GetName(),
},
{
Type: types.HostCA,
DomainName: t.GetName(),
},
}...))
}

// deactivateCertAuthority will deactivate both the user and host certificate
// authority given in the services.TrustedCluster resource.
func (a *Server) deactivateCertAuthority(t types.TrustedCluster) error {
err := a.DeactivateCertAuthority(types.CertAuthID{Type: types.UserCA, DomainName: t.GetName()})
if err != nil {
return trace.Wrap(err)
}

return trace.Wrap(a.DeactivateCertAuthority(types.CertAuthID{Type: types.HostCA, DomainName: t.GetName()}))
func (a *Server) deactivateCertAuthority(ctx context.Context, t types.TrustedCluster) error {
return trace.Wrap(a.DeactivateCertAuthorities(ctx, []types.CertAuthID{
{
Type: types.UserCA,
DomainName: t.GetName(),
},
{
Type: types.HostCA,
DomainName: t.GetName(),
},
}...))
}

// createReverseTunnel will create a services.ReverseTunnel givenin the
Expand Down
3 changes: 0 additions & 3 deletions lib/backend/memory/atomicwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ func (m *Memory) AtomicWrite(ctx context.Context, condacts []backend.Conditional

m.Lock()
defer m.Unlock()
if m.Mirror {
return "", trace.Errorf("atomic write not supported by mirror-mode memory backend")
}

m.removeExpired()

Expand Down
2 changes: 1 addition & 1 deletion lib/service/servicecfg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ type Config struct {
PIDFile string

// Trust is a service that manages certificate authorities
Trust services.Trust
Trust services.TrustInternal

// Presence service is a discovery and heartbeat tracker
Presence services.PresenceInternal
Expand Down
Loading

0 comments on commit 6555e2a

Please sign in to comment.