diff --git a/backup/backup.go b/backup/backup.go index 621a89db8..9a8b485e3 100644 --- a/backup/backup.go +++ b/backup/backup.go @@ -265,7 +265,6 @@ func backupData(tables []Table) { gplog.Info("Data backup complete") return } - if MustGetFlagBool(options.SINGLE_DATA_FILE) { gplog.Verbose("Initializing pipes and gpbackup_helper on segments for single data file backup") utils.VerifyHelperVersionOnSegments(version, globalCluster) @@ -274,22 +273,26 @@ func backupData(tables []Table) { oidList = append(oidList, fmt.Sprintf("%d", table.Oid)) } utils.WriteOidListToSegments(oidList, globalCluster, globalFPInfo) - utils.CreateFirstSegmentPipeOnAllHosts(oidList[0], globalCluster, globalFPInfo) compressStr := fmt.Sprintf(" --compression-level %d --compression-type %s", MustGetFlagInt(options.COMPRESSION_LEVEL), MustGetFlagString(options.COMPRESSION_TYPE)) if MustGetFlagBool(options.NO_COMPRESSION) { compressStr = " --compression-level 0" } + initialPipes := CreateInitialSegmentPipes(oidList, globalCluster, connectionPool, globalFPInfo) // Do not pass through the --on-error-continue flag because it does not apply to gpbackup utils.StartGpbackupHelpers(globalCluster, globalFPInfo, "--backup-agent", - MustGetFlagString(options.PLUGIN_CONFIG), compressStr, false, false, &wasTerminated) + MustGetFlagString(options.PLUGIN_CONFIG), compressStr, false, false, &wasTerminated, initialPipes) } gplog.Info("Writing data to file") - rowsCopiedMaps := backupDataForAllTables(tables) + var rowsCopiedMaps []map[uint32]int64 + if FlagChanged(options.COPY_QUEUE_SIZE) { + rowsCopiedMaps = backupDataForAllTablesCopyQueue(tables) + } else { + rowsCopiedMaps = backupDataForAllTables(tables) + } AddTableDataEntriesToTOC(tables, rowsCopiedMaps) if MustGetFlagBool(options.SINGLE_DATA_FILE) && MustGetFlagString(options.PLUGIN_CONFIG) != "" { pluginConfig.BackupSegmentTOCs(globalCluster, globalFPInfo) } - logCompletionMessage("Data backup") } @@ -505,3 +508,17 @@ func logCompletionMessage(msg string) { gplog.Info("%s complete", msg) } } + +func CreateInitialSegmentPipes(oidList []string, c *cluster.Cluster, connectionPool *dbconn.DBConn, fpInfo filepath.FilePathInfo) int { + // Create min(connections, tables) segment pipes on each host + var maxPipes int + if connectionPool.NumConns < len(oidList) { + maxPipes = connectionPool.NumConns + } else { + maxPipes = len(oidList) + } + for i := 0; i < maxPipes; i++ { + utils.CreateSegmentPipeOnAllHosts(oidList[i], c, fpInfo) + } + return maxPipes +} diff --git a/backup/data.go b/backup/data.go index 7f92cdcde..04e20a7d1 100644 --- a/backup/data.go +++ b/backup/data.go @@ -5,10 +5,12 @@ package backup */ import ( + "errors" "fmt" "strings" "sync" "sync/atomic" + "time" "github.com/greenplum-db/gp-common-go-libs/dbconn" "github.com/greenplum-db/gp-common-go-libs/gplog" @@ -85,13 +87,14 @@ func CopyTableOut(connectionPool *dbconn.DBConn, table Table, destinationToWrite } func BackupSingleTableData(table Table, rowsCopiedMap map[uint32]int64, counters *BackupProgressCounters, whichConn int) error { - atomic.AddInt64(&counters.NumRegTables, 1) - numTables := counters.NumRegTables //We save this so it won't be modified before we log it + logMessage := fmt.Sprintf("Worker %d: Writing data for table %s to file", whichConn, table.FQN()) + // Avoid race condition by incrementing counters in call to sprintf + tableCount := fmt.Sprintf(" (table %d of %d)", atomic.AddInt64(&counters.NumRegTables, 1), counters.TotalRegTables) if gplog.GetVerbosity() > gplog.LOGINFO { // No progress bar at this log 
level, so we note table count here - gplog.Verbose("Writing data for table %s to file (table %d of %d)", table.FQN(), numTables, counters.TotalRegTables) + gplog.Verbose(logMessage + tableCount) } else { - gplog.Verbose("Writing data for table %s to file", table.FQN()) + gplog.Verbose(logMessage) } destinationToWrite := "" @@ -109,6 +112,147 @@ func BackupSingleTableData(table Table, rowsCopiedMap map[uint32]int64, counters return nil } +// backupDataForAllTablesCopyQueue does not back up tables in parallel. This is +// specifically for single-data-file. While tables are backed up one at a time, it +// is possible to save a significant amount of time by queuing up the next +// table to copy. Worker 0 does not have tables pre-assigned to it. The copy +// queue uses worker 0 as a special deferred worker in the event that the other +// workers encounter locking issues. Worker 0 already has all locks on the +// tables so it will not run into locking issues. +func backupDataForAllTablesCopyQueue(tables []Table) []map[uint32]int64 { + counters := BackupProgressCounters{NumRegTables: 0, TotalRegTables: int64(len(tables))} + counters.ProgressBar = utils.NewProgressBar(int(counters.TotalRegTables), "Tables backed up: ", utils.PB_INFO) + counters.ProgressBar.Start() + rowsCopiedMaps := make([]map[uint32]int64, connectionPool.NumConns) + /* + * We break when an interrupt is received and rely on + * TerminateHangingCopySessions to kill any COPY statements + * in progress if they don't finish on their own. + */ + tasks := make(chan Table, len(tables)) + var oidMap sync.Map + var workerPool sync.WaitGroup + var copyErr error + // Record and track tables in a hashmap of oids and table states (preloaded with value Unknown). + // The tables are loaded into the tasks channel for the subsequent goroutines to work on. + for _, table := range tables { + oidMap.Store(table.Oid, Unknown) + tasks <- table + } + // We incremented numConns by 1 to treat connNum 0 as a special worker + rowsCopiedMaps[0] = make(map[uint32]int64) + for connNum := 1; connNum < connectionPool.NumConns; connNum++ { + rowsCopiedMaps[connNum] = make(map[uint32]int64) + workerPool.Add(1) + go func(whichConn int) { + defer workerPool.Done() + for table := range tasks { + if wasTerminated || copyErr != nil { + counters.ProgressBar.(*pb.ProgressBar).NotPrint = true + return + } + + // If a random external SQL command had queued an AccessExclusiveLock acquisition request + // against this next table, the worker thread would deadlock on the COPY attempt. + // To prevent gpbackup from hanging, we attempt to acquire an AccessShareLock on the + // relation with the NOWAIT option before we run COPY. If the LOCK TABLE NOWAIT call + // fails, we catch the error and defer the table to the main worker thread, worker 0. + // Afterwards, we break early and terminate the worker since its transaction is now in an + // aborted state. We do not need to do this with the main worker thread because it has + // already acquired AccessShareLocks on all tables before the metadata dumping part. + err := LockTableNoWait(table, whichConn) + if err != nil { + if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code != PG_LOCK_NOT_AVAILABLE { + copyErr = err + continue + } + + if gplog.GetVerbosity() < gplog.LOGVERBOSE { + // Add a newline to interrupt the progress bar so that + // the following WARN message is printed cleanly. + fmt.Printf("\n") + } + gplog.Warn("Worker %d could not acquire AccessShareLock for table %s.
Terminating worker and deferring table to main worker thread.", + whichConn, table.FQN()) + + oidMap.Store(table.Oid, Deferred) + // Roll back the transaction since it's in an aborted state + connectionPool.MustRollback(whichConn) + + // Worker no longer has a valid distributed transaction snapshot + break + } + + err = BackupSingleTableData(table, rowsCopiedMaps[whichConn], &counters, whichConn) + if err != nil { + copyErr = err + } + oidMap.Store(table.Oid, Complete) + } + }(connNum) + } + + // Special goroutine to handle deferred tables + // Handle all tables deferred by the deadlock detection. This can only be + // done with the main worker thread, worker 0, because it has + // AccessShareLocks on all the tables already. + deferredWorkerDone := make(chan bool) + go func() { + for _, table := range tables { + for { + state, _ := oidMap.Load(table.Oid) + if state.(int) == Unknown { + time.Sleep(time.Millisecond * 50) + } else if state.(int) == Deferred { + err := BackupSingleTableData(table, rowsCopiedMaps[0], &counters, 0) + if err != nil { + copyErr = err + } + oidMap.Store(table.Oid, Complete) + break + } else if state.(int) == Complete { + break + } else { + gplog.Fatal(errors.New("Encountered unknown table state"), "") + } + } + } + deferredWorkerDone <- true + }() + + close(tasks) + workerPool.Wait() + + // Check if all the workers were terminated. If so, defer all remaining tables to worker 0 + allWorkersTerminatedLogged := false + for _, table := range tables { + state, _ := oidMap.Load(table.Oid) + if state == Unknown { + if !allWorkersTerminatedLogged { + gplog.Warn("All copy queue workers terminated due to lock issues. Falling back to single main worker.") + allWorkersTerminatedLogged = true + } + oidMap.Store(table.Oid, Deferred) + } + } + // Main goroutine waits for deferred worker 0 by waiting on this channel + <-deferredWorkerDone + + agentErr := utils.CheckAgentErrorsOnSegments(globalCluster, globalFPInfo) + + if copyErr != nil && agentErr != nil { + gplog.Error(agentErr.Error()) + gplog.Fatal(copyErr, "") + } else if copyErr != nil { + gplog.Fatal(copyErr, "") + } else if agentErr != nil { + gplog.Fatal(agentErr, "") + } + + counters.ProgressBar.Finish() + return rowsCopiedMaps +} + func backupDataForAllTables(tables []Table) []map[uint32]int64 { counters := BackupProgressCounters{NumRegTables: 0, TotalRegTables: int64(len(tables))} counters.ProgressBar = utils.NewProgressBar(int(counters.TotalRegTables), "Tables backed up: ", utils.PB_INFO) @@ -146,8 +290,7 @@ func backupDataForAllTables(tables []Table) []map[uint32]int64 { if whichConn != 0 { err := LockTableNoWait(table, whichConn) if err != nil { - // Postgres Error Code 55P03 translates to LOCK_NOT_AVAILABLE - if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code != "55P03" { + if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code != PG_LOCK_NOT_AVAILABLE { copyErr = err continue } diff --git a/backup/global_variables.go b/backup/global_variables.go index 11b45adbb..235d0c4f4 100644 --- a/backup/global_variables.go +++ b/backup/global_variables.go @@ -19,6 +19,16 @@ import ( * used in testing.
*/ +/* + Table backup state constants +*/ +const ( + Unknown int = iota + Deferred + Complete + PG_LOCK_NOT_AVAILABLE = "55P03" +) + /* * Non-flag variables */ @@ -99,6 +109,10 @@ func SetQuotedRoleNames(quotedRoles map[string]string) { // Util functions to enable ease of access to global flag values +func FlagChanged(flagName string) bool { + return cmdFlags.Changed(flagName) +} + func MustGetFlagString(flagName string) string { return options.MustGetFlagString(cmdFlags, flagName) } diff --git a/backup/validate.go b/backup/validate.go index e13fc6cad..610a6b2a0 100644 --- a/backup/validate.go +++ b/backup/validate.go @@ -105,6 +105,9 @@ func validateFlagCombinations(flags *pflag.FlagSet) { options.CheckExclusiveFlags(flags, options.NO_COMPRESSION, options.COMPRESSION_TYPE) options.CheckExclusiveFlags(flags, options.NO_COMPRESSION, options.COMPRESSION_LEVEL) options.CheckExclusiveFlags(flags, options.PLUGIN_CONFIG, options.BACKUP_DIR) + if FlagChanged(options.COPY_QUEUE_SIZE) && !MustGetFlagBool(options.SINGLE_DATA_FILE) { + gplog.Fatal(errors.Errorf("--copy-queue-size must be specified with --single-data-file"), "") + } if MustGetFlagString(options.FROM_TIMESTAMP) != "" && !MustGetFlagBool(options.INCREMENTAL) { gplog.Fatal(errors.Errorf("--from-timestamp must be specified with --incremental"), "") } @@ -124,6 +127,10 @@ func validateFlagValues() { gplog.Fatal(errors.Errorf("Timestamp %s is invalid. Timestamps must be in the format YYYYMMDDHHMMSS.", MustGetFlagString(options.FROM_TIMESTAMP)), "") } + if FlagChanged(options.COPY_QUEUE_SIZE) && MustGetFlagInt(options.COPY_QUEUE_SIZE) < 2 { + gplog.Fatal(errors.Errorf("--copy-queue-size %d is invalid. Must be at least 2", + MustGetFlagInt(options.COPY_QUEUE_SIZE)), "") + } } func validateFromTimestamp(fromTimestamp string) { diff --git a/backup/wrappers.go b/backup/wrappers.go index cf294d44f..54a02e8dd 100644 --- a/backup/wrappers.go +++ b/backup/wrappers.go @@ -39,7 +39,18 @@ func SetLoggerVerbosity() { func initializeConnectionPool(timestamp string) { connectionPool = dbconn.NewDBConnFromEnvironment(MustGetFlagString(options.DBNAME)) - connectionPool.MustConnect(MustGetFlagInt(options.JOBS)) + var numConns int + switch true { + case FlagChanged(options.COPY_QUEUE_SIZE): + // Connection 0 is reserved for deferred worker, initialize 1 additional connection. 
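// Worked example with hypothetical values (illustration only, not part of the
// patch): --copy-queue-size 4 sizes the pool below to 5 connections, connection
// 0 for the deferred worker plus four copy-queue workers, whereas --jobs 4 on
// its own would size it to exactly 4, and the default remains 1.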
+ numConns = MustGetFlagInt(options.COPY_QUEUE_SIZE) + 1 + case FlagChanged(options.JOBS): + numConns = MustGetFlagInt(options.JOBS) + default: + numConns = 1 + } + gplog.Verbose(fmt.Sprintf("Initializing %d worker connections", numConns)) + connectionPool.MustConnect(numConns) utils.ValidateGPDBVersionCompatibility(connectionPool) InitializeMetadataParams(connectionPool) for connNum := 0; connNum < connectionPool.NumConns; connNum++ { diff --git a/ci/scripts/scale-tests.bash b/ci/scripts/scale-tests.bash index 510b2a9d7..09370a66a 100755 --- a/ci/scripts/scale-tests.bash +++ b/ci/scripts/scale-tests.bash @@ -32,6 +32,39 @@ set -e ### Data scale tests ### log_file=/tmp/gpbackup.log + +echo "## Populating database for copy queue test ##" +createdb copyqueuedb +for j in {1..20000} +do + psql -d copyqueuedb -q -c "CREATE TABLE tbl_1k_\$j(i int) DISTRIBUTED BY (i);" + psql -d copyqueuedb -q -c "INSERT INTO tbl_1k_\$j SELECT generate_series(1,1000)" +done + +echo "## Performing single-data-file, --no-compression, --copy-queue-size 2 backup for copy queue test ##" +time gpbackup --dbname copyqueuedb --backup-dir /data/gpdata/ --single-data-file --no-compression --copy-queue-size 2 | tee "\$log_file" +timestamp=\$(head -10 "\$log_file" | grep "Backup Timestamp " | grep -Eo "[[:digit:]]{14}") +gpbackup_manager display-report \$timestamp + +echo "## Performing single-data-file, --no-compression, --copy-queue-size 4 backup for copy queue test ##" +time gpbackup --dbname copyqueuedb --backup-dir /data/gpdata/ --single-data-file --no-compression --copy-queue-size 4 | tee "\$log_file" +timestamp=\$(head -10 "\$log_file" | grep "Backup Timestamp " | grep -Eo "[[:digit:]]{14}") +gpbackup_manager display-report \$timestamp + +echo "## Performing single-data-file, --no-compression, --copy-queue-size 8 backup for copy queue test ##" +time gpbackup --dbname copyqueuedb --backup-dir /data/gpdata/ --single-data-file --no-compression --copy-queue-size 8 | tee "\$log_file" +timestamp=\$(head -10 "\$log_file" | grep "Backup Timestamp " | grep -Eo "[[:digit:]]{14}") +gpbackup_manager display-report \$timestamp + +echo "## Performing single-data-file, --no-compression, --copy-queue-size 2 restore for copy queue test ##" +time gprestore --timestamp "\$timestamp" --backup-dir /data/gpdata/ --create-db --redirect-db copyqueuerestore2 --copy-queue-size 2 + +echo "## Performing single-data-file, --no-compression, --copy-queue-size 4 restore for copy queue test ##" +time gprestore --timestamp "\$timestamp" --backup-dir /data/gpdata/ --create-db --redirect-db copyqueuerestore4 --copy-queue-size 4 + +echo "## Performing single-data-file, --no-compression, --copy-queue-size 8 restore for copy queue test ##" +time gprestore --timestamp "\$timestamp" --backup-dir /data/gpdata/ --create-db --redirect-db copyqueuerestore8 --copy-queue-size 8 + echo "## Populating database for data scale test ##" createdb datascaledb for j in {1..5000} diff --git a/end_to_end/end_to_end_suite_test.go b/end_to_end/end_to_end_suite_test.go index 98a2a0f69..366b67cf7 100644 --- a/end_to_end/end_to_end_suite_test.go +++ b/end_to_end/end_to_end_suite_test.go @@ -3,7 +3,6 @@ package end_to_end_test import ( "flag" "fmt" - "github.com/pkg/errors" "io/ioutil" "math/rand" "os" @@ -16,6 +15,7 @@ import ( "testing" "time" + "github.com/pkg/errors" "github.com/blang/semver" "github.com/greenplum-db/gp-common-go-libs/cluster" "github.com/greenplum-db/gp-common-go-libs/dbconn" @@ -563,6 +563,34 @@ var _ = Describe("backup and restore end to end tests", func() 
{ Expect(stdout).To(ContainSubstring("Cleanup complete")) Expect(stdout).To(Not(ContainSubstring("CRITICAL"))) }) + It("runs gpbackup with copy-queue-size and sends a SIGINT to ensure cleanup functions successfully", func() { + if useOldBackupVersion { + Skip("This test is not needed for old backup versions") + } + args := []string{"--dbname", "testdb", + "--backup-dir", backupDir, + "--single-data-file", + "--copy-queue-size", "4", + "--verbose"} + cmd := exec.Command(gpbackupPath, args...) + go func() { + /* + * We use a random delay for the sleep in this test (between + * 0.5s and 0.8s) so that gpbackup will be interrupted at a + * different point in the backup process every time to help + * catch timing issues with the cleanup. + */ + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + time.Sleep(time.Duration(rng.Intn(300)+500) * time.Millisecond) + _ = cmd.Process.Signal(os.Interrupt) + }() + output, _ := cmd.CombinedOutput() + stdout := string(output) + + Expect(stdout).To(ContainSubstring("Received a termination signal, aborting backup process")) + Expect(stdout).To(ContainSubstring("Cleanup complete")) + Expect(stdout).To(Not(ContainSubstring("CRITICAL"))) + }) It("runs gprestore and sends a SIGINT to ensure cleanup functions successfully", func() { if useOldBackupVersion { Skip("This test is not needed for old backup versions") @@ -595,6 +623,39 @@ var _ = Describe("backup and restore end to end tests", func() { Expect(stdout).To(Not(ContainSubstring("CRITICAL"))) assertArtifactsCleaned(restoreConn, timestamp) }) + It("runs gprestore with copy-queue-size and sends a SIGINT to ensure cleanup functions successfully", func() { + if useOldBackupVersion { + Skip("This test is not needed for old backup versions") + } + timestamp := gpbackup(gpbackupPath, backupHelperPath, + "--backup-dir", backupDir, + "--single-data-file") + args := []string{ + "--timestamp", timestamp, + "--redirect-db", "restoredb", + "--backup-dir", backupDir, + "--verbose", + "--copy-queue-size", "4"} + cmd := exec.Command(gprestorePath, args...) + go func() { + /* + * We use a random delay for the sleep in this test (between + * 0.5s and 0.8s) so that gprestore will be interrupted at a + * different point in the backup process every time to help + * catch timing issues with the cleanup. + */ + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + time.Sleep(time.Duration(rng.Intn(300)+500) * time.Millisecond) + _ = cmd.Process.Signal(os.Interrupt) + }() + output, _ := cmd.CombinedOutput() + stdout := string(output) + + Expect(stdout).To(ContainSubstring("Received a termination signal, aborting restore process")) + Expect(stdout).To(ContainSubstring("Cleanup complete")) + Expect(stdout).To(Not(ContainSubstring("CRITICAL"))) + assertArtifactsCleaned(restoreConn, timestamp) + }) It("runs gpbackup and sends a SIGINT to ensure blocked LOCK TABLE query is canceled", func() { if useOldBackupVersion { Skip("This test is not needed for old backup versions") @@ -1360,7 +1421,6 @@ var _ = Describe("backup and restore end to end tests", func() { "--jobs", "9", "--verbose"} cmd := exec.Command(gpbackupPath, args...) - // Concurrently wait for gpbackup to block when it requests an AccessShareLock on public.foo. Once // that happens, acquire an AccessExclusiveLock on pg_catalog.pg_trigger to block gpbackup during its // trigger metadata dump. 
Then release the initial AccessExclusiveLock on public.foo (from the @@ -1475,6 +1535,137 @@ var _ = Describe("backup and restore end to end tests", func() { Expect(stdout).To(ContainSubstring("Backup completed successfully")) }) + It("runs gpbackup with copy-queue-size flag and COPY deadlock handling occurs", func() { + if useOldBackupVersion { + Skip("This test is not needed for old backup versions") + } + // Acquire AccessExclusiveLock on public.foo to block gpbackup when it attempts + // to grab AccessShareLocks before its metadata dump section. + backupConn.MustExec("BEGIN; LOCK TABLE public.foo IN ACCESS EXCLUSIVE MODE") + + // Execute gpbackup with --copy-queue-size 2 + args := []string{ + "--dbname", "testdb", + "--backup-dir", backupDir, + "--single-data-file", + "--copy-queue-size", "2", + "--verbose"} + cmd := exec.Command(gpbackupPath, args...) + + // Concurrently wait for gpbackup to block when it requests an AccessShareLock on public.foo. Once + // that happens, acquire an AccessExclusiveLock on pg_catalog.pg_trigger to block gpbackup during its + // trigger metadata dump. Then release the initial AccessExclusiveLock on public.foo (from the + // beginning of the test) to unblock gpbackup and let gpbackup move forward to the trigger metadata dump. + anotherConn := testutils.SetupTestDbConn("testdb") + defer anotherConn.Close() + go func() { + // Query to see if gpbackup's AccessShareLock request on public.foo is blocked + checkLockQuery := `SELECT count(*) FROM pg_locks l, pg_class c, pg_namespace n WHERE l.relation = c.oid AND n.oid = c.relnamespace AND n.nspname = 'public' AND c.relname = 'foo' AND l.granted = 'f' AND l.mode = 'AccessShareLock'` + + // Wait up to 10 seconds for gpbackup to block + var gpbackupBlockedLockCount int + iterations := 100 + for iterations > 0 { + _ = anotherConn.Get(&gpbackupBlockedLockCount, checkLockQuery) + if gpbackupBlockedLockCount < 1 { + time.Sleep(100 * time.Millisecond) + iterations-- + } else { + break + } + } + + // Queue AccessExclusiveLock request on pg_catalog.pg_trigger to block gpbackup + // during the trigger metadata dump so that the test can queue a bunch of + // AccessExclusiveLock requests against the test tables. Afterwards, release the + // AccessExclusiveLock on public.foo to let gpbackup go to the trigger metadata dump. + anotherConn.MustExec(`BEGIN; LOCK TABLE pg_catalog.pg_trigger IN ACCESS EXCLUSIVE MODE`) + backupConn.MustExec("COMMIT") + }() + + // Concurrently wait for gpbackup to block on the trigger metadata dump section. Once we + // see gpbackup blocked, request AccessExclusiveLock (to imitate a TRUNCATE or VACUUM + // FULL) on all the test tables. 
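// Illustrative summary of what the assertions further below check (not new
// test code): each copy-queue worker issues its LOCK TABLE ... NOWAIT probe,
// fails with SQLSTATE 55P03 (PG_LOCK_NOT_AVAILABLE) because of the queued
// AccessExclusiveLock, logs the AccessShareLock warning, defers the table,
// and terminates; worker 0 then runs COPY for every test table itself.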
+ dataTables := []string{`public."FOObar"`, "public.foo", "public.holds", "public.sales", + "schema2.ao1", "schema2.ao2", "schema2.foo2", "schema2.foo3", "schema2.returns"} + for _, dataTable := range dataTables { + go func(dataTable string) { + accessExclusiveLockConn := testutils.SetupTestDbConn("testdb") + defer accessExclusiveLockConn.Close() + + // Query to see if gpbackup's AccessShareLock request on pg_catalog.pg_trigger is blocked + checkLockQuery := `SELECT count(*) FROM pg_locks l, pg_class c, pg_namespace n WHERE l.relation = c.oid AND n.oid = c.relnamespace AND n.nspname = 'pg_catalog' AND c.relname = 'pg_trigger' AND l.granted = 'f' AND l.mode = 'AccessShareLock'` + + // Wait up to 10 seconds for gpbackup to block + var gpbackupBlockedLockCount int + iterations := 100 + for iterations > 0 { + _ = accessExclusiveLockConn.Get(&gpbackupBlockedLockCount, checkLockQuery) + if gpbackupBlockedLockCount < 1 { + time.Sleep(100 * time.Millisecond) + iterations-- + } else { + break + } + } + + // Queue an AccessExclusiveLock request on a test table which will later + // result in a detected deadlock during the gpbackup data dump section. + accessExclusiveLockConn.MustExec(fmt.Sprintf(`BEGIN; LOCK TABLE %s IN ACCESS EXCLUSIVE MODE; COMMIT`, dataTable)) + }(dataTable) + } + + // Concurrently wait for all AccessExclusiveLock requests on all 9 test tables to block. + // Once that happens, release the AccessExclusiveLock on pg_catalog.pg_trigger to unblock + // gpbackup and let gpbackup move forward to the data dump section. + var accessExclBlockedLockCount int + go func() { + // Query to check for ungranted AccessExclusiveLock requests on our test tables + checkLockQuery := `SELECT count(*) FROM pg_locks WHERE granted = 'f' AND mode = 'AccessExclusiveLock'` + + // Wait up to 10 seconds + iterations := 100 + for iterations > 0 { + _ = backupConn.Get(&accessExclBlockedLockCount, checkLockQuery) + if accessExclBlockedLockCount < 9 { + time.Sleep(100 * time.Millisecond) + iterations-- + } else { + break + } + } + + // Unblock gpbackup by releasing AccessExclusiveLock on pg_catalog.pg_trigger + anotherConn.MustExec("COMMIT") + }() + + // gpbackup has finished + output, _ := cmd.CombinedOutput() + stdout := string(output) + + // Sanity check that 9 deadlock traps were placed during the test + Expect(accessExclBlockedLockCount).To(Equal(9)) + Expect(stdout).To(ContainSubstring("All copy queue workers terminated due to lock issues. 
Falling back to single main worker.")) + // No non-main worker should have been able to run COPY due to deadlock detection + for i := 1; i < 2; i++ { + expectedLockString := fmt.Sprintf("[DEBUG]:-Worker %d: LOCK TABLE ", i) + Expect(stdout).To(ContainSubstring(expectedLockString)) + + expectedWarnString := fmt.Sprintf("[WARNING]:-Worker %d could not acquire AccessShareLock for table", i) + Expect(stdout).To(ContainSubstring(expectedWarnString)) + + unexpectedCopyString := fmt.Sprintf("[DEBUG]:-Worker %d: COPY ", i) + Expect(stdout).ToNot(ContainSubstring(unexpectedCopyString)) + } + + // Only the main worker thread, worker 0, will run COPY on all the test tables + for _, dataTable := range dataTables { + expectedString := fmt.Sprintf(`[DEBUG]:-Worker 0: COPY %s TO PROGRAM `, dataTable) + Expect(stdout).To(ContainSubstring(expectedString)) + } + + Expect(stdout).To(ContainSubstring("Backup completed successfully")) + }) It("successfully backs up foreign tables", func() { if backupConn.Version.Before("6") { Skip("Test does not apply for GPDB versions before 6") diff --git a/end_to_end/incremental_test.go b/end_to_end/incremental_test.go index 81e0f7093..2f6713ff1 100644 --- a/end_to_end/incremental_test.go +++ b/end_to_end/incremental_test.go @@ -363,6 +363,51 @@ var _ = Describe("End to End incremental tests", func() { assertArtifactsCleaned(restoreConn, incremental1Timestamp) assertArtifactsCleaned(restoreConn, incremental2Timestamp) }) + It("Restores from an incremental backup based on a from-timestamp incremental with --copy-queue-size", func() { + fullBackupTimestamp := gpbackup(gpbackupPath, backupHelperPath, + "--leaf-partition-data", + "--single-data-file", + "--copy-queue-size", "4", + "--plugin-config", pluginConfigPath) + forceMetadataFileDownloadFromPlugin(backupConn, fullBackupTimestamp) + testhelper.AssertQueryRuns(backupConn, + "INSERT into schema2.ao1 values(1001)") + defer testhelper.AssertQueryRuns(backupConn, + "DELETE from schema2.ao1 where i=1001") + incremental1Timestamp := gpbackup(gpbackupPath, backupHelperPath, + "--incremental", + "--leaf-partition-data", + "--single-data-file", + "--copy-queue-size", "4", + "--from-timestamp", fullBackupTimestamp, + "--plugin-config", pluginConfigPath) + forceMetadataFileDownloadFromPlugin(backupConn, incremental1Timestamp) + + testhelper.AssertQueryRuns(backupConn, + "INSERT into schema2.ao1 values(1002)") + defer testhelper.AssertQueryRuns(backupConn, + "DELETE from schema2.ao1 where i=1002") + incremental2Timestamp := gpbackup(gpbackupPath, backupHelperPath, + "--incremental", + "--leaf-partition-data", + "--single-data-file", + "--copy-queue-size", "4", + "--plugin-config", pluginConfigPath) + forceMetadataFileDownloadFromPlugin(backupConn, incremental2Timestamp) + + gprestore(gprestorePath, restoreHelperPath, incremental2Timestamp, + "--redirect-db", "restoredb", + "--copy-queue-size", "4", + "--plugin-config", pluginConfigPath) + + assertRelationsCreated(restoreConn, TOTAL_RELATIONS) + assertDataRestored(restoreConn, publicSchemaTupleCounts) + schema2TupleCounts["schema2.ao1"] = 1002 + assertDataRestored(restoreConn, schema2TupleCounts) + assertArtifactsCleaned(restoreConn, fullBackupTimestamp) + assertArtifactsCleaned(restoreConn, incremental1Timestamp) + assertArtifactsCleaned(restoreConn, incremental2Timestamp) + }) It("Runs backup and restore if plugin location changed", func() { pluginExecutablePath := fmt.Sprintf("%s/src/github.com/greenplum-db/gpbackup/plugins/example_plugin.bash", os.Getenv("GOPATH")) 
fullBackupTimestamp := gpbackup(gpbackupPath, backupHelperPath, diff --git a/end_to_end/plugin_test.go b/end_to_end/plugin_test.go index 6fb497c1c..6d4217dca 100644 --- a/end_to_end/plugin_test.go +++ b/end_to_end/plugin_test.go @@ -64,6 +64,23 @@ var _ = Describe("End to End plugin tests", func() { assertDataRestored(restoreConn, schema2TupleCounts) assertArtifactsCleaned(restoreConn, timestamp) + }) + It("runs gpbackup and gprestore with single-data-file flag with copy-queue-size", func() { + skipIfOldBackupVersionBefore("1.23.0") + timestamp := gpbackup(gpbackupPath, backupHelperPath, + "--single-data-file", + "--copy-queue-size", "4", + "--backup-dir", backupDir) + gprestore(gprestorePath, restoreHelperPath, timestamp, + "--redirect-db", "restoredb", + "--copy-queue-size", "4", + "--backup-dir", backupDir) + + assertRelationsCreated(restoreConn, TOTAL_RELATIONS) + assertDataRestored(restoreConn, publicSchemaTupleCounts) + assertDataRestored(restoreConn, schema2TupleCounts) + assertArtifactsCleaned(restoreConn, timestamp) + }) It("runs gpbackup and gprestore with single-data-file flag without compression", func() { timestamp := gpbackup(gpbackupPath, backupHelperPath, @@ -79,6 +96,23 @@ var _ = Describe("End to End plugin tests", func() { assertDataRestored(restoreConn, schema2TupleCounts) assertArtifactsCleaned(restoreConn, timestamp) }) + It("runs gpbackup and gprestore with single-data-file flag without compression with copy-queue-size", func() { + skipIfOldBackupVersionBefore("1.23.0") + timestamp := gpbackup(gpbackupPath, backupHelperPath, + "--single-data-file", + "--copy-queue-size", "4", + "--backup-dir", backupDir, + "--no-compression") + gprestore(gprestorePath, restoreHelperPath, timestamp, + "--redirect-db", "restoredb", + "--copy-queue-size", "4", + "--backup-dir", backupDir) + + assertRelationsCreated(restoreConn, TOTAL_RELATIONS) + assertDataRestored(restoreConn, publicSchemaTupleCounts) + assertDataRestored(restoreConn, schema2TupleCounts) + assertArtifactsCleaned(restoreConn, timestamp) + }) It("runs gpbackup and gprestore on database with all objects", func() { testhelper.AssertQueryRuns(backupConn, "DROP SCHEMA IF EXISTS schema2 CASCADE; DROP SCHEMA public CASCADE; CREATE SCHEMA public; DROP PROCEDURAL LANGUAGE IF EXISTS plpythonu;") @@ -128,6 +162,58 @@ var _ = Describe("End to End plugin tests", func() { "--redirect-db", "restoredb") assertArtifactsCleaned(restoreConn, timestamp) }) + It("runs gpbackup and gprestore on database with all objects with copy-queue-size", func() { + skipIfOldBackupVersionBefore("1.23.0") + testhelper.AssertQueryRuns(backupConn, + "DROP SCHEMA IF EXISTS schema2 CASCADE; DROP SCHEMA public CASCADE; CREATE SCHEMA public; DROP PROCEDURAL LANGUAGE IF EXISTS plpythonu;") + defer testutils.ExecuteSQLFile(backupConn, + "resources/test_tables_data.sql") + defer testutils.ExecuteSQLFile(backupConn, + "resources/test_tables_ddl.sql") + defer testhelper.AssertQueryRuns(backupConn, + "DROP SCHEMA IF EXISTS schema2 CASCADE; DROP SCHEMA public CASCADE; CREATE SCHEMA public; DROP PROCEDURAL LANGUAGE IF EXISTS plpythonu;") + defer testhelper.AssertQueryRuns(restoreConn, + "DROP SCHEMA IF EXISTS schema2 CASCADE; DROP SCHEMA public CASCADE; CREATE SCHEMA public; DROP PROCEDURAL LANGUAGE IF EXISTS plpythonu;") + testhelper.AssertQueryRuns(backupConn, + "CREATE ROLE testrole SUPERUSER") + defer testhelper.AssertQueryRuns(backupConn, + "DROP ROLE testrole") + testutils.ExecuteSQLFile(backupConn, "resources/gpdb4_objects.sql") + if 
backupConn.Version.AtLeast("5") { + testutils.ExecuteSQLFile(backupConn, "resources/gpdb5_objects.sql") + } + if backupConn.Version.AtLeast("6") { + testutils.ExecuteSQLFile(backupConn, "resources/gpdb6_objects.sql") + defer testhelper.AssertQueryRuns(backupConn, + "DROP FOREIGN DATA WRAPPER fdw CASCADE;") + defer testhelper.AssertQueryRuns(restoreConn, + "DROP FOREIGN DATA WRAPPER fdw CASCADE;") + } + if backupConn.Version.AtLeast("6.2") { + testhelper.AssertQueryRuns(backupConn, + "CREATE TABLE mview_table1(i int, j text);") + defer testhelper.AssertQueryRuns(restoreConn, + "DROP TABLE mview_table1;") + testhelper.AssertQueryRuns(backupConn, + "CREATE MATERIALIZED VIEW mview1 (i2) as select i from mview_table1;") + defer testhelper.AssertQueryRuns(restoreConn, + "DROP MATERIALIZED VIEW mview1;") + testhelper.AssertQueryRuns(backupConn, + "CREATE MATERIALIZED VIEW mview2 as select * from mview1;") + defer testhelper.AssertQueryRuns(restoreConn, + "DROP MATERIALIZED VIEW mview2;") + } + + timestamp := gpbackup(gpbackupPath, backupHelperPath, + "--leaf-partition-data", + "--single-data-file", + "--copy-queue-size", "4") + gprestore(gprestorePath, restoreHelperPath, timestamp, + "--metadata-only", + "--redirect-db", "restoredb", + "--copy-queue-size", "4") + assertArtifactsCleaned(restoreConn, timestamp) + }) Context("with include filtering on restore", func() { It("runs gpbackup and gprestore with include-table-file restore flag with a single data file", func() { @@ -147,6 +233,26 @@ var _ = Describe("End to End plugin tests", func() { _ = os.Remove("/tmp/include-tables.txt") }) + It("runs gpbackup and gprestore with include-table-file restore flag with a single data with copy-queue-size", func() { + skipIfOldBackupVersionBefore("1.23.0") + includeFile := iohelper.MustOpenFileForWriting("/tmp/include-tables.txt") + utils.MustPrintln(includeFile, "public.sales\npublic.foo\npublic.myseq1\npublic.myview1") + timestamp := gpbackup(gpbackupPath, backupHelperPath, + "--backup-dir", backupDir, + "--single-data-file", + "--copy-queue-size", "4") + gprestore(gprestorePath, restoreHelperPath, timestamp, + "--redirect-db", "restoredb", + "--backup-dir", backupDir, + "--include-table-file", "/tmp/include-tables.txt", + "--copy-queue-size", "4") + assertRelationsCreated(restoreConn, 16) + assertDataRestored(restoreConn, map[string]int{ + "public.sales": 13, "public.foo": 40000}) + assertArtifactsCleaned(restoreConn, timestamp) + + _ = os.Remove("/tmp/include-tables.txt") + }) It("runs gpbackup and gprestore with include-schema restore flag with a single data file", func() { timestamp := gpbackup(gpbackupPath, backupHelperPath, "--backup-dir", backupDir, @@ -160,6 +266,22 @@ var _ = Describe("End to End plugin tests", func() { assertDataRestored(restoreConn, schema2TupleCounts) assertArtifactsCleaned(restoreConn, timestamp) }) + It("runs gpbackup and gprestore with include-schema restore flag with a single data file with copy-queue-size", func() { + skipIfOldBackupVersionBefore("1.23.0") + timestamp := gpbackup(gpbackupPath, backupHelperPath, + "--backup-dir", backupDir, + "--single-data-file", + "--copy-queue-size", "4") + gprestore(gprestorePath, restoreHelperPath, timestamp, + "--redirect-db", "restoredb", + "--backup-dir", backupDir, + "--include-schema", "schema2", + "--copy-queue-size", "4") + + assertRelationsCreated(restoreConn, 17) + assertDataRestored(restoreConn, schema2TupleCounts) + assertArtifactsCleaned(restoreConn, timestamp) + }) }) Context("with plugin", func() { @@ -189,6 +311,27 @@ 
var _ = Describe("End to End plugin tests", func() { assertDataRestored(restoreConn, schema2TupleCounts) assertArtifactsCleaned(restoreConn, timestamp) }) + It("runs gpbackup and gprestore with plugin, single-data-file, no-compression, and copy-queue-size", func() { + pluginExecutablePath := fmt.Sprintf("%s/src/github.com/greenplum-db/gpbackup/plugins/example_plugin.bash", os.Getenv("GOPATH")) + copyPluginToAllHosts(backupConn, pluginExecutablePath) + + timestamp := gpbackup(gpbackupPath, backupHelperPath, + "--single-data-file", + "--copy-queue-size", "4", + "--no-compression", + "--plugin-config", pluginConfigPath) + forceMetadataFileDownloadFromPlugin(backupConn, timestamp) + + gprestore(gprestorePath, restoreHelperPath, timestamp, + "--redirect-db", "restoredb", + "--plugin-config", pluginConfigPath, + "--copy-queue-size", "4") + + assertRelationsCreated(restoreConn, TOTAL_RELATIONS) + assertDataRestored(restoreConn, publicSchemaTupleCounts) + assertDataRestored(restoreConn, schema2TupleCounts) + assertArtifactsCleaned(restoreConn, timestamp) + }) It("runs gpbackup and gprestore with plugin and single-data-file", func() { pluginExecutablePath := fmt.Sprintf("%s/src/github.com/greenplum-db/gpbackup/plugins/example_plugin.bash", os.Getenv("GOPATH")) copyPluginToAllHosts(backupConn, pluginExecutablePath) @@ -207,6 +350,26 @@ var _ = Describe("End to End plugin tests", func() { assertDataRestored(restoreConn, schema2TupleCounts) assertArtifactsCleaned(restoreConn, timestamp) }) + It("runs gpbackup and gprestore with plugin, single-data-file, and copy-queue-size", func() { + pluginExecutablePath := fmt.Sprintf("%s/src/github.com/greenplum-db/gpbackup/plugins/example_plugin.bash", os.Getenv("GOPATH")) + copyPluginToAllHosts(backupConn, pluginExecutablePath) + + timestamp := gpbackup(gpbackupPath, backupHelperPath, + "--single-data-file", + "--copy-queue-size", "4", + "--plugin-config", pluginConfigPath) + forceMetadataFileDownloadFromPlugin(backupConn, timestamp) + + gprestore(gprestorePath, restoreHelperPath, timestamp, + "--redirect-db", "restoredb", + "--plugin-config", pluginConfigPath, + "--copy-queue-size", "4") + + assertRelationsCreated(restoreConn, TOTAL_RELATIONS) + assertDataRestored(restoreConn, publicSchemaTupleCounts) + assertDataRestored(restoreConn, schema2TupleCounts) + assertArtifactsCleaned(restoreConn, timestamp) + }) It("runs gpbackup and gprestore with plugin and metadata-only", func() { pluginExecutablePath := fmt.Sprintf("%s/src/github.com/greenplum-db/gpbackup/plugins/example_plugin.bash", os.Getenv("GOPATH")) copyPluginToAllHosts(backupConn, pluginExecutablePath) diff --git a/helper/backup_helper.go b/helper/backup_helper.go index 2c2982b13..a319b5661 100644 --- a/helper/backup_helper.go +++ b/helper/backup_helper.go @@ -31,20 +31,22 @@ func doBackupAgent() error { return err } - currentPipe = fmt.Sprintf("%s_%d", *pipeFile, oidList[0]) + preloadCreatedPipes(oidList, *copyQueue) + var currentPipe string /* * It is important that we create the reader before creating the writer * so that we establish a connection to the first pipe (created by gpbackup) * and properly clean it up if an error occurs while creating the writer. 
*/ for i, oid := range oidList { + currentPipe = fmt.Sprintf("%s_%d", *pipeFile, oidList[i]) if wasTerminated { return errors.New("Terminated due to user request") } - if i < len(oidList)-1 { - log(fmt.Sprintf("Creating pipe for oid %d\n", oidList[i+1])) - nextPipe = fmt.Sprintf("%s_%d", *pipeFile, oidList[i+1]) - err := createPipe(nextPipe) + if i < len(oidList)-*copyQueue { + log(fmt.Sprintf("Creating pipe for oid %d\n", oidList[i+*copyQueue])) + nextPipeToCreate := fmt.Sprintf("%s_%d", *pipeFile, oidList[i+*copyQueue]) + err := createPipe(nextPipeToCreate) if err != nil { return err } @@ -73,13 +75,8 @@ func doBackupAgent() error { tocfile.AddSegmentDataEntry(uint(oid), lastRead, lastProcessed) lastRead = lastProcessed - lastPipe = currentPipe - currentPipe = nextPipe _ = readHandle.Close() - err = utils.RemoveFileIfExists(lastPipe) - if err != nil { - return err - } + deletePipe(currentPipe) } _ = pipeWriter.Close() diff --git a/helper/helper.go b/helper/helper.go index af559a4d1..7e4c575ce 100644 --- a/helper/helper.go +++ b/helper/helper.go @@ -7,12 +7,12 @@ import ( "fmt" "os" "os/signal" + "path/filepath" "runtime/debug" "strconv" "strings" "sync" "syscall" - "path/filepath" "golang.org/x/sys/unix" @@ -27,14 +27,12 @@ import ( var ( CleanupGroup *sync.WaitGroup - currentPipe string errBuf bytes.Buffer - lastPipe string - nextPipe string version string wasTerminated bool writeHandle *os.File writer *bufio.Writer + pipesMap map[string]bool ) /* @@ -54,6 +52,7 @@ var ( restoreAgent *bool tocFile *string isFiltered *bool + copyQueue *int ) func DoHelper() { @@ -111,18 +110,20 @@ func InitializeGlobals() { restoreAgent = flag.Bool("restore-agent", false, "Use gpbackup_helper as an agent for restore") tocFile = flag.String("toc-file", "", "Absolute path to the table of contents file") isFiltered = flag.Bool("with-filters", false, "Used with table/schema filters") + copyQueue = flag.Int("copy-queue-size", 1, "Used to know how many COPIES are being queued up") if *onErrorContinue && !*restoreAgent { fmt.Printf("--on-error-continue flag can only be used with --restore-agent flag") os.Exit(1) } - flag.Parse() if *printVersion { fmt.Printf("gpbackup_helper version %s\n", version) os.Exit(0) } operating.InitializeSystemFunctions() + + pipesMap = make(map[string]bool, 0) } /* @@ -131,7 +132,30 @@ func InitializeGlobals() { func createPipe(pipe string) error { err := unix.Mkfifo(pipe, 0777) - return err + if err != nil { + return err + } + + pipesMap[pipe] = true + return nil +} + +func deletePipe(pipe string) error { + err := utils.RemoveFileIfExists(pipe) + if err != nil { + return err + } + + delete(pipesMap, pipe) + return nil +} + +// Gpbackup creates the first n pipes. Record these pipes. 
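// For example, with hypothetical values: given oidList [4001 4002 4003 4004]
// and a queuedPipeCount of 2, gpbackup has already created <pipeFile>_4001 and
// <pipeFile>_4002 on this segment, so only those two names are preloaded into
// pipesMap; later pipes are added to the map by createPipe as they are made.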
+func preloadCreatedPipes(oidList []int, queuedPipeCount int) { + for i := 0; i < queuedPipeCount; i++ { + pipeName := fmt.Sprintf("%s_%d", *pipeFile, oidList[i]) + pipesMap[pipeName] = true + } } func getOidListFromFile() ([]int, error) { @@ -185,17 +209,13 @@ func DoCleanup() { if err != nil { log("Encountered error during cleanup: %v", err) } - err = utils.RemoveFileIfExists(lastPipe) - if err != nil { - log("Encountered error during cleanup: %v", err) - } - err = utils.RemoveFileIfExists(currentPipe) - if err != nil { - log("Encountered error during cleanup: %v", err) - } - err = utils.RemoveFileIfExists(nextPipe) - if err != nil { - log("Encountered error during cleanup: %v", err) + + for pipeName, _ := range pipesMap { + log("Removing pipe %s", pipeName) + err = deletePipe(pipeName) + if err != nil { + log("Encountered error removing pipe %s: %v", pipeName, err) + } } skipFiles, _ := filepath.Glob(fmt.Sprintf("%s_skip_*", *pipeFile)) diff --git a/helper/restore_helper.go b/helper/restore_helper.go index 4ca9ddfff..886d36942 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -48,7 +48,6 @@ func (r *RestoreReader) positionReader(pos uint64) error { seekPosition, err := r.seekReader.Seek(int64(pos), io.SeekCurrent) if err != nil { // Always hard quit if data reader has issues - _ = utils.RemoveFileIfExists(currentPipe) return err } log(fmt.Sprintf("Data Reader seeked forward to %d byte offset", seekPosition)) @@ -56,7 +55,6 @@ func (r *RestoreReader) positionReader(pos uint64) error { numDiscarded, err := r.bufReader.Discard(int(pos)) if err != nil { // Always hard quit if data reader has issues - _ = utils.RemoveFileIfExists(currentPipe) return err } log(fmt.Sprintf("Data Reader discarded %d bytes", numDiscarded)) @@ -86,7 +84,6 @@ func doRestoreAgent() error { var bytesRead int64 var start uint64 var end uint64 - var errRemove error var lastError error oidList, err := getOidListFromFile() @@ -99,16 +96,20 @@ func doRestoreAgent() error { return err } log(fmt.Sprintf("Using reader type: %s", reader.readerType)) + + preloadCreatedPipes(oidList, *copyQueue) + + var currentPipe string for i, oid := range oidList { if wasTerminated { return errors.New("Terminated due to user request") } currentPipe = fmt.Sprintf("%s_%d", *pipeFile, oidList[i]) - if i < len(oidList)-1 { - nextPipe = fmt.Sprintf("%s_%d", *pipeFile, oidList[i+1]) - log(fmt.Sprintf("Creating pipe for oid %d: %s", oidList[i+1], nextPipe)) - err := createPipe(nextPipe) + if i < len(oidList)-*copyQueue { + log(fmt.Sprintf("Creating pipe for oid %d\n", oidList[i+*copyQueue])) + nextPipeToCreate := fmt.Sprintf("%s_%d", *pipeFile, oidList[i+*copyQueue]) + err := createPipe(nextPipeToCreate) if err != nil { // In the case this error is hit it means we have lost the // ability to create pipes normally, so hard quit even if @@ -145,7 +146,6 @@ func doRestoreAgent() error { // In the case this error is hit it means we have lost the // ability to open pipes normally, so hard quit even if // --on-error-continue is given - _ = utils.RemoveFileIfExists(currentPipe) return err } } else { @@ -185,10 +185,9 @@ func doRestoreAgent() error { LoopEnd: log(fmt.Sprintf("Removing pipe for oid %d: %s", oid, currentPipe)) - errRemove = utils.RemoveFileIfExists(currentPipe) - if errRemove != nil { - _ = utils.RemoveFileIfExists(nextPipe) - return errRemove + errPipe := deletePipe(currentPipe) + if errPipe != nil { + return errPipe } if err != nil { diff --git a/options/flag.go b/options/flag.go index 41117847d..c9696d671 100644 --- 
a/options/flag.go +++ b/options/flag.go @@ -37,6 +37,7 @@ const ( PLUGIN_CONFIG = "plugin-config" QUIET = "quiet" SINGLE_DATA_FILE = "single-data-file" + COPY_QUEUE_SIZE = "copy-queue-size" VERBOSE = "verbose" WITH_STATS = "with-stats" CREATE_DB = "create-db" @@ -76,6 +77,7 @@ func SetBackupFlagDefaults(flagSet *pflag.FlagSet) { flagSet.Bool("version", false, "Print version number and exit") flagSet.Bool(QUIET, false, "Suppress non-warning, non-error log messages") flagSet.Bool(SINGLE_DATA_FILE, false, "Back up all data to a single file instead of one per table") + flagSet.Int(COPY_QUEUE_SIZE, 1, "number of COPY commands gpbackup should enqueue when backing up using the --single-data-file option") flagSet.Bool(VERBOSE, false, "Print verbose log messages") flagSet.Bool(WITH_STATS, false, "Back up query plan statistics") flagSet.Bool(WITHOUT_GLOBALS, false, "Skip backup of global metadata") @@ -104,6 +106,7 @@ func SetRestoreFlagDefaults(flagSet *pflag.FlagSet) { flagSet.Bool(QUIET, false, "Suppress non-warning, non-error log messages") flagSet.String(REDIRECT_DB, "", "Restore to the specified database instead of the database that was backed up") flagSet.String(REDIRECT_SCHEMA, "", "Restore to the specified schema instead of the schema that was backed up") + flagSet.Int(COPY_QUEUE_SIZE, 1, "Number of COPY commands gprestore should enqueue when restoring a backup taken using the --single-data-file option") flagSet.Bool(WITH_GLOBALS, false, "Restore global metadata") flagSet.String(TIMESTAMP, "", "The timestamp to be restored, in the format YYYYMMDDHHMMSS") flagSet.Bool(TRUNCATE_TABLE, false, "Removes data of the tables getting restored") diff --git a/plugins/plugin_test.sh b/plugins/plugin_test.sh index 81c5e2b7f..5d0371c9f 100755 --- a/plugins/plugin_test.sh +++ b/plugins/plugin_test.sh @@ -486,11 +486,13 @@ test_backup_and_restore_with_plugin() { echo "[PASSED] gpbackup and gprestore (using ${flags})" } +test_backup_and_restore_with_plugin "--single-data-file --no-compression --copy-queue-size 4" "--copy-queue-size 4" test_backup_and_restore_with_plugin "--no-compression --single-data-file" test_backup_and_restore_with_plugin "--no-compression" test_backup_and_restore_with_plugin "--metadata-only" test_backup_and_restore_with_plugin "--no-compression --single-data-file" "restore-filter" + # ---------------------------------------------- # Cleanup test artifacts # ---------------------------------------------- diff --git a/plugins/plugin_test_scale.sh b/plugins/plugin_test_scale.sh index ee8621319..1322db9bb 100755 --- a/plugins/plugin_test_scale.sh +++ b/plugins/plugin_test_scale.sh @@ -142,7 +142,7 @@ if [[ "$plugin" == *gpbackup_s3_plugin ]]; then fi test_backup_and_restore_with_plugin "$plugin_config" "--single-data-file --no-compression" "$restore_filter" - +test_backup_and_restore_with_plugin "$plugin_config" "--single-data-file --copy-queue-size 4 --no-compression" "--copy-queue-size 4" if [[ "$plugin" == *gpbackup_ddboost_plugin ]]; then echo echo "DISABLED restore_subset" diff --git a/restore/data.go b/restore/data.go index 714d8f2f9..9fc76368e 100644 --- a/restore/data.go +++ b/restore/data.go @@ -9,6 +9,7 @@ import ( "sync" "sync/atomic" + "github.com/greenplum-db/gp-common-go-libs/cluster" "github.com/greenplum-db/gp-common-go-libs/dbconn" "github.com/greenplum-db/gp-common-go-libs/gplog" "github.com/greenplum-db/gpbackup/filepath" @@ -94,13 +95,12 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Ma if backupConfig.SingleDataFile { 
gplog.Verbose("Initializing pipes and gpbackup_helper on segments for single data file restore") utils.VerifyHelperVersionOnSegments(version, globalCluster) - filteredOids := make([]string, totalTables) + oidList := make([]string, totalTables) for i, entry := range dataEntries { - filteredOids[i] = fmt.Sprintf("%d", entry.Oid) + oidList[i] = fmt.Sprintf("%d", entry.Oid) } - utils.WriteOidListToSegments(filteredOids, globalCluster, fpInfo) - firstOid := fmt.Sprintf("%d", dataEntries[0].Oid) - utils.CreateFirstSegmentPipeOnAllHosts(firstOid, globalCluster, fpInfo) + utils.WriteOidListToSegments(oidList, globalCluster, fpInfo) + initialPipes := CreateInitialSegmentPipes(oidList, globalCluster, connectionPool,fpInfo) if wasTerminated { return 0 } @@ -108,7 +108,7 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Ma if len(opts.IncludedRelations) > 0 || len(opts.ExcludedRelations) > 0 || len(opts.IncludedSchemas) > 0 || len(opts.ExcludedSchemas) > 0 { isFilter = true } - utils.StartGpbackupHelpers(globalCluster, fpInfo, "--restore-agent", MustGetFlagString(options.PLUGIN_CONFIG), "", MustGetFlagBool(options.ON_ERROR_CONTINUE), isFilter, &wasTerminated) + utils.StartGpbackupHelpers(globalCluster, fpInfo, "--restore-agent", MustGetFlagString(options.PLUGIN_CONFIG), "", MustGetFlagBool(options.ON_ERROR_CONTINUE), isFilter, &wasTerminated, initialPipes) } /* * We break when an interrupt is received and rely on @@ -144,10 +144,9 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Ma if err == nil { err = restoreSingleTableData(&fpInfo, entry, tableName, whichConn) - atomic.AddInt64(&tableNum, 1) if gplog.GetVerbosity() > gplog.LOGINFO { // No progress bar at this log level, so we note table count here - gplog.Verbose("Restored data to table %s from file (table %d of %d)", tableName, tableNum, totalTables) + gplog.Verbose("Restored data to table %s from file (table %d of %d)", tableName, atomic.AddInt64(&tableNum, 1), totalTables) } else { gplog.Verbose("Restored data to table %s from file", tableName) } @@ -193,3 +192,17 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Ma return numErrors } + +func CreateInitialSegmentPipes(oidList []string, c *cluster.Cluster, connectionPool *dbconn.DBConn, fpInfo filepath.FilePathInfo) int { + // Create min(connections, tables) segment pipes on each host + var maxPipes int + if connectionPool.NumConns < len(oidList) { + maxPipes = connectionPool.NumConns + } else { + maxPipes = len(oidList) + } + for i := 0; i < maxPipes; i++ { + utils.CreateSegmentPipeOnAllHosts(oidList[i], c, fpInfo) + } + return maxPipes +} diff --git a/restore/global_variables.go b/restore/global_variables.go index a21a904d1..3fa66df9b 100644 --- a/restore/global_variables.go +++ b/restore/global_variables.go @@ -94,6 +94,10 @@ func SetTOC(toc *toc.TOC) { // Util functions to enable ease of access to global flag values +func FlagChanged(flagName string) bool { + return cmdFlags.Changed(flagName) +} + func MustGetFlagString(flagName string) string { return options.MustGetFlagString(cmdFlags, flagName) } diff --git a/restore/validate.go b/restore/validate.go index 1a75be613..051e562cf 100644 --- a/restore/validate.go +++ b/restore/validate.go @@ -237,6 +237,9 @@ func ValidateBackupFlagCombinations() { if backupConfig.DataOnly && MustGetFlagBool(options.METADATA_ONLY) { gplog.Fatal(errors.Errorf("Cannot use metadata-only flag when restoring data-only backup"), "") } + if !backupConfig.SingleDataFile && 
FlagChanged(options.COPY_QUEUE_SIZE) { + gplog.Fatal(errors.Errorf("The --copy-queue-size flag can only be used if the backup was taken with --single-data-file"), "") + } validateBackupFlagPluginCombinations() } diff --git a/restore/validate_test.go b/restore/validate_test.go index 8a48cd8f7..990db1e02 100644 --- a/restore/validate_test.go +++ b/restore/validate_test.go @@ -4,7 +4,6 @@ import ( "strings" "github.com/DATA-DOG/go-sqlmock" - "github.com/spf13/cobra" "github.com/greenplum-db/gp-common-go-libs/testhelper" "github.com/greenplum-db/gpbackup/history" "github.com/greenplum-db/gpbackup/options" @@ -12,10 +11,11 @@ import ( "github.com/greenplum-db/gpbackup/testutils" "github.com/greenplum-db/gpbackup/toc" "github.com/greenplum-db/gpbackup/utils" + "github.com/spf13/cobra" . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" . "github.com/onsi/ginkgo/extensions/table" + . "github.com/onsi/gomega" ) var _ = Describe("restore/validate tests", func() { @@ -299,19 +299,20 @@ var _ = Describe("restore/validate tests", func() { DescribeTable("Validate various flag combinations that are required or exclusive", func(argString string, valid bool) { testCmd := &cobra.Command{ - Use: "flag validation", - Args: cobra.NoArgs, - Run: func(cmd *cobra.Command, args []string) { - restore.ValidateFlagCombinations(cmd.Flags()) - }} + Use: "flag validation", + Args: cobra.NoArgs, + Run: func(cmd *cobra.Command, args []string) { + restore.ValidateFlagCombinations(cmd.Flags()) + }} testCmd.SetArgs(strings.Split(argString, " ")) restore.SetCmdFlags(testCmd.Flags()) - if (!valid) { + if !valid { defer testhelper.ShouldPanicWithMessage("CRITICAL") } - err := testCmd.Execute(); if err != nil && valid{ + err := testCmd.Execute() + if err != nil && valid { Fail("Valid flag combination failed validation check") } }, @@ -331,12 +332,12 @@ var _ = Describe("restore/validate tests", func() { Entry("--exclude-schema combos", "--exclude-schema schema1 --exclude-schema-file /tmp/file2", true), // TODO: Verify this. // --exclude-schema-file combinations with other filters - Entry("--exclude-schema-file combos", "--exclude-schema-file /tmp/file --include-table schema.table2", true), // TODO: Verify this. - Entry("--exclude-schema-file combos", "--exclude-schema-file /tmp/file --include-table-file /tmp/file2", true), // TODO: Verify this. - Entry("--exclude-schema-file combos", "--exclude-schema-file /tmp/file --include-schema schema2", true), // TODO: Verify this. + Entry("--exclude-schema-file combos", "--exclude-schema-file /tmp/file --include-table schema.table2", true), // TODO: Verify this. + Entry("--exclude-schema-file combos", "--exclude-schema-file /tmp/file --include-table-file /tmp/file2", true), // TODO: Verify this. + Entry("--exclude-schema-file combos", "--exclude-schema-file /tmp/file --include-schema schema2", true), // TODO: Verify this. Entry("--exclude-schema-file combos", "--exclude-schema-file /tmp/file --include-schema-file /tmp/file2", true), // TODO: Verify this. - Entry("--exclude-schema-file combos", "--exclude-schema-file /tmp/file --exclude-table schema.table2", true), // TODO: Verify this. - Entry("--exclude-schema-file combos", "--exclude-schema-file /tmp/file --exclude-table-file /tmp/file2", true), // TODO: Verify this. + Entry("--exclude-schema-file combos", "--exclude-schema-file /tmp/file --exclude-table schema.table2", true), // TODO: Verify this. + Entry("--exclude-schema-file combos", "--exclude-schema-file /tmp/file --exclude-table-file /tmp/file2", true), // TODO: Verify this. 
// --exclude-table combinations with other filters Entry("--exclude-table combos", "--exclude-table schema.table --include-table schema.table2", false), @@ -359,7 +360,7 @@ var _ = Describe("restore/validate tests", func() { Entry("--include-schema combos", "--include-schema schema1 --include-schema-file /tmp/file2", true), // TODO: Verify this. // --include-schema-file combinations with other filters - Entry("--include-schema-file combos", "--include-schema-file /tmp/file --include-table schema.table2", true), // TODO: Verify this. + Entry("--include-schema-file combos", "--include-schema-file /tmp/file --include-table schema.table2", true), // TODO: Verify this. Entry("--include-schema-file combos", "--include-schema-file /tmp/file --include-table-file /tmp/file2", true), // TODO: Verify this. // --include-table combinations with other filters @@ -397,4 +398,23 @@ var _ = Describe("restore/validate tests", func() { Entry("--redirect-schema combos", "--redirect-schema schema1 --include-table schema.table2 --data-only", true), ) }) + Describe("ValidateBackupFlagCombinations", func() { + It("restore with copy-queue-size should fatal if backup was not taken with single-data-file", func() { + restore.SetBackupConfig(&history.BackupConfig{SingleDataFile: false}) + testCmd := &cobra.Command{ + Use: "flag validation", + Args: cobra.NoArgs, + Run: func(cmd *cobra.Command, args []string) { + restore.ValidateBackupFlagCombinations() + }} + testCmd.SetArgs([]string{"--copy-queue-size", "4"}) + restore.SetCmdFlags(testCmd.Flags()) + + defer testhelper.ShouldPanicWithMessage("CRITICAL") + err := testCmd.Execute() + if err == nil { + Fail("invalid flag combination passed validation check") + } + }) + }) }) diff --git a/restore/wrappers.go b/restore/wrappers.go index 17a068057..9f5926b16 100644 --- a/restore/wrappers.go +++ b/restore/wrappers.go @@ -63,7 +63,11 @@ func SetLoggerVerbosity() { func CreateConnectionPool(unquotedDBName string) { connectionPool = dbconn.NewDBConnFromEnvironment(unquotedDBName) - connectionPool.MustConnect(MustGetFlagInt(options.JOBS)) + if FlagChanged(options.COPY_QUEUE_SIZE) { + connectionPool.MustConnect(MustGetFlagInt(options.COPY_QUEUE_SIZE)) + } else { + connectionPool.MustConnect(MustGetFlagInt(options.JOBS)) + } utils.ValidateGPDBVersionCompatibility(connectionPool) } diff --git a/utils/agent_remote.go b/utils/agent_remote.go index 984919481..111c71773 100644 --- a/utils/agent_remote.go +++ b/utils/agent_remote.go @@ -28,7 +28,7 @@ var helperMutex sync.Mutex * possibility that the COPY FROM commands start before gpbackup_helper is done * starting up and setting up the first pipe. 
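* For example (hypothetical numbers): with a connection pool of 4 and 100
* tables, the new CreateInitialSegmentPipes callers in backup and restore
* invoke this function min(4, 100) = 4 times, so the first four COPY
* commands each find their FIFO already in place.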
*/ -func CreateFirstSegmentPipeOnAllHosts(oid string, c *cluster.Cluster, fpInfo filepath.FilePathInfo) { +func CreateSegmentPipeOnAllHosts(oid string, c *cluster.Cluster, fpInfo filepath.FilePathInfo) { remoteOutput := c.GenerateAndExecuteCommand("Creating segment data pipes", cluster.ON_SEGMENTS, func(contentID int) string { pipeName := fpInfo.GetSegmentPipeFilePath(contentID) pipeName = fmt.Sprintf("%s_%s", pipeName, oid) @@ -114,7 +114,7 @@ func VerifyHelperVersionOnSegments(version string, c *cluster.Cluster) { } } -func StartGpbackupHelpers(c *cluster.Cluster, fpInfo filepath.FilePathInfo, operation string, pluginConfigFile string, compressStr string, onErrorContinue bool, isFilter bool, wasTerminated *bool) { +func StartGpbackupHelpers(c *cluster.Cluster, fpInfo filepath.FilePathInfo, operation string, pluginConfigFile string, compressStr string, onErrorContinue bool, isFilter bool, wasTerminated *bool, copyQueue int) { // A mutex lock for cleaning up and starting gpbackup helpers prevents a // race condition that causes gpbackup_helpers to be orphaned if // gpbackup_helper cleanup happens before they are started. @@ -145,7 +145,7 @@ func StartGpbackupHelpers(c *cluster.Cluster, fpInfo filepath.FilePathInfo, oper scriptFile := fpInfo.GetSegmentHelperFilePath(contentID, "script") pipeFile := fpInfo.GetSegmentPipeFilePath(contentID) backupFile := fpInfo.GetTableBackupFilePath(contentID, 0, GetPipeThroughProgram().Extension, true) - helperCmdStr := fmt.Sprintf("gpbackup_helper %s --toc-file %s --oid-file %s --pipe-file %s --data-file %s --content %d%s%s%s%s", operation, tocFile, oidFile, pipeFile, backupFile, contentID, pluginStr, compressStr, onErrorContinueStr, filterStr) + helperCmdStr := fmt.Sprintf("gpbackup_helper %s --toc-file %s --oid-file %s --pipe-file %s --data-file %s --content %d%s%s%s%s --copy-queue-size %d", operation, tocFile, oidFile, pipeFile, backupFile, contentID, pluginStr, compressStr, onErrorContinueStr, filterStr, copyQueue) // we run these commands in sequence to ensure that any failure is critical; the last command ensures the agent process was successfully started return fmt.Sprintf(`cat << HEREDOC > %[1]s && chmod +x %[1]s && ( nohup %[1]s &> /dev/null &) #!/bin/bash diff --git a/utils/agent_remote_test.go b/utils/agent_remote_test.go index a75f1ab39..a172709f3 100644 --- a/utils/agent_remote_test.go +++ b/utils/agent_remote_test.go @@ -126,11 +126,18 @@ var _ = Describe("agent remote", func() { Describe("StartGpbackupHelpers()", func() { It("Correctly propagates --on-error-continue flag to gpbackup_helper", func() { wasTerminated := false - utils.StartGpbackupHelpers(testCluster, fpInfo, "operation", "/tmp/pluginConfigFile.yml", " compressStr", true, false, &wasTerminated) + utils.StartGpbackupHelpers(testCluster, fpInfo, "operation", "/tmp/pluginConfigFile.yml", " compressStr", true, false, &wasTerminated, 1) cc := testExecutor.ClusterCommands[0] Expect(cc[1].CommandString).To(ContainSubstring(" --on-error-continue")) }) + It("Correctly propagates --copy-queue-size value to gpbackup_helper", func() { + wasTerminated := false + utils.StartGpbackupHelpers(testCluster, fpInfo, "operation", "/tmp/pluginConfigFile.yml", " compressStr", false, false, &wasTerminated, 4) + + cc := testExecutor.ClusterCommands[0] + Expect(cc[1].CommandString).To(ContainSubstring(" --copy-queue-size 4")) + }) }) Describe("CheckAgentErrorsOnSegments", func() { It("constructs the correct ssh call to check for the existance of an error file on each segment", func() {
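The helper changes above replace the fixed lastPipe/currentPipe/nextPipe trio with a pipesMap-backed queue that stays --copy-queue-size pipes ahead of the table currently being streamed. The following is a minimal standalone sketch of that pattern, for illustration only: the names (processPipeQueue, stream, the temp-dir demo in main) are invented and it omits the helper's real flags, content IDs, TOC bookkeeping, and signal handling.

package main

import (
	"fmt"
	"os"

	"golang.org/x/sys/unix"
)

// processPipeQueue sketches the sliding-window FIFO handling: the first
// queueSize pipes are assumed to already exist (created by gpbackup or
// gprestore), each iteration creates the pipe queueSize positions ahead,
// and every pipe is removed as soon as its table has been streamed.
func processPipeQueue(pipePrefix string, oids []int, queueSize int, stream func(pipe string) error) error {
	pipes := make(map[string]bool)
	// Record the pre-created pipes, mirroring preloadCreatedPipes.
	for i := 0; i < queueSize && i < len(oids); i++ {
		pipes[fmt.Sprintf("%s_%d", pipePrefix, oids[i])] = true
	}
	for i, oid := range oids {
		current := fmt.Sprintf("%s_%d", pipePrefix, oid)
		// Stay queueSize pipes ahead of the one being consumed.
		if i < len(oids)-queueSize {
			next := fmt.Sprintf("%s_%d", pipePrefix, oids[i+queueSize])
			if err := unix.Mkfifo(next, 0777); err != nil {
				return err
			}
			pipes[next] = true
		}
		if err := stream(current); err != nil {
			return err
		}
		// The table is done; drop its pipe right away instead of waiting
		// for a final cleanup pass.
		if err := os.Remove(current); err != nil {
			return err
		}
		delete(pipes, current)
	}
	// On early exit, anything still present in pipes would be removed by a
	// cleanup routine analogous to DoCleanup.
	return nil
}

func main() {
	// Hypothetical usage: gpbackup's role is simulated by pre-creating the
	// first queueSize FIFOs before the agent-style loop runs.
	dir, err := os.MkdirTemp("", "pipe_queue_demo")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	prefix := dir + "/pipe"
	oids := []int{101, 102, 103, 104, 105}
	queueSize := 2
	for i := 0; i < queueSize; i++ {
		if err := unix.Mkfifo(fmt.Sprintf("%s_%d", prefix, oids[i]), 0777); err != nil {
			panic(err)
		}
	}
	err = processPipeQueue(prefix, oids, queueSize, func(pipe string) error {
		fmt.Println("would stream table data through", pipe)
		return nil
	})
	if err != nil {
		fmt.Println("error:", err)
	}
}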