Skip to content

Commit

Permalink
[crdb] Refactor cockroach package to prepare for Yugabyte compatible …
Browse files Browse the repository at this point in the history
…migration (#1133)
  • Loading branch information
barroco authored Dec 2, 2024
1 parent 664417e commit 11c42cd
Show file tree
Hide file tree
Showing 16 changed files with 368 additions and 313 deletions.
2 changes: 1 addition & 1 deletion cmds/core-service/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ go run ./cmds/core-service \

#### CockroachDB cluster

To run correctly, core-service must be able to [access](../../pkg/cockroach/flags/flags.go) a CockroachDB cluster. Provision of this cluster is handled automatically for a local development environment if following [the instructions for a standalone instance](../../build/dev/standalone_instance.md). Or, a CockroachDB instance can be created manually with:
To run correctly, core-service must be able to [access](../../pkg/datastore/flags/flags.go) a CockroachDB cluster. Provision of this cluster is handled automatically for a local development environment if following [the instructions for a standalone instance](../../build/dev/standalone_instance.md). Or, a CockroachDB instance can be created manually with:

```bash
docker container run -p 26257:26257 -p 8080:8080 --rm cockroachdb/cockroach:v24.1.3 start-single-node --insecure
Expand Down
12 changes: 6 additions & 6 deletions cmds/core-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"github.com/interuss/dss/pkg/auth"
aux "github.com/interuss/dss/pkg/aux_"
"github.com/interuss/dss/pkg/build"
"github.com/interuss/dss/pkg/cockroach"
"github.com/interuss/dss/pkg/cockroach/flags" // Force command line flag registration
"github.com/interuss/dss/pkg/datastore"
"github.com/interuss/dss/pkg/datastore/flags" // Force command line flag registration
"github.com/interuss/dss/pkg/logging"
"github.com/interuss/dss/pkg/rid/application"
rid_v1 "github.com/interuss/dss/pkg/rid/server/v1"
Expand Down Expand Up @@ -65,7 +65,7 @@ const (
codeRetryable = stacktrace.ErrorCode(1)
)

func getDBStats(ctx context.Context, db *cockroach.DB, databaseName string) {
func getDBStats(ctx context.Context, db *datastore.Datastore, databaseName string) {
logger := logging.WithValuesFromContext(ctx, logging.Logger)
statsPtr := db.Pool.Stat()
stats := make(map[string]string)
Expand Down Expand Up @@ -109,7 +109,7 @@ func createKeyResolver() (auth.KeyResolver, error) {
func createRIDServers(ctx context.Context, locality string, logger *zap.Logger) (*rid_v1.Server, *rid_v2.Server, error) {
connectParameters := flags.ConnectParameters()
connectParameters.DBName = "rid"
ridCrdb, err := cockroach.Dial(ctx, connectParameters)
ridCrdb, err := datastore.Dial(ctx, connectParameters)
if err != nil {
// TODO: More robustly detect failure to create RID server is due to a problem that may be temporary
if strings.Contains(err.Error(), "connect: connection refused") {
Expand All @@ -123,7 +123,7 @@ func createRIDServers(ctx context.Context, locality string, logger *zap.Logger)
// try DBName of defaultdb for older versions.
ridCrdb.Pool.Close()
connectParameters.DBName = "defaultdb"
ridCrdb, err := cockroach.Dial(ctx, connectParameters)
ridCrdb, err := datastore.Dial(ctx, connectParameters)
if err != nil {
return nil, nil, stacktrace.Propagate(err, "Failed to connect to remote ID database for older version <defaultdb>; verify your database configuration is current with https://github.com/interuss/dss/tree/master/build#upgrading-database-schemas")
}
Expand Down Expand Up @@ -176,7 +176,7 @@ func createRIDServers(ctx context.Context, locality string, logger *zap.Logger)
func createSCDServer(ctx context.Context, logger *zap.Logger) (*scd.Server, error) {
connectParameters := flags.ConnectParameters()
connectParameters.DBName = scdc.DatabaseName
scdCrdb, err := cockroach.Dial(ctx, connectParameters)
scdCrdb, err := datastore.Dial(ctx, connectParameters)
if err != nil {
return nil, stacktrace.Propagate(err, "Failed to connect to strategic conflict detection database; verify your database configuration is current with https://github.com/interuss/dss/tree/master/build#upgrading-database-schemas")
}
Expand Down
6 changes: 3 additions & 3 deletions cmds/db-manager/cleanup/evict.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"log"
"time"

"github.com/interuss/dss/pkg/cockroach"
crdbflags "github.com/interuss/dss/pkg/cockroach/flags"
"github.com/interuss/dss/pkg/datastore"
crdbflags "github.com/interuss/dss/pkg/datastore/flags"
dssmodels "github.com/interuss/dss/pkg/models"
scdmodels "github.com/interuss/dss/pkg/scd/models"
"github.com/interuss/dss/pkg/scd/repos"
Expand Down Expand Up @@ -102,7 +102,7 @@ func getSCDStore(ctx context.Context) (*scdc.Store, error) {
connectParameters := crdbflags.ConnectParameters()
connectParameters.ApplicationName = "db-manager"
connectParameters.DBName = scdc.DatabaseName
scdCrdb, err := cockroach.Dial(ctx, connectParameters)
scdCrdb, err := datastore.Dial(ctx, connectParameters)
if err != nil {
logParams := connectParameters
logParams.Credentials.Password = "[REDACTED]"
Expand Down
43 changes: 12 additions & 31 deletions cmds/db-manager/migration/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@ import (
"strings"

"github.com/coreos/go-semver/semver"
"github.com/interuss/dss/pkg/cockroach"
crdbflags "github.com/interuss/dss/pkg/cockroach/flags"
"github.com/interuss/dss/pkg/datastore"
crdbflags "github.com/interuss/dss/pkg/datastore/flags"

"github.com/interuss/stacktrace"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"golang.org/x/net/context"
)

type MigrationStep struct {
Expand Down Expand Up @@ -78,46 +77,42 @@ func migrate(cmd *cobra.Command, _ []string) error {
connectParameters := crdbflags.ConnectParameters()
connectParameters.ApplicationName = "db-manager"
connectParameters.DBName = "postgres" // Use an initial database that is known to always be present
crdb, err := cockroach.Dial(ctx, connectParameters)
ds, err := datastore.Dial(ctx, connectParameters)
if err != nil {
return fmt.Errorf("failed to connect to database with %+v: %w", connectParameters, err)
}
defer func() {
crdb.Pool.Close()
ds.Pool.Close()
}()

crdbVersion, err := crdb.GetServerVersion()
if err != nil {
return fmt.Errorf("unable to retrieve the version of the server %s:%d: %w", connectParameters.Host, connectParameters.Port, err)
}
log.Printf("CRDB server version: %s", crdbVersion)
log.Printf("CRDB server version: %s", ds.Version.SemVer.String())

// Make sure specified database exists
exists, err := doesDatabaseExist(ctx, crdb, dbName)
exists, err := ds.DatabaseExists(ctx, dbName)
if err != nil {
return fmt.Errorf("failed to check whether database %s exists: %w", dbName, err)
}
if !exists && dbName == "rid" {
// In the special case of rid, the database was previously named defaultdb
log.Printf("Database %s does not exist; checking for older \"defaultdb\" database", dbName)
dbName = "defaultdb"
exists, err = doesDatabaseExist(ctx, crdb, dbName)
exists, err = ds.DatabaseExists(ctx, dbName)
if err != nil {
return fmt.Errorf("failed to check whether old defaultdb database exists: %w", err)
}
}
if !exists {
log.Printf("Database %s does not exist; creating now", dbName)
createDB := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", dbName)
if _, err := crdb.Pool.Exec(ctx, createDB); err != nil {
if _, err := ds.Pool.Exec(ctx, createDB); err != nil {
return fmt.Errorf("failed to create new database %s: %v", dbName, err)
}
} else {
log.Printf("Database %s already exists; reading current state", dbName)
}

// Read current schema version of database
currentVersion, err := crdb.GetVersion(ctx, dbName)
currentVersion, err := ds.GetSchemaVersion(ctx, dbName)
if err != nil {
return fmt.Errorf("failed to get current database version for %s: %w", dbName, err)
}
Expand Down Expand Up @@ -162,14 +157,14 @@ func migrate(cmd *cobra.Command, _ []string) error {

// Ensure SQL session has implicit transactions disabled for CRDB versions 22.2+
sessionConfigurationSQL := ""
if crdbVersion.Compare(*semver.New("22.2.0")) >= 0 {
if ds.Version.SemVer.Compare(*semver.New("22.2.0")) >= 0 {
sessionConfigurationSQL = "SET enable_implicit_transaction_for_batch_statements = false;\n"
}

migrationSQL := sessionConfigurationSQL + fmt.Sprintf("USE %s;\n", dbName) + string(rawMigrationSQL)

// Execute migration step
if _, err := crdb.Pool.Exec(ctx, migrationSQL); err != nil {
if _, err := ds.Pool.Exec(ctx, migrationSQL); err != nil {
return fmt.Errorf("failed to execute %s migration step %s: %w", dbName, fullFilePath, err)
}

Expand All @@ -182,7 +177,7 @@ func migrate(cmd *cobra.Command, _ []string) error {
// RID database changes from `rid` to `defaultdb` when moving down from 4.0.0
dbName = "defaultdb"
}
actualVersion, err := crdb.GetVersion(ctx, dbName)
actualVersion, err := ds.GetSchemaVersion(ctx, dbName)
if err != nil {
return fmt.Errorf("failed to get current database version for %s: %w", dbName, err)
}
Expand Down Expand Up @@ -244,17 +239,3 @@ func enumerateMigrationSteps(path *string) ([]MigrationStep, error) {

return result, nil
}

func doesDatabaseExist(ctx context.Context, crdb *cockroach.DB, database string) (bool, error) {
const checkDbQuery = `
SELECT EXISTS (
SELECT * FROM pg_database WHERE datname = $1
)`

var exists bool
if err := crdb.Pool.QueryRow(ctx, checkDbQuery, database).Scan(&exists); err != nil {
return false, err
}

return exists, nil
}
Loading

0 comments on commit 11c42cd

Please sign in to comment.