Skip to content

Commit

Permalink
fix redshift auto user deadlocking (#43335)
Browse files Browse the repository at this point in the history
  • Loading branch information
GavinFrazar committed Jul 9, 2024
1 parent 46a6c5c commit 2c21d4f
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 26 deletions.
21 changes: 18 additions & 3 deletions e2e/aws/redshift_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,27 +90,42 @@ func testRedshiftCluster(t *testing.T) {
waitForDatabases(t, cluster.Process, redshiftDBName)
db, err := cluster.Process.GetAuthServer().GetDatabase(ctx, redshiftDBName)
require.NoError(t, err)
// make sure the cloud tag is imported as a label and sets the admin
_ = mustGetDBAdmin(t, db)
// but then ignore the tag and use a randomized admin name with a randomized
// schema in its search_path to prevent tests from interfering with
// each other.
labels := db.GetStaticLabels()
labels[types.DatabaseAdminLabel] = "test_admin_" + randASCII(t, 6)
cluster.Process.GetAuthServer().UpdateDatabase(ctx, db)
require.NoError(t, err)
adminUser := mustGetDBAdmin(t, db)

conn := connectAsRedshiftClusterAdmin(t, ctx, db.GetAWS().Redshift.ClusterID)
provisionRedshiftAutoUsersAdmin(t, ctx, conn, adminUser.Name)

// create a new schema with tables that can only be accessed if the
// auto roles are granted by Teleport automatically.
testSchema := "test_" + randASCII(t, 4)
testSchema := "test_" + randASCII(t, 8)
_, err = conn.Exec(ctx, fmt.Sprintf("CREATE SCHEMA %q", testSchema))
require.NoError(t, err)
// now the admin will install its procedures in the test schema.
_, err = conn.Exec(ctx, fmt.Sprintf(`ALTER USER %q SET SEARCH_PATH = %q`, adminUser.Name, testSchema))
require.NoError(t, err)
_, err = conn.Exec(ctx, fmt.Sprintf("GRANT USAGE,CREATE ON SCHEMA %q TO %q", testSchema, adminUser.Name))
require.NoError(t, err)
t.Cleanup(func() {
// users/roles can only be dropped after we drop the schema+table.
// So, rather than juggling the order of drops, just attempt to drop
// everything as part of test cleanup, regardless of what the test
// actually created successfully.
for _, stmt := range []string{
fmt.Sprintf("DROP SCHEMA %q CASCADE", testSchema),
fmt.Sprintf("DROP ROLE %q", autoRole1),
fmt.Sprintf("DROP ROLE %q", autoRole2),
fmt.Sprintf("DROP USER IF EXISTS %q", autoUserKeep),
fmt.Sprintf("DROP USER IF EXISTS %q", autoUserDrop),
fmt.Sprintf("DROP ROLE %q", autoRole1),
fmt.Sprintf("DROP ROLE %q", autoRole2),
fmt.Sprintf("DROP USER IF EXISTS %q", adminUser.Name),
} {
_, err := conn.Exec(ctx, stmt)
assert.NoError(t, err, "test cleanup failed, stmt=%q", stmt)
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -297,10 +297,10 @@ github.com/aws/aws-sdk-go-v2/service/kms v1.30.0 h1:yS0JkEdV6h9JOo8sy2JSpjX+i7vs
github.com/aws/aws-sdk-go-v2/service/kms v1.30.0/go.mod h1:+I8VUUSVD4p5ISQtzpgSva4I8cJ4SQ4b1dcBcof7O+g=
github.com/aws/aws-sdk-go-v2/service/rds v1.66.2 h1:2DwZGc7FM7swBDbkPlOhRJ5WolNYkIu+/ToEFK+rLmA=
github.com/aws/aws-sdk-go-v2/service/rds v1.66.2/go.mod h1:N/ijzTwR4cOG2P8Kvos/QOCetpDTtconhvDOheqnrTw=
github.com/aws/aws-sdk-go-v2/service/s3 v1.51.4 h1:lW5xUzOPGAMY7HPuNF4FdyBwRc3UJ/e8KsapbesVeNU=
github.com/aws/aws-sdk-go-v2/service/s3 v1.51.4/go.mod h1:MGTaf3x/+z7ZGugCGvepnx2DS6+caCYYqKhzVoLNYPk=
github.com/aws/aws-sdk-go-v2/service/redshift v1.44.0 h1:j18lTPPqe+qlapn1R8//+ujvXdplku8V41xzBNNLtn0=
github.com/aws/aws-sdk-go-v2/service/redshift v1.44.0/go.mod h1:8ldsMsikORLj0GZUozSvbQdctrumAPYhizmj/3AAATI=
github.com/aws/aws-sdk-go-v2/service/s3 v1.51.4 h1:lW5xUzOPGAMY7HPuNF4FdyBwRc3UJ/e8KsapbesVeNU=
github.com/aws/aws-sdk-go-v2/service/s3 v1.51.4/go.mod h1:MGTaf3x/+z7ZGugCGvepnx2DS6+caCYYqKhzVoLNYPk=
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.28.6 h1:TIOEjw0i2yyhmhRry3Oeu9YtiiHWISZ6j/irS1W3gX4=
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.28.6/go.mod h1:3Ba++UwWd154xtP4FRX5pUK3Gt4up5sDHCve6kVfE+g=
github.com/aws/aws-sdk-go-v2/service/sns v1.26.7 h1:DylmW2c1Z7qGxN3Y02k+voPbtM1mh7Rp+gV+7maG5io=
Expand Down
3 changes: 3 additions & 0 deletions lib/srv/db/common/autousers.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func (a *UserProvisioner) Activate(ctx context.Context, sessionCtx *Session) (fu
retryCtx, cancel := context.WithTimeout(ctx, defaults.DatabaseConnectTimeout)
defer cancel()

a.Log.WithField("user", sessionCtx.DatabaseUser).Debug("Activating database user")
lease, err := services.AcquireSemaphoreWithRetry(retryCtx, a.makeAcquireSemaphoreConfig(sessionCtx))
if err != nil {
return nil, trace.Wrap(err)
Expand Down Expand Up @@ -119,6 +120,7 @@ func (a *UserProvisioner) Teardown(ctx context.Context, sessionCtx *Session) err
func (a *UserProvisioner) deactivate(ctx context.Context, sessionCtx *Session) error {
// Observe.
defer methodCallMetrics("UserProvisioner:Deactivate", teleport.ComponentDatabase, sessionCtx.Database)()
a.Log.WithField("user", sessionCtx.DatabaseUser).Debug("Deactivating database user")

retryCtx, cancel := context.WithTimeout(ctx, defaults.DatabaseConnectTimeout)
defer cancel()
Expand Down Expand Up @@ -147,6 +149,7 @@ func (a *UserProvisioner) deactivate(ctx context.Context, sessionCtx *Session) e
func (a *UserProvisioner) delete(ctx context.Context, sessionCtx *Session) error {
// Observe.
defer methodCallMetrics("UserProvisioner:Delete", teleport.ComponentDatabase, sessionCtx.Database)()
a.Log.WithField("user", sessionCtx.DatabaseUser).Debug("Deleting database user")

retryCtx, cancel := context.WithTimeout(ctx, defaults.DatabaseConnectTimeout)
defer cancel()
Expand Down
2 changes: 1 addition & 1 deletion lib/srv/db/postgres/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (e *Engine) HandleConnection(ctx context.Context, sessionCtx *common.Sessio
defer func() {
err := e.GetUserProvisioner(e).Teardown(ctx, sessionCtx)
if err != nil {
e.Log.WithError(err).Error("Failed to teardown auto user.")
e.Log.WithError(err).WithField("user", sessionCtx.DatabaseUser).Error("Failed to teardown auto user.")
}
}()
// This is where we connect to the actual Postgres database.
Expand Down
8 changes: 6 additions & 2 deletions lib/srv/db/postgres/sql/redshift-deactivate-user.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,15 @@ BEGIN
IF EXISTS (SELECT user_name FROM stv_sessions WHERE user_name = CONCAT('IAM:', username)) THEN
RAISE EXCEPTION 'TP000: User has active connections';
ELSE
-- Disable ability to login for the user.
-- We do this before revoking roles so that the pg_shadow table
-- (oid 1260) lock is acquired before the pg_role (oid 4775) and
-- pg_identity (oid 4771) table locks, so that the locks are acquired in
-- the same order in the activate/deactivate/delete procedures.
EXECUTE 'ALTER USER ' || QUOTE_IDENT(username) || ' WITH CONNECTION LIMIT 0';
-- Revoke all role memberships except teleport-auto-user.
FOR rec IN select role_name FROM svv_user_grants WHERE user_name = username AND admin_option = false AND role_name != 'teleport-auto-user' LOOP
EXECUTE 'REVOKE ROLE ' || QUOTE_IDENT(rec.role_name) || ' FROM ' || QUOTE_IDENT(username);
END LOOP;
-- Disable ability to login for the user.
EXECUTE 'ALTER USER ' || QUOTE_IDENT(username) || ' WITH CONNECTION LIMIT 0';
END IF;
END;$$;
112 changes: 94 additions & 18 deletions lib/srv/db/postgres/users.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,18 @@ import (
"errors"
"fmt"
"strings"
"time"

"github.com/gravitational/trace"
"github.com/jackc/pgconn"
"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v4"
"github.com/lib/pq"
"github.com/sirupsen/logrus"

"github.com/gravitational/teleport/api/types"
apiawsutils "github.com/gravitational/teleport/api/utils/aws"
"github.com/gravitational/teleport/api/utils/retryutils"
"github.com/gravitational/teleport/lib/srv/db/common"
"github.com/gravitational/teleport/lib/srv/db/common/databaseobjectimportrule"
"github.com/gravitational/teleport/lib/srv/db/common/permissions"
Expand Down Expand Up @@ -70,7 +75,10 @@ func (e *Engine) ActivateUser(ctx context.Context, sessionCtx *common.Session) e
// We could call this once when the database is being initialized but
// doing it here has a nice "self-healing" property in case the Teleport
// bookkeeping group or stored procedures get deleted or changed offband.
err = e.initAutoUsers(ctx, sessionCtx, conn)
logger := e.Log.WithField("user", sessionCtx.DatabaseUser)
err = withRetry(ctx, logger, func() error {
return trace.Wrap(e.initAutoUsers(ctx, sessionCtx, conn))
})
if err != nil {
return trace.Wrap(err)
}
Expand All @@ -80,10 +88,13 @@ func (e *Engine) ActivateUser(ctx context.Context, sessionCtx *common.Session) e
return trace.Wrap(err)
}

e.Log.WithField("user", sessionCtx.DatabaseUser).WithField("roles", roles).Info("Activating PostgreSQL user")
_, err = conn.Exec(ctx, activateQuery, sessionCtx.DatabaseUser, roles)
logger.WithField("roles", roles).Info("Activating PostgreSQL user")
err = withRetry(ctx, logger, func() error {
_, err = conn.Exec(ctx, activateQuery, sessionCtx.DatabaseUser, roles)
return trace.Wrap(err)
})
if err != nil {
e.Log.WithError(err).Debug("Call teleport_activate_user failed.")
logger.WithError(err).Debug("Call teleport_activate_user failed.")
errOut := convertActivateError(sessionCtx, err)
e.Audit.OnDatabaseUserCreate(ctx, sessionCtx, errOut)
return trace.Wrap(errOut)
Expand All @@ -99,7 +110,7 @@ func (e *Engine) ActivateUser(ctx context.Context, sessionCtx *common.Session) e

err = e.applyPermissions(ctx, sessionCtx)
if err != nil {
e.Log.WithError(err).Warn("Failed to apply permissions.")
logger.WithError(err).Warn("Failed to apply permissions.")
return trace.Wrap(err)
}
return nil
Expand Down Expand Up @@ -302,9 +313,12 @@ func (e *Engine) DeactivateUser(ctx context.Context, sessionCtx *common.Session)
}
defer conn.Close(ctx)

e.Log.WithField("user", sessionCtx.DatabaseUser).Info("Deactivating PostgreSQL user.")

_, err = conn.Exec(ctx, deactivateQuery, sessionCtx.DatabaseUser)
logger := e.Log.WithField("user", sessionCtx.DatabaseUser)
logger.Info("Deactivating PostgreSQL user.")
err = withRetry(ctx, logger, func() error {
_, err = conn.Exec(ctx, deactivateQuery, sessionCtx.DatabaseUser)
return trace.Wrap(err)
})
if err != nil {
e.Audit.OnDatabaseUserDeactivate(ctx, sessionCtx, false, err)
return trace.NewAggregate(errRemove, trace.Wrap(err))
Expand All @@ -329,28 +343,31 @@ func (e *Engine) DeleteUser(ctx context.Context, sessionCtx *common.Session) err
}
defer conn.Close(ctx)

e.Log.WithField("user", sessionCtx.DatabaseUser).Info("Deleting PostgreSQL user.")
logger := e.Log.WithField("user", sessionCtx.DatabaseUser)
logger.Info("Deleting PostgreSQL user.")

var state string
switch {
case sessionCtx.Database.IsRedshift():
err = e.deleteUserRedshift(ctx, sessionCtx, conn, &state)
default:
err = conn.QueryRow(ctx, deleteQuery, sessionCtx.DatabaseUser).Scan(&state)
}
err = withRetry(ctx, logger, func() error {
switch {
case sessionCtx.Database.IsRedshift():
return trace.Wrap(e.deleteUserRedshift(ctx, sessionCtx, conn, &state))
default:
return trace.Wrap(conn.QueryRow(ctx, deleteQuery, sessionCtx.DatabaseUser).Scan(&state))
}
})
if err != nil {
return trace.NewAggregate(errRemove, trace.Wrap(err))
}

deleted := true
switch state {
case common.SQLStateUserDropped:
e.Log.WithField("user", sessionCtx.DatabaseUser).Debug("User deleted successfully.")
logger.Debug("User deleted successfully.")
case common.SQLStateUserDeactivated:
deleted = false
e.Log.WithField("user", sessionCtx.DatabaseUser).Info("Unable to delete user, it was disabled instead.")
logger.Info("Unable to delete user, it was disabled instead.")
default:
e.Log.WithField("user", sessionCtx.DatabaseUser).Warn("Unable to determine user deletion state.")
logger.Warn("Unable to determine user deletion state.")
}
e.Audit.OnDatabaseUserDeactivate(ctx, sessionCtx, deleted, nil)

Expand Down Expand Up @@ -532,3 +549,62 @@ var (
removePermissionsProcName: removePermissionsProc,
}
)

// withRetry is a helper for auto user operations that runs a given func a
// finite number of times until it returns a nil error, returns an error that
// isRetryable rejects, or the given context is done.
//
// It returns nil as soon as f succeeds. A non-retryable error from f is
// returned immediately (wrapped). If ctx is done while waiting between
// attempts, the context error is returned. If every attempt fails with a
// retryable error, the last error from f is returned, wrapped with a
// "too many retries" message.
func withRetry(ctx context.Context, log logrus.FieldLogger, f func() error) error {
	linear, err := retryutils.NewLinear(retryutils.LinearConfig{
		// arbitrarily copied settings from retry logic in lib/backend/pgbk.
		First:  0,
		Step:   100 * time.Millisecond,
		Max:    750 * time.Millisecond,
		Jitter: retryutils.NewHalfJitter(),
	})
	if err != nil {
		return trace.Wrap(err)
	}

	// retry a finite number of times before giving up.
	const maxAttempts = 10
	for i := 0; i < maxAttempts; i++ {
		// Assign to the function-scoped err rather than declaring a new one
		// with := so that the last failure is still visible after the loop.
		// Shadowing it here would leave the outer err nil, and wrapping a nil
		// error below would report success after exhausting all attempts.
		err = f()
		if err == nil {
			return nil
		}

		if !isRetryable(err) {
			return trace.Wrap(err)
		}
		log.WithError(err).Debug("User operation failed, retrying")

		// Back off before the next attempt, but stop early if the caller's
		// context is canceled or times out.
		linear.Inc()
		select {
		case <-linear.After():
		case <-ctx.Done():
			return trace.Wrap(ctx.Err())
		}
	}
	return trace.Wrap(err, "too many retries")
}

// isRetryable returns true if an error can be retried.
func isRetryable(err error) bool {
var pgErr *pgconn.PgError
err = trace.Unwrap(err)
if errors.As(err, &pgErr) {
// https://www.postgresql.org/docs/current/mvcc-serialization-failure-handling.html
switch pgErr.Code {
case pgerrcode.DeadlockDetected, pgerrcode.SerializationFailure,
pgerrcode.UniqueViolation, pgerrcode.ExclusionViolation:
return true
}
}
// Redshift reports this with a vague SQLSTATE XX000, which is the internal
// error code, but this is a serialization error that rolls back the
// transaction, so it should be retried.
if strings.Contains(err.Error(), "conflict with concurrent transaction") {
return true
}
return pgconn.SafeToRetry(err)
}

0 comments on commit 2c21d4f

Please sign in to comment.