From 2c21d4ff56b335fdb236023d9a3add327cbad371 Mon Sep 17 00:00:00 2001 From: Gavin Frazar Date: Mon, 1 Jul 2024 18:04:41 -0700 Subject: [PATCH] fix redshift auto user deadlocking (#43335) --- e2e/aws/redshift_test.go | 21 +++- go.sum | 4 +- lib/srv/db/common/autousers.go | 3 + lib/srv/db/postgres/engine.go | 2 +- .../postgres/sql/redshift-deactivate-user.sql | 8 +- lib/srv/db/postgres/users.go | 112 +++++++++++++++--- 6 files changed, 124 insertions(+), 26 deletions(-) diff --git a/e2e/aws/redshift_test.go b/e2e/aws/redshift_test.go index 467a2d9966265..0b73dd4715c76 100644 --- a/e2e/aws/redshift_test.go +++ b/e2e/aws/redshift_test.go @@ -90,6 +90,15 @@ func testRedshiftCluster(t *testing.T) { waitForDatabases(t, cluster.Process, redshiftDBName) db, err := cluster.Process.GetAuthServer().GetDatabase(ctx, redshiftDBName) require.NoError(t, err) + // make sure the cloud tag is imported as a label and sets the admin + _ = mustGetDBAdmin(t, db) + // but then ignore the tag and use a randomized admin name with a randomized + // schema in its search_path to prevent tests from interfering with + // eachother. + labels := db.GetStaticLabels() + labels[types.DatabaseAdminLabel] = "test_admin_" + randASCII(t, 6) + cluster.Process.GetAuthServer().UpdateDatabase(ctx, db) + require.NoError(t, err) adminUser := mustGetDBAdmin(t, db) conn := connectAsRedshiftClusterAdmin(t, ctx, db.GetAWS().Redshift.ClusterID) @@ -97,9 +106,14 @@ func testRedshiftCluster(t *testing.T) { // create a new schema with tables that can only be accessed if the // auto roles are granted by Teleport automatically. - testSchema := "test_" + randASCII(t, 4) + testSchema := "test_" + randASCII(t, 8) _, err = conn.Exec(ctx, fmt.Sprintf("CREATE SCHEMA %q", testSchema)) require.NoError(t, err) + // now the admin will install its procedures in the test schema. 
+ _, err = conn.Exec(ctx, fmt.Sprintf(`ALTER USER %q SET SEARCH_PATH = %q`, adminUser.Name, testSchema)) + require.NoError(t, err) + _, err = conn.Exec(ctx, fmt.Sprintf("GRANT USAGE,CREATE ON SCHEMA %q TO %q", testSchema, adminUser.Name)) + require.NoError(t, err) t.Cleanup(func() { // users/roles can only be dropped after we drop the schema+table. // So, rather than juggling the order of drops, just attempt to drop @@ -107,10 +121,11 @@ func testRedshiftCluster(t *testing.T) { // actually created successfully. for _, stmt := range []string{ fmt.Sprintf("DROP SCHEMA %q CASCADE", testSchema), - fmt.Sprintf("DROP ROLE %q", autoRole1), - fmt.Sprintf("DROP ROLE %q", autoRole2), fmt.Sprintf("DROP USER IF EXISTS %q", autoUserKeep), fmt.Sprintf("DROP USER IF EXISTS %q", autoUserDrop), + fmt.Sprintf("DROP ROLE %q", autoRole1), + fmt.Sprintf("DROP ROLE %q", autoRole2), + fmt.Sprintf("DROP USER IF EXISTS %q", adminUser.Name), } { _, err := conn.Exec(ctx, stmt) assert.NoError(t, err, "test cleanup failed, stmt=%q", stmt) diff --git a/go.sum b/go.sum index 926fc4c5fd2e5..dbccfc4e3d29e 100644 --- a/go.sum +++ b/go.sum @@ -297,10 +297,10 @@ github.com/aws/aws-sdk-go-v2/service/kms v1.30.0 h1:yS0JkEdV6h9JOo8sy2JSpjX+i7vs github.com/aws/aws-sdk-go-v2/service/kms v1.30.0/go.mod h1:+I8VUUSVD4p5ISQtzpgSva4I8cJ4SQ4b1dcBcof7O+g= github.com/aws/aws-sdk-go-v2/service/rds v1.66.2 h1:2DwZGc7FM7swBDbkPlOhRJ5WolNYkIu+/ToEFK+rLmA= github.com/aws/aws-sdk-go-v2/service/rds v1.66.2/go.mod h1:N/ijzTwR4cOG2P8Kvos/QOCetpDTtconhvDOheqnrTw= -github.com/aws/aws-sdk-go-v2/service/s3 v1.51.4 h1:lW5xUzOPGAMY7HPuNF4FdyBwRc3UJ/e8KsapbesVeNU= -github.com/aws/aws-sdk-go-v2/service/s3 v1.51.4/go.mod h1:MGTaf3x/+z7ZGugCGvepnx2DS6+caCYYqKhzVoLNYPk= github.com/aws/aws-sdk-go-v2/service/redshift v1.44.0 h1:j18lTPPqe+qlapn1R8//+ujvXdplku8V41xzBNNLtn0= github.com/aws/aws-sdk-go-v2/service/redshift v1.44.0/go.mod h1:8ldsMsikORLj0GZUozSvbQdctrumAPYhizmj/3AAATI= +github.com/aws/aws-sdk-go-v2/service/s3 v1.51.4 
h1:lW5xUzOPGAMY7HPuNF4FdyBwRc3UJ/e8KsapbesVeNU= +github.com/aws/aws-sdk-go-v2/service/s3 v1.51.4/go.mod h1:MGTaf3x/+z7ZGugCGvepnx2DS6+caCYYqKhzVoLNYPk= github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.28.6 h1:TIOEjw0i2yyhmhRry3Oeu9YtiiHWISZ6j/irS1W3gX4= github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.28.6/go.mod h1:3Ba++UwWd154xtP4FRX5pUK3Gt4up5sDHCve6kVfE+g= github.com/aws/aws-sdk-go-v2/service/sns v1.26.7 h1:DylmW2c1Z7qGxN3Y02k+voPbtM1mh7Rp+gV+7maG5io= diff --git a/lib/srv/db/common/autousers.go b/lib/srv/db/common/autousers.go index 71bc0ccc62f35..115f7854f9d55 100644 --- a/lib/srv/db/common/autousers.go +++ b/lib/srv/db/common/autousers.go @@ -77,6 +77,7 @@ func (a *UserProvisioner) Activate(ctx context.Context, sessionCtx *Session) (fu retryCtx, cancel := context.WithTimeout(ctx, defaults.DatabaseConnectTimeout) defer cancel() + a.Log.WithField("user", sessionCtx.DatabaseUser).Debug("Activating database user") lease, err := services.AcquireSemaphoreWithRetry(retryCtx, a.makeAcquireSemaphoreConfig(sessionCtx)) if err != nil { return nil, trace.Wrap(err) @@ -119,6 +120,7 @@ func (a *UserProvisioner) Teardown(ctx context.Context, sessionCtx *Session) err func (a *UserProvisioner) deactivate(ctx context.Context, sessionCtx *Session) error { // Observe. defer methodCallMetrics("UserProvisioner:Deactivate", teleport.ComponentDatabase, sessionCtx.Database)() + a.Log.WithField("user", sessionCtx.DatabaseUser).Debug("Deactivating database user") retryCtx, cancel := context.WithTimeout(ctx, defaults.DatabaseConnectTimeout) defer cancel() @@ -147,6 +149,7 @@ func (a *UserProvisioner) deactivate(ctx context.Context, sessionCtx *Session) e func (a *UserProvisioner) delete(ctx context.Context, sessionCtx *Session) error { // Observe. 
defer methodCallMetrics("UserProvisioner:Delete", teleport.ComponentDatabase, sessionCtx.Database)() + a.Log.WithField("user", sessionCtx.DatabaseUser).Debug("Deleting database user") retryCtx, cancel := context.WithTimeout(ctx, defaults.DatabaseConnectTimeout) defer cancel() diff --git a/lib/srv/db/postgres/engine.go b/lib/srv/db/postgres/engine.go index 796c21e5b3343..76aae6d72d5d6 100644 --- a/lib/srv/db/postgres/engine.go +++ b/lib/srv/db/postgres/engine.go @@ -139,7 +139,7 @@ func (e *Engine) HandleConnection(ctx context.Context, sessionCtx *common.Sessio defer func() { err := e.GetUserProvisioner(e).Teardown(ctx, sessionCtx) if err != nil { - e.Log.WithError(err).Error("Failed to teardown auto user.") + e.Log.WithError(err).WithField("user", sessionCtx.DatabaseUser).Error("Failed to teardown auto user.") } }() // This is where we connect to the actual Postgres database. diff --git a/lib/srv/db/postgres/sql/redshift-deactivate-user.sql b/lib/srv/db/postgres/sql/redshift-deactivate-user.sql index 67658626e97b7..8c738f401d0d9 100644 --- a/lib/srv/db/postgres/sql/redshift-deactivate-user.sql +++ b/lib/srv/db/postgres/sql/redshift-deactivate-user.sql @@ -11,11 +11,15 @@ BEGIN IF EXISTS (SELECT user_name FROM stv_sessions WHERE user_name = CONCAT('IAM:', username)) THEN RAISE EXCEPTION 'TP000: User has active connections'; ELSE + -- Disable ability to login for the user. + -- We do this before revoking roles so that the pg_shadow table + -- (oid 1260) lock is acquired before the pg_role (oid 4775) and + -- pg_identity (oid 4771) table locks, so that the locks are acquired in + -- the same order in the activate/deactivate/delete procedures. + EXECUTE 'ALTER USER ' || QUOTE_IDENT(username) || ' WITH CONNECTION LIMIT 0'; -- Revoke all role memberships except teleport-auto-user. 
FOR rec IN select role_name FROM svv_user_grants WHERE user_name = username AND admin_option = false AND role_name != 'teleport-auto-user' LOOP EXECUTE 'REVOKE ROLE ' || QUOTE_IDENT(rec.role_name) || ' FROM ' || QUOTE_IDENT(username); END LOOP; - -- Disable ability to login for the user. - EXECUTE 'ALTER USER ' || QUOTE_IDENT(username) || ' WITH CONNECTION LIMIT 0'; END IF; END;$$; diff --git a/lib/srv/db/postgres/users.go b/lib/srv/db/postgres/users.go index a8d80a3bbe0d8..a96e60e05c6dc 100644 --- a/lib/srv/db/postgres/users.go +++ b/lib/srv/db/postgres/users.go @@ -25,13 +25,18 @@ import ( "errors" "fmt" "strings" + "time" "github.com/gravitational/trace" + "github.com/jackc/pgconn" + "github.com/jackc/pgerrcode" "github.com/jackc/pgx/v4" "github.com/lib/pq" + "github.com/sirupsen/logrus" "github.com/gravitational/teleport/api/types" apiawsutils "github.com/gravitational/teleport/api/utils/aws" + "github.com/gravitational/teleport/api/utils/retryutils" "github.com/gravitational/teleport/lib/srv/db/common" "github.com/gravitational/teleport/lib/srv/db/common/databaseobjectimportrule" "github.com/gravitational/teleport/lib/srv/db/common/permissions" @@ -70,7 +75,10 @@ func (e *Engine) ActivateUser(ctx context.Context, sessionCtx *common.Session) e // We could call this once when the database is being initialized but // doing it here has a nice "self-healing" property in case the Teleport // bookkeeping group or stored procedures get deleted or changed offband. 
- err = e.initAutoUsers(ctx, sessionCtx, conn) + logger := e.Log.WithField("user", sessionCtx.DatabaseUser) + err = withRetry(ctx, logger, func() error { + return trace.Wrap(e.initAutoUsers(ctx, sessionCtx, conn)) + }) if err != nil { return trace.Wrap(err) } @@ -80,10 +88,13 @@ func (e *Engine) ActivateUser(ctx context.Context, sessionCtx *common.Session) e return trace.Wrap(err) } - e.Log.WithField("user", sessionCtx.DatabaseUser).WithField("roles", roles).Info("Activating PostgreSQL user") - _, err = conn.Exec(ctx, activateQuery, sessionCtx.DatabaseUser, roles) + logger.WithField("roles", roles).Info("Activating PostgreSQL user") + err = withRetry(ctx, logger, func() error { + _, err = conn.Exec(ctx, activateQuery, sessionCtx.DatabaseUser, roles) + return trace.Wrap(err) + }) if err != nil { - e.Log.WithError(err).Debug("Call teleport_activate_user failed.") + logger.WithError(err).Debug("Call teleport_activate_user failed.") errOut := convertActivateError(sessionCtx, err) e.Audit.OnDatabaseUserCreate(ctx, sessionCtx, errOut) return trace.Wrap(errOut) @@ -99,7 +110,7 @@ func (e *Engine) ActivateUser(ctx context.Context, sessionCtx *common.Session) e err = e.applyPermissions(ctx, sessionCtx) if err != nil { - e.Log.WithError(err).Warn("Failed to apply permissions.") + logger.WithError(err).Warn("Failed to apply permissions.") return trace.Wrap(err) } return nil @@ -302,9 +313,12 @@ func (e *Engine) DeactivateUser(ctx context.Context, sessionCtx *common.Session) } defer conn.Close(ctx) - e.Log.WithField("user", sessionCtx.DatabaseUser).Info("Deactivating PostgreSQL user.") - - _, err = conn.Exec(ctx, deactivateQuery, sessionCtx.DatabaseUser) + logger := e.Log.WithField("user", sessionCtx.DatabaseUser) + logger.Info("Deactivating PostgreSQL user.") + err = withRetry(ctx, logger, func() error { + _, err = conn.Exec(ctx, deactivateQuery, sessionCtx.DatabaseUser) + return trace.Wrap(err) + }) if err != nil { e.Audit.OnDatabaseUserDeactivate(ctx, sessionCtx, false, 
err) return trace.NewAggregate(errRemove, trace.Wrap(err)) @@ -329,15 +343,18 @@ func (e *Engine) DeleteUser(ctx context.Context, sessionCtx *common.Session) err } defer conn.Close(ctx) - e.Log.WithField("user", sessionCtx.DatabaseUser).Info("Deleting PostgreSQL user.") + logger := e.Log.WithField("user", sessionCtx.DatabaseUser) + logger.Info("Deleting PostgreSQL user.") var state string - switch { - case sessionCtx.Database.IsRedshift(): - err = e.deleteUserRedshift(ctx, sessionCtx, conn, &state) - default: - err = conn.QueryRow(ctx, deleteQuery, sessionCtx.DatabaseUser).Scan(&state) - } + err = withRetry(ctx, logger, func() error { + switch { + case sessionCtx.Database.IsRedshift(): + return trace.Wrap(e.deleteUserRedshift(ctx, sessionCtx, conn, &state)) + default: + return trace.Wrap(conn.QueryRow(ctx, deleteQuery, sessionCtx.DatabaseUser).Scan(&state)) + } + }) if err != nil { return trace.NewAggregate(errRemove, trace.Wrap(err)) } @@ -345,12 +362,12 @@ func (e *Engine) DeleteUser(ctx context.Context, sessionCtx *common.Session) err deleted := true switch state { case common.SQLStateUserDropped: - e.Log.WithField("user", sessionCtx.DatabaseUser).Debug("User deleted successfully.") + logger.Debug("User deleted successfully.") case common.SQLStateUserDeactivated: deleted = false - e.Log.WithField("user", sessionCtx.DatabaseUser).Info("Unable to delete user, it was disabled instead.") + logger.Info("Unable to delete user, it was disabled instead.") default: - e.Log.WithField("user", sessionCtx.DatabaseUser).Warn("Unable to determine user deletion state.") + logger.Warn("Unable to determine user deletion state.") } e.Audit.OnDatabaseUserDeactivate(ctx, sessionCtx, deleted, nil) @@ -532,3 +549,62 @@ var ( removePermissionsProcName: removePermissionsProc, } ) + +// withRetry is a helper for auto user operations that runs a given func a +// finite number of times until it returns nil error or the given context is +// done. 
+func withRetry(ctx context.Context, log logrus.FieldLogger, f func() error) error {
+	linear, err := retryutils.NewLinear(retryutils.LinearConfig{
+		// arbitrarily copied settings from retry logic in lib/backend/pgbk.
+		First:  0,
+		Step:   100 * time.Millisecond,
+		Max:    750 * time.Millisecond,
+		Jitter: retryutils.NewHalfJitter(),
+	})
+	if err != nil {
+		return trace.Wrap(err)
+	}
+
+	// retry a finite number of times before giving up.
+	for i := 0; i < 10; i++ {
+		// Reuse the outer err instead of shadowing it with ":=" so the final
+		// trace.Wrap below sees the last failure; wrapping a nil error yields nil.
+		err = f()
+		if err == nil {
+			return nil
+		}
+		if !isRetryable(err) {
+			return trace.Wrap(err)
+		}
+		log.WithError(err).Debug("User operation failed, retrying")
+
+		linear.Inc()
+		select {
+		case <-linear.After():
+		case <-ctx.Done():
+			return trace.Wrap(ctx.Err())
+		}
+	}
+	return trace.Wrap(err, "too many retries")
+}
+
+// isRetryable returns true if an error can be retried.
+func isRetryable(err error) bool {
+	var pgErr *pgconn.PgError
+	err = trace.Unwrap(err)
+	if errors.As(err, &pgErr) {
+		// https://www.postgresql.org/docs/current/mvcc-serialization-failure-handling.html
+		switch pgErr.Code {
+		case pgerrcode.DeadlockDetected, pgerrcode.SerializationFailure,
+			pgerrcode.UniqueViolation, pgerrcode.ExclusionViolation:
+			return true
+		}
+	}
+	// Redshift reports this with a vague SQLSTATE XX000, which is the internal
+	// error code, but this is a serialization error that rolls back the
+	// transaction, so it should be retried.
+	if strings.Contains(err.Error(), "conflict with concurrent transaction") {
+		return true
+	}
+	return pgconn.SafeToRetry(err)
+}