Skip to content

Commit

Permalink
Migration CLI adaptations
Browse files Browse the repository at this point in the history
  • Loading branch information
barroco committed Dec 2, 2024
1 parent 59a8d2b commit dcf7215
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 12 deletions.
26 changes: 26 additions & 0 deletions build/dev/docker-compose_dss.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,32 @@ services:
- dss_sandbox_default_network
profiles: ["with-yugabyte"]

local-dss-rid-bootstrapper-ybdb:
build:
context: ../..
dockerfile: Dockerfile
image: interuss-local/dss
command: /usr/bin/db-manager migrate --schemas_dir=/db-schemas/yugabyte/rid --db_version "latest" --cockroach_host local-dss-ybdb --cockroach_user yugabyte --cockroach_port 5433
depends_on:
local-dss-crdb:
condition: service_healthy
networks:
- dss_sandbox_default_network
profiles: ["with-yugabyte"]

local-dss-scd-bootstrapper-ybdb:
build:
context: ../..
dockerfile: Dockerfile
image: interuss-local/dss
entrypoint: /usr/bin/db-manager migrate --schemas_dir=/db-schemas/yugabyte/scd --db_version "latest" --cockroach_host local-dss-ybdb --cockroach_user yugabyte --cockroach_port 5433
depends_on:
local-dss-crdb:
condition: service_healthy
networks:
- dss_sandbox_default_network
profiles: ["with-yugabyte"]

local-dss-rid-bootstrapper:
build:
context: ../..
Expand Down
52 changes: 40 additions & 12 deletions cmds/db-manager/migration/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,19 @@ func migrate(cmd *cobra.Command, _ []string) error {
ds.Pool.Close()
}()

log.Printf("CRDB server version: %s", ds.Version.SemVer.String())
log.Printf("Datastore server type and version: %s@%s", ds.Version.Type, ds.Version.SemVer.String())

var (
isCockroach = ds.Version.Type == datastore.CockroachDB
isYugabyte = ds.Version.Type == datastore.Yugabyte
)

// Make sure specified database exists
exists, err := ds.DatabaseExists(ctx, dbName)
if err != nil {
return fmt.Errorf("failed to check whether database %s exists: %w", dbName, err)
}
if !exists && dbName == "rid" {
if isCockroach && !exists && dbName == "rid" {
// In the special case of rid, the database was previously named defaultdb
log.Printf("Database %s does not exist; checking for older \"defaultdb\" database", dbName)
dbName = "defaultdb"
Expand All @@ -103,14 +108,25 @@ func migrate(cmd *cobra.Command, _ []string) error {
}
if !exists {
log.Printf("Database %s does not exist; creating now", dbName)
createDB := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", dbName)
createDB := fmt.Sprintf("CREATE DATABASE %s", dbName)
if _, err := ds.Pool.Exec(ctx, createDB); err != nil {
return fmt.Errorf("failed to create new database %s: %v", dbName, err)
}
} else {
log.Printf("Database %s already exists; reading current state", dbName)
}

if isYugabyte {
// Reconnect to proper database (Yugabyte does not support cross-database references)
connectParameters = crdbflags.ConnectParameters()
connectParameters.ApplicationName = "db-manager"
connectParameters.DBName = dbName
ds, err = datastore.Dial(ctx, connectParameters)
if err != nil {
return fmt.Errorf("failed to reconnect to database %s: %w", dbName, err)
}
}

// Read current schema version of database
currentVersion, err := ds.GetSchemaVersion(ctx, dbName)
if err != nil {
Expand Down Expand Up @@ -157,25 +173,37 @@ func migrate(cmd *cobra.Command, _ []string) error {

// Ensure SQL session has implicit transactions disabled for CRDB versions 22.2+
sessionConfigurationSQL := ""
if ds.Version.SemVer.Compare(*semver.New("22.2.0")) >= 0 {
if isCockroach && ds.Version.SemVer.Compare(*semver.New("22.2.0")) >= 0 {
sessionConfigurationSQL = "SET enable_implicit_transaction_for_batch_statements = false;\n"
}

migrationSQL := sessionConfigurationSQL + fmt.Sprintf("USE %s;\n", dbName) + string(rawMigrationSQL)
migrationSQL := ""
if isCockroach {
migrationSQL = sessionConfigurationSQL + fmt.Sprintf("USE %s;\n", dbName) + string(rawMigrationSQL)
} else {
ds, err = datastore.Dial(ctx, connectParameters)
if err != nil {
return fmt.Errorf("failed to reconnect to database %s: %w", dbName, err)
}
migrationSQL = sessionConfigurationSQL + string(rawMigrationSQL)
}

// Execute migration step
if _, err := ds.Pool.Exec(ctx, migrationSQL); err != nil {
return fmt.Errorf("failed to execute %s migration step %s: %w", dbName, fullFilePath, err)
}

// Update current state
if dbName == "defaultdb" && newVersion.String() == "4.0.0" && newCurrentStepIndex > currentStepIndex {
// RID database changes from `defaultdb` to `rid` when moving up to 4.0.0
dbName = "rid"
}
if dbName == "rid" && currentVersion.String() == "4.0.0" && newCurrentStepIndex < currentStepIndex {
// RID database changes from `rid` to `defaultdb` when moving down from 4.0.0
dbName = "defaultdb"
if isCockroach {
// Update current state for CRDB
if dbName == "defaultdb" && newVersion.String() == "4.0.0" && newCurrentStepIndex > currentStepIndex {
// RID database changes from `defaultdb` to `rid` when moving up to 4.0.0
dbName = "rid"
}
if dbName == "rid" && currentVersion.String() == "4.0.0" && newCurrentStepIndex < currentStepIndex {
// RID database changes from `rid` to `defaultdb` when moving down from 4.0.0
dbName = "defaultdb"
}
}
actualVersion, err := ds.GetSchemaVersion(ctx, dbName)
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions pkg/datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ func initDatastore(ctx context.Context, pool *pgxpool.Pool) (*Datastore, error)
if version.Type == CockroachDB {
return &Datastore{Version: version, Pool: pool}, nil
}
if version.Type == Yugabyte {
return &Datastore{Version: version, Pool: pool}, nil
}
return nil, stacktrace.NewError("%s is not implemented yet", version.Type)
}

Expand Down Expand Up @@ -99,6 +102,9 @@ func (ds *Datastore) GetSchemaVersion(ctx context.Context, dbName string) (*semv
if dbName == "" {
return nil, stacktrace.NewError("GetSchemaVersion was provided with an empty database name")
}
if ds.Version.Type == Yugabyte && dbName != ds.Pool.Config().ConnConfig.Database {
return nil, stacktrace.NewError("Yugabyte do not support switching databases with the same connection. Unable to retrieve schema version for database %s while connected to %s.", dbName, ds.Pool.Config().ConnConfig.Database)
}

var (
checkTableQuery = fmt.Sprintf(`
Expand Down

0 comments on commit dcf7215

Please sign in to comment.