From b141b91938e51497e60ec836a5bd0961bb69833a Mon Sep 17 00:00:00 2001 From: Kevin Yeap Date: Wed, 10 Nov 2021 13:11:31 -0800 Subject: [PATCH 1/4] Fix log bug that can record the wrong number of tables restored Use the restored-table count returned from atomic.AddInt64 to record the correct number of tables restored. When multiple tables are being restored at a time, it was possible for the restored-table count to be incremented multiple times before a log statement was recorded. Co-authored-by: Kevin Yeap Co-authored-by: Brent Doil --- backup/data.go | 9 +++++---- restore/data.go | 3 +-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/backup/data.go b/backup/data.go index f35f3759c..c56ea836b 100644 --- a/backup/data.go +++ b/backup/data.go @@ -85,13 +85,14 @@ func CopyTableOut(connectionPool *dbconn.DBConn, table Table, destinationToWrite } func BackupSingleTableData(table Table, rowsCopiedMap map[uint32]int64, counters *BackupProgressCounters, whichConn int) error { - atomic.AddInt64(&counters.NumRegTables, 1) - numTables := counters.NumRegTables //We save this so it won't be modified before we log it + logMessage := fmt.Sprintf("Worker %d: Writing data for table %s to file", whichConn, table.FQN()) + // Avoid race condition by incrementing counters in call to sprintf + tableCount := fmt.Sprintf(" (table %d of %d)", atomic.AddInt64(&counters.NumRegTables, 1), counters.TotalRegTables) if gplog.GetVerbosity() > gplog.LOGINFO { // No progress bar at this log level, so we note table count here - gplog.Verbose("Writing data for table %s to file (table %d of %d)", table.FQN(), numTables, counters.TotalRegTables) + gplog.Verbose(logMessage + tableCount) } else { - gplog.Verbose("Writing data for table %s to file", table.FQN()) + gplog.Verbose(logMessage) } destinationToWrite := "" diff --git a/restore/data.go b/restore/data.go index 714d8f2f9..6360eb0e8 100644 --- a/restore/data.go +++ b/restore/data.go @@ -144,10 +144,9 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Ma if err == nil { err = restoreSingleTableData(&fpInfo, entry, tableName, whichConn) - atomic.AddInt64(&tableNum, 1) if gplog.GetVerbosity() > gplog.LOGINFO { // No progress bar at this log level, so we note table count here - gplog.Verbose("Restored data to table %s from file (table %d of %d)", tableName, tableNum, totalTables) + gplog.Verbose("Restored data to table %s from file (table %d of %d)", tableName, atomic.AddInt64(&tableNum, 1), totalTables) } else { gplog.Verbose("Restored data to table %s from file", tableName) } From 5925c1edf3be958e10044c98eff8f037ddc55227 Mon Sep 17 00:00:00 2001 From: Brent Doil Date: Thu, 18 Nov 2021 14:41:08 -0600 Subject: [PATCH 2/4] Queue COPY commands for gpbackup_helper to consume To reduce the amount of time gpbackup_helper spends blocked waiting for COPY statements to initialize, gpbackup and gprestore can now queue COPY commands to speed up --single-data-file backups and restores if the database has CPU cycles to spare. If a worker can't get a lock on a table, the worker is terminated and the table is then processed by Worker 0. Worker 0 holds AccessShare locks on all tables, and acts as a special goroutine to handle deferred tables. Added the --copy-queue-size flag. In order for this to be used during restore, the backup must have been taken with --single-data-file.
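The following sketch only illustrates the queue-and-defer scheme described above; it is not the gpbackup implementation. tryLock and copyTable are hypothetical stand-ins for LOCK TABLE ... NOWAIT and COPY, the table type is reduced to an oid, and worker 0 is shown processing deferred tables after the queue workers finish rather than concurrently as in the real code.

    package main

    import (
        "fmt"
        "sync"
    )

    type table struct{ oid int }

    // tryLock stands in for LOCK TABLE ... NOWAIT; here tables with even oids fail to lock.
    func tryLock(t table, worker int) bool { return t.oid%2 == 1 }

    // copyTable stands in for running COPY over the worker's connection.
    func copyTable(t table, worker int) { fmt.Printf("worker %d copied table %d\n", worker, t.oid) }

    func main() {
        tables := []table{{1}, {2}, {3}, {4}, {5}}
        tasks := make(chan table, len(tables))
        for _, t := range tables {
            tasks <- t
        }
        close(tasks)

        const queueSize = 2  // analogous to --copy-queue-size
        var deferred sync.Map // tables handed off to worker 0
        var wg sync.WaitGroup

        // Workers 1..queueSize drain the queue; a worker that cannot lock a
        // table defers it and terminates, since its transaction is aborted.
        for w := 1; w <= queueSize; w++ {
            wg.Add(1)
            go func(worker int) {
                defer wg.Done()
                for t := range tasks {
                    if !tryLock(t, worker) {
                        deferred.Store(t.oid, t)
                        return
                    }
                    copyTable(t, worker)
                }
            }(w)
        }
        wg.Wait()

        // If all queue workers terminated early, any leftover tables are deferred too.
        for t := range tasks {
            deferred.Store(t.oid, t)
        }

        // Worker 0 already holds AccessShare locks on every table, so it can
        // safely process whatever was deferred.
        deferred.Range(func(_, v interface{}) bool {
            copyTable(v.(table), 0)
            return true
        })
    }

Buffering the tasks channel to len(tables) lets every table be queued up front, so a worker can start its next COPY as soon as the previous one finishes.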
This can improve backup and restore times significantly for databases that contain a lot of tables with very little data where the majority of time is spent waiting for gpdb to initiate COPY. Co-authored-by: Kevin Yeap Co-authored-by: Brent Doil --- backup/backup.go | 27 ++++-- backup/data.go | 158 +++++++++++++++++++++++++++++++++++- backup/global_variables.go | 14 ++++ backup/validate.go | 7 ++ backup/wrappers.go | 13 ++- helper/backup_helper.go | 19 ++--- helper/helper.go | 54 ++++++++---- helper/restore_helper.go | 23 +++--- options/flag.go | 3 + restore/data.go | 26 ++++-- restore/global_variables.go | 4 + restore/validate.go | 3 + restore/wrappers.go | 6 +- utils/agent_remote.go | 6 +- 14 files changed, 305 insertions(+), 58 deletions(-) diff --git a/backup/backup.go b/backup/backup.go index dc9d28a2d..19c3f98ca 100644 --- a/backup/backup.go +++ b/backup/backup.go @@ -259,7 +259,6 @@ func backupData(tables []Table) { gplog.Info("Data backup complete") return } - if MustGetFlagBool(options.SINGLE_DATA_FILE) { gplog.Verbose("Initializing pipes and gpbackup_helper on segments for single data file backup") utils.VerifyHelperVersionOnSegments(version, globalCluster) @@ -270,22 +269,26 @@ func backupData(tables []Table) { } } utils.WriteOidListToSegments(oidList, globalCluster, globalFPInfo) - utils.CreateFirstSegmentPipeOnAllHosts(oidList[0], globalCluster, globalFPInfo) compressStr := fmt.Sprintf(" --compression-level %d --compression-type %s", MustGetFlagInt(options.COMPRESSION_LEVEL), MustGetFlagString(options.COMPRESSION_TYPE)) if MustGetFlagBool(options.NO_COMPRESSION) { compressStr = " --compression-level 0" } + initialPipes := CreateInitialSegmentPipes(oidList, globalCluster, connectionPool, globalFPInfo) // Do not pass through the --on-error-continue flag because it does not apply to gpbackup utils.StartGpbackupHelpers(globalCluster, globalFPInfo, "--backup-agent", - MustGetFlagString(options.PLUGIN_CONFIG), compressStr, false, false, &wasTerminated) + MustGetFlagString(options.PLUGIN_CONFIG), compressStr, false, false, &wasTerminated, initialPipes) } gplog.Info("Writing data to file") - rowsCopiedMaps := backupDataForAllTables(tables) + var rowsCopiedMaps []map[uint32]int64 + if FlagChanged(options.COPY_QUEUE_SIZE) { + rowsCopiedMaps = backupDataForAllTablesCopyQueue(tables) + } else { + rowsCopiedMaps = backupDataForAllTables(tables) + } AddTableDataEntriesToTOC(tables, rowsCopiedMaps) if MustGetFlagBool(options.SINGLE_DATA_FILE) && MustGetFlagString(options.PLUGIN_CONFIG) != "" { pluginConfig.BackupSegmentTOCs(globalCluster, globalFPInfo) } - logCompletionMessage("Data backup") } @@ -501,3 +504,17 @@ func logCompletionMessage(msg string) { gplog.Info("%s complete", msg) } } + +func CreateInitialSegmentPipes(oidList []string, c *cluster.Cluster, connectionPool *dbconn.DBConn, fpInfo filepath.FilePathInfo) int { + // Create min(connections, tables) segment pipes on each host + var maxPipes int + if connectionPool.NumConns < len(oidList) { + maxPipes = connectionPool.NumConns + } else { + maxPipes = len(oidList) + } + for i := 0; i < maxPipes; i++ { + utils.CreateSegmentPipeOnAllHosts(oidList[i], c, fpInfo) + } + return maxPipes +} diff --git a/backup/data.go b/backup/data.go index c56ea836b..48bef59c2 100644 --- a/backup/data.go +++ b/backup/data.go @@ -5,10 +5,12 @@ package backup */ import ( + "errors" "fmt" "strings" "sync" "sync/atomic" + "time" "github.com/greenplum-db/gp-common-go-libs/dbconn" "github.com/greenplum-db/gp-common-go-libs/gplog" @@ -110,6 +112,159 @@ func 
BackupSingleTableData(table Table, rowsCopiedMap map[uint32]int64, counters return nil } +// backupDataForAllTablesCopyQueue does not back up tables in parallel. This is +// specifically for single-data-file. While tables are backed up one at a time, it +// is possible to save a significant amount of time by queuing up the next +// table to copy. Worker 0 does not have tables pre-assigned to it. The copy +// queue uses worker 0 as a special deferred worker in the event that the other +// workers encounter locking issues. Worker 0 already has all locks on the +// tables so it will not run into locking issues. +func backupDataForAllTablesCopyQueue(tables []Table) []map[uint32]int64 { + var numExtOrForeignTables int64 + for _, table := range tables { + if table.SkipDataBackup() { + numExtOrForeignTables++ + } + } + counters := BackupProgressCounters{NumRegTables: 0, TotalRegTables: int64(len(tables)) - numExtOrForeignTables} + counters.ProgressBar = utils.NewProgressBar(int(counters.TotalRegTables), "Tables backed up: ", utils.PB_INFO) + counters.ProgressBar.Start() + rowsCopiedMaps := make([]map[uint32]int64, connectionPool.NumConns) + /* + * We break when an interrupt is received and rely on + * TerminateHangingCopySessions to kill any COPY statements + * in progress if they don't finish on their own. + */ + tasks := make(chan Table, len(tables)) + var oidMap sync.Map + var workerPool sync.WaitGroup + var copyErr error + // Record and track tables in a hashmap of oids and table states (preloaded with value Unknown). + // The tables are loaded into the tasks channel for the subsequent goroutines to work on. + for _, table := range tables { + oidMap.Store(table.Oid, Unknown) + tasks <- table + } + // We incremented numConns by 1 to treat connNum 0 as a special worker + rowsCopiedMaps[0] = make(map[uint32]int64) + for connNum := 1; connNum < connectionPool.NumConns; connNum++ { + rowsCopiedMaps[connNum] = make(map[uint32]int64) + workerPool.Add(1) + go func(whichConn int) { + defer workerPool.Done() + for table := range tasks { + if wasTerminated || copyErr != nil { + counters.ProgressBar.(*pb.ProgressBar).NotPrint = true + return + } + + if table.SkipDataBackup() { + gplog.Verbose("Skipping data backup of table %s because it is either an external or foreign table.", table.FQN()) + oidMap.Store(table.Oid, Complete) + continue + } + // If a random external SQL command had queued an AccessExclusiveLock acquisition request + // against this next table, the --job worker thread would deadlock on the COPY attempt. + // To prevent gpbackup from hanging, we attempt to acquire an AccessShareLock on the + // relation with the NOWAIT option before we run COPY. If the LOCK TABLE NOWAIT call + // fails, we catch the error and defer the table to the main worker thread, worker 0. + // Afterwards, we break early and terminate the worker since its transaction is now in an + // aborted state. We do not need to do this with the main worker thread because it has + // already acquired AccessShareLocks on all tables before the metadata dumping part. + err := LockTableNoWait(table, whichConn) + if err != nil { + if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code != PG_LOCK_NOT_AVAILABLE { + copyErr = err + continue + } + + if gplog.GetVerbosity() < gplog.LOGVERBOSE { + // Add a newline to interrupt the progress bar so that + // the following WARN message is nicely outputted. + fmt.Printf("\n") + } + gplog.Warn("Worker %d could not acquire AccessShareLock for table %s. 
Terminating worker and deferring table to main worker thread.", + whichConn, table.FQN()) + + oidMap.Store(table.Oid, Deferred) + // Rollback transaction since it's in an aborted state + connectionPool.MustRollback(whichConn) + + // Worker no longer has a valid distributed transaction snapshot + break + } + + err = BackupSingleTableData(table, rowsCopiedMaps[whichConn], &counters, whichConn) + if err != nil { + copyErr = err + } + oidMap.Store(table.Oid, Complete) + } + }(connNum) + } + + // Special goroutine to handle deferred tables + // Handle all tables deferred by the deadlock detection. This can only be + // done with the main worker thread, worker 0, because it has + // AccessShareLocks on all the tables already. + deferredWorkerDone := make(chan bool) + go func() { + for _, table := range tables { + for { + state, _ := oidMap.Load(table.Oid) + if state.(int) == Unknown { + time.Sleep(time.Millisecond * 50) + } else if state.(int) == Deferred { + err := BackupSingleTableData(table, rowsCopiedMaps[0], &counters, 0) + if err != nil { + copyErr = err + } + oidMap.Store(table.Oid, Complete) + break + } else if state.(int) == Complete { + break + } else { + gplog.Fatal(errors.New("Encountered unknown table state"), "") + } + } + } + deferredWorkerDone <- true + }() + + close(tasks) + workerPool.Wait() + + // Check if all the workers were terminated. If so, defer all remaining tables to worker 0 + allWorkersTerminatedLogged := false + for _, table := range tables { + state, _ := oidMap.Load(table.Oid) + if state == Unknown { + if !allWorkersTerminatedLogged { + gplog.Warn("All copy queue workers terminated due to lock issues. Falling back to single main worker.") + allWorkersTerminatedLogged = true + } + oidMap.Store(table.Oid, Deferred) + } + } + // Main goroutine waits for deferred worker 0 by waiting on this channel + <-deferredWorkerDone + + agentErr := utils.CheckAgentErrorsOnSegments(globalCluster, globalFPInfo) + + if copyErr != nil && agentErr != nil { + gplog.Error(agentErr.Error()) + gplog.Fatal(copyErr, "") + } else if copyErr != nil { + gplog.Fatal(copyErr, "") + } else if agentErr != nil { + gplog.Fatal(agentErr, "") + } + + counters.ProgressBar.Finish() + printDataBackupWarnings(numExtOrForeignTables) + return rowsCopiedMaps +} + func backupDataForAllTables(tables []Table) []map[uint32]int64 { var numExtOrForeignTables int64 for _, table := range tables { @@ -157,8 +312,7 @@ func backupDataForAllTables(tables []Table) []map[uint32]int64 { if whichConn != 0 { err := LockTableNoWait(table, whichConn) if err != nil { - // Postgres Error Code 55P03 translates to LOCK_NOT_AVAILABLE - if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code != "55P03" { + if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code != PG_LOCK_NOT_AVAILABLE { copyErr = err continue } diff --git a/backup/global_variables.go b/backup/global_variables.go index 11b45adbb..235d0c4f4 100644 --- a/backup/global_variables.go +++ b/backup/global_variables.go @@ -19,6 +19,16 @@ import ( * used in testing. 
*/ +/* + Table backup state constants +*/ +const ( + Unknown int = iota + Deferred + Complete + PG_LOCK_NOT_AVAILABLE = "55P03" +) + /* * Non-flag variables */ @@ -99,6 +109,10 @@ func SetQuotedRoleNames(quotedRoles map[string]string) { // Util functions to enable ease of access to global flag values +func FlagChanged(flagName string) bool { + return cmdFlags.Changed(flagName) +} + func MustGetFlagString(flagName string) string { return options.MustGetFlagString(cmdFlags, flagName) } diff --git a/backup/validate.go b/backup/validate.go index a93efaf86..0057f39ca 100644 --- a/backup/validate.go +++ b/backup/validate.go @@ -102,6 +102,9 @@ func validateFlagCombinations(flags *pflag.FlagSet) { options.CheckExclusiveFlags(flags, options.NO_COMPRESSION, options.COMPRESSION_TYPE) options.CheckExclusiveFlags(flags, options.NO_COMPRESSION, options.COMPRESSION_LEVEL) options.CheckExclusiveFlags(flags, options.PLUGIN_CONFIG, options.BACKUP_DIR) + if FlagChanged(options.COPY_QUEUE_SIZE) && !MustGetFlagBool(options.SINGLE_DATA_FILE) { + gplog.Fatal(errors.Errorf("--copy-queue-size must be specified with --single-data-file"), "") + } if MustGetFlagString(options.FROM_TIMESTAMP) != "" && !MustGetFlagBool(options.INCREMENTAL) { gplog.Fatal(errors.Errorf("--from-timestamp must be specified with --incremental"), "") } @@ -121,6 +124,10 @@ func validateFlagValues() { gplog.Fatal(errors.Errorf("Timestamp %s is invalid. Timestamps must be in the format YYYYMMDDHHMMSS.", MustGetFlagString(options.FROM_TIMESTAMP)), "") } + if FlagChanged(options.COPY_QUEUE_SIZE) && MustGetFlagInt(options.COPY_QUEUE_SIZE) < 2 { + gplog.Fatal(errors.Errorf("--copy-queue-size %d is invalid. Must be at least 2", + MustGetFlagInt(options.COPY_QUEUE_SIZE)), "") + } } func validateFromTimestamp(fromTimestamp string) { diff --git a/backup/wrappers.go b/backup/wrappers.go index cf294d44f..54a02e8dd 100644 --- a/backup/wrappers.go +++ b/backup/wrappers.go @@ -39,7 +39,18 @@ func SetLoggerVerbosity() { func initializeConnectionPool(timestamp string) { connectionPool = dbconn.NewDBConnFromEnvironment(MustGetFlagString(options.DBNAME)) - connectionPool.MustConnect(MustGetFlagInt(options.JOBS)) + var numConns int + switch true { + case FlagChanged(options.COPY_QUEUE_SIZE): + // Connection 0 is reserved for deferred worker, initialize 1 additional connection. + numConns = MustGetFlagInt(options.COPY_QUEUE_SIZE) + 1 + case FlagChanged(options.JOBS): + numConns = MustGetFlagInt(options.JOBS) + default: + numConns = 1 + } + gplog.Verbose(fmt.Sprintf("Initializing %d worker connections", numConns)) + connectionPool.MustConnect(numConns) utils.ValidateGPDBVersionCompatibility(connectionPool) InitializeMetadataParams(connectionPool) for connNum := 0; connNum < connectionPool.NumConns; connNum++ { diff --git a/helper/backup_helper.go b/helper/backup_helper.go index 2c2982b13..a319b5661 100644 --- a/helper/backup_helper.go +++ b/helper/backup_helper.go @@ -31,20 +31,22 @@ func doBackupAgent() error { return err } - currentPipe = fmt.Sprintf("%s_%d", *pipeFile, oidList[0]) + preloadCreatedPipes(oidList, *copyQueue) + var currentPipe string /* * It is important that we create the reader before creating the writer * so that we establish a connection to the first pipe (created by gpbackup) * and properly clean it up if an error occurs while creating the writer. 
*/ for i, oid := range oidList { + currentPipe = fmt.Sprintf("%s_%d", *pipeFile, oidList[i]) if wasTerminated { return errors.New("Terminated due to user request") } - if i < len(oidList)-1 { - log(fmt.Sprintf("Creating pipe for oid %d\n", oidList[i+1])) - nextPipe = fmt.Sprintf("%s_%d", *pipeFile, oidList[i+1]) - err := createPipe(nextPipe) + if i < len(oidList)-*copyQueue { + log(fmt.Sprintf("Creating pipe for oid %d\n", oidList[i+*copyQueue])) + nextPipeToCreate := fmt.Sprintf("%s_%d", *pipeFile, oidList[i+*copyQueue]) + err := createPipe(nextPipeToCreate) if err != nil { return err } @@ -73,13 +75,8 @@ func doBackupAgent() error { tocfile.AddSegmentDataEntry(uint(oid), lastRead, lastProcessed) lastRead = lastProcessed - lastPipe = currentPipe - currentPipe = nextPipe _ = readHandle.Close() - err = utils.RemoveFileIfExists(lastPipe) - if err != nil { - return err - } + deletePipe(currentPipe) } _ = pipeWriter.Close() diff --git a/helper/helper.go b/helper/helper.go index 899c69fd4..17491ca27 100644 --- a/helper/helper.go +++ b/helper/helper.go @@ -7,13 +7,13 @@ import ( "fmt" "os" "os/signal" + "path/filepath" "runtime/debug" "sort" "strconv" "strings" "sync" "syscall" - "path/filepath" "golang.org/x/sys/unix" @@ -28,14 +28,12 @@ import ( var ( CleanupGroup *sync.WaitGroup - currentPipe string errBuf bytes.Buffer - lastPipe string - nextPipe string version string wasTerminated bool writeHandle *os.File writer *bufio.Writer + pipesMap map[string]bool ) /* @@ -55,6 +53,7 @@ var ( restoreAgent *bool tocFile *string isFiltered *bool + copyQueue *int ) func DoHelper() { @@ -112,18 +111,20 @@ func InitializeGlobals() { restoreAgent = flag.Bool("restore-agent", false, "Use gpbackup_helper as an agent for restore") tocFile = flag.String("toc-file", "", "Absolute path to the table of contents file") isFiltered = flag.Bool("with-filters", false, "Used with table/schema filters") + copyQueue = flag.Int("copy-queue-size", 1, "Used to know how many COPIES are being queued up") if *onErrorContinue && !*restoreAgent { fmt.Printf("--on-error-continue flag can only be used with --restore-agent flag") os.Exit(1) } - flag.Parse() if *printVersion { fmt.Printf("gpbackup_helper version %s\n", version) os.Exit(0) } operating.InitializeSystemFunctions() + + pipesMap = make(map[string]bool, 0) } /* @@ -132,7 +133,30 @@ func InitializeGlobals() { func createPipe(pipe string) error { err := unix.Mkfifo(pipe, 0777) - return err + if err != nil { + return err + } + + pipesMap[pipe] = true + return nil +} + +func deletePipe(pipe string) error { + err := utils.RemoveFileIfExists(pipe) + if err != nil { + return err + } + + delete(pipesMap, pipe) + return nil +} + +// Gpbackup creates the first n pipes. Record these pipes. 
+func preloadCreatedPipes(oidList []int, queuedPipeCount int) { + for i := 0; i < queuedPipeCount; i++ { + pipeName := fmt.Sprintf("%s_%d", *pipeFile, oidList[i]) + pipesMap[pipeName] = true + } } func getOidListFromFile() ([]int, error) { @@ -187,17 +211,13 @@ func DoCleanup() { if err != nil { log("Encountered error during cleanup: %v", err) } - err = utils.RemoveFileIfExists(lastPipe) - if err != nil { - log("Encountered error during cleanup: %v", err) - } - err = utils.RemoveFileIfExists(currentPipe) - if err != nil { - log("Encountered error during cleanup: %v", err) - } - err = utils.RemoveFileIfExists(nextPipe) - if err != nil { - log("Encountered error during cleanup: %v", err) + + for pipeName, _ := range pipesMap { + log("Removing pipe %s", pipeName) + err = deletePipe(pipeName) + if err != nil { + log("Encountered error removing pipe %s: %v", pipeName, err) + } } skipFiles, _ := filepath.Glob(fmt.Sprintf("%s_skip_*", *pipeFile)) diff --git a/helper/restore_helper.go b/helper/restore_helper.go index 4ca9ddfff..886d36942 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -48,7 +48,6 @@ func (r *RestoreReader) positionReader(pos uint64) error { seekPosition, err := r.seekReader.Seek(int64(pos), io.SeekCurrent) if err != nil { // Always hard quit if data reader has issues - _ = utils.RemoveFileIfExists(currentPipe) return err } log(fmt.Sprintf("Data Reader seeked forward to %d byte offset", seekPosition)) @@ -56,7 +55,6 @@ func (r *RestoreReader) positionReader(pos uint64) error { numDiscarded, err := r.bufReader.Discard(int(pos)) if err != nil { // Always hard quit if data reader has issues - _ = utils.RemoveFileIfExists(currentPipe) return err } log(fmt.Sprintf("Data Reader discarded %d bytes", numDiscarded)) @@ -86,7 +84,6 @@ func doRestoreAgent() error { var bytesRead int64 var start uint64 var end uint64 - var errRemove error var lastError error oidList, err := getOidListFromFile() @@ -99,16 +96,20 @@ func doRestoreAgent() error { return err } log(fmt.Sprintf("Using reader type: %s", reader.readerType)) + + preloadCreatedPipes(oidList, *copyQueue) + + var currentPipe string for i, oid := range oidList { if wasTerminated { return errors.New("Terminated due to user request") } currentPipe = fmt.Sprintf("%s_%d", *pipeFile, oidList[i]) - if i < len(oidList)-1 { - nextPipe = fmt.Sprintf("%s_%d", *pipeFile, oidList[i+1]) - log(fmt.Sprintf("Creating pipe for oid %d: %s", oidList[i+1], nextPipe)) - err := createPipe(nextPipe) + if i < len(oidList)-*copyQueue { + log(fmt.Sprintf("Creating pipe for oid %d\n", oidList[i+*copyQueue])) + nextPipeToCreate := fmt.Sprintf("%s_%d", *pipeFile, oidList[i+*copyQueue]) + err := createPipe(nextPipeToCreate) if err != nil { // In the case this error is hit it means we have lost the // ability to create pipes normally, so hard quit even if @@ -145,7 +146,6 @@ func doRestoreAgent() error { // In the case this error is hit it means we have lost the // ability to open pipes normally, so hard quit even if // --on-error-continue is given - _ = utils.RemoveFileIfExists(currentPipe) return err } } else { @@ -185,10 +185,9 @@ func doRestoreAgent() error { LoopEnd: log(fmt.Sprintf("Removing pipe for oid %d: %s", oid, currentPipe)) - errRemove = utils.RemoveFileIfExists(currentPipe) - if errRemove != nil { - _ = utils.RemoveFileIfExists(nextPipe) - return errRemove + errPipe := deletePipe(currentPipe) + if errPipe != nil { + return errPipe } if err != nil { diff --git a/options/flag.go b/options/flag.go index 41117847d..c9696d671 100644 --- 
a/options/flag.go +++ b/options/flag.go @@ -37,6 +37,7 @@ const ( PLUGIN_CONFIG = "plugin-config" QUIET = "quiet" SINGLE_DATA_FILE = "single-data-file" + COPY_QUEUE_SIZE = "copy-queue-size" VERBOSE = "verbose" WITH_STATS = "with-stats" CREATE_DB = "create-db" @@ -76,6 +77,7 @@ func SetBackupFlagDefaults(flagSet *pflag.FlagSet) { flagSet.Bool("version", false, "Print version number and exit") flagSet.Bool(QUIET, false, "Suppress non-warning, non-error log messages") flagSet.Bool(SINGLE_DATA_FILE, false, "Back up all data to a single file instead of one per table") + flagSet.Int(COPY_QUEUE_SIZE, 1, "number of COPY commands gpbackup should enqueue when backing up using the --single-data-file option") flagSet.Bool(VERBOSE, false, "Print verbose log messages") flagSet.Bool(WITH_STATS, false, "Back up query plan statistics") flagSet.Bool(WITHOUT_GLOBALS, false, "Skip backup of global metadata") @@ -104,6 +106,7 @@ func SetRestoreFlagDefaults(flagSet *pflag.FlagSet) { flagSet.Bool(QUIET, false, "Suppress non-warning, non-error log messages") flagSet.String(REDIRECT_DB, "", "Restore to the specified database instead of the database that was backed up") flagSet.String(REDIRECT_SCHEMA, "", "Restore to the specified schema instead of the schema that was backed up") + flagSet.Int(COPY_QUEUE_SIZE, 1, "Number of COPY commands gprestore should enqueue when restoring a backup taken using the --single-data-file option") flagSet.Bool(WITH_GLOBALS, false, "Restore global metadata") flagSet.String(TIMESTAMP, "", "The timestamp to be restored, in the format YYYYMMDDHHMMSS") flagSet.Bool(TRUNCATE_TABLE, false, "Removes data of the tables getting restored") diff --git a/restore/data.go b/restore/data.go index 6360eb0e8..9fc76368e 100644 --- a/restore/data.go +++ b/restore/data.go @@ -9,6 +9,7 @@ import ( "sync" "sync/atomic" + "github.com/greenplum-db/gp-common-go-libs/cluster" "github.com/greenplum-db/gp-common-go-libs/dbconn" "github.com/greenplum-db/gp-common-go-libs/gplog" "github.com/greenplum-db/gpbackup/filepath" @@ -94,13 +95,12 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Ma if backupConfig.SingleDataFile { gplog.Verbose("Initializing pipes and gpbackup_helper on segments for single data file restore") utils.VerifyHelperVersionOnSegments(version, globalCluster) - filteredOids := make([]string, totalTables) + oidList := make([]string, totalTables) for i, entry := range dataEntries { - filteredOids[i] = fmt.Sprintf("%d", entry.Oid) + oidList[i] = fmt.Sprintf("%d", entry.Oid) } - utils.WriteOidListToSegments(filteredOids, globalCluster, fpInfo) - firstOid := fmt.Sprintf("%d", dataEntries[0].Oid) - utils.CreateFirstSegmentPipeOnAllHosts(firstOid, globalCluster, fpInfo) + utils.WriteOidListToSegments(oidList, globalCluster, fpInfo) + initialPipes := CreateInitialSegmentPipes(oidList, globalCluster, connectionPool,fpInfo) if wasTerminated { return 0 } @@ -108,7 +108,7 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Ma if len(opts.IncludedRelations) > 0 || len(opts.ExcludedRelations) > 0 || len(opts.IncludedSchemas) > 0 || len(opts.ExcludedSchemas) > 0 { isFilter = true } - utils.StartGpbackupHelpers(globalCluster, fpInfo, "--restore-agent", MustGetFlagString(options.PLUGIN_CONFIG), "", MustGetFlagBool(options.ON_ERROR_CONTINUE), isFilter, &wasTerminated) + utils.StartGpbackupHelpers(globalCluster, fpInfo, "--restore-agent", MustGetFlagString(options.PLUGIN_CONFIG), "", MustGetFlagBool(options.ON_ERROR_CONTINUE), isFilter, &wasTerminated, 
initialPipes) } /* * We break when an interrupt is received and rely on @@ -192,3 +192,17 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Ma return numErrors } + +func CreateInitialSegmentPipes(oidList []string, c *cluster.Cluster, connectionPool *dbconn.DBConn, fpInfo filepath.FilePathInfo) int { + // Create min(connections, tables) segment pipes on each host + var maxPipes int + if connectionPool.NumConns < len(oidList) { + maxPipes = connectionPool.NumConns + } else { + maxPipes = len(oidList) + } + for i := 0; i < maxPipes; i++ { + utils.CreateSegmentPipeOnAllHosts(oidList[i], c, fpInfo) + } + return maxPipes +} diff --git a/restore/global_variables.go b/restore/global_variables.go index a21a904d1..3fa66df9b 100644 --- a/restore/global_variables.go +++ b/restore/global_variables.go @@ -94,6 +94,10 @@ func SetTOC(toc *toc.TOC) { // Util functions to enable ease of access to global flag values +func FlagChanged(flagName string) bool { + return cmdFlags.Changed(flagName) +} + func MustGetFlagString(flagName string) string { return options.MustGetFlagString(cmdFlags, flagName) } diff --git a/restore/validate.go b/restore/validate.go index 1a75be613..051e562cf 100644 --- a/restore/validate.go +++ b/restore/validate.go @@ -237,6 +237,9 @@ func ValidateBackupFlagCombinations() { if backupConfig.DataOnly && MustGetFlagBool(options.METADATA_ONLY) { gplog.Fatal(errors.Errorf("Cannot use metadata-only flag when restoring data-only backup"), "") } + if !backupConfig.SingleDataFile && FlagChanged(options.COPY_QUEUE_SIZE) { + gplog.Fatal(errors.Errorf("The --copy-queue-size flag can only be used if the backup was taken with --single-data-file"), "") + } validateBackupFlagPluginCombinations() } diff --git a/restore/wrappers.go b/restore/wrappers.go index 17a068057..9f5926b16 100644 --- a/restore/wrappers.go +++ b/restore/wrappers.go @@ -63,7 +63,11 @@ func SetLoggerVerbosity() { func CreateConnectionPool(unquotedDBName string) { connectionPool = dbconn.NewDBConnFromEnvironment(unquotedDBName) - connectionPool.MustConnect(MustGetFlagInt(options.JOBS)) + if FlagChanged(options.COPY_QUEUE_SIZE) { + connectionPool.MustConnect(MustGetFlagInt(options.COPY_QUEUE_SIZE)) + } else { + connectionPool.MustConnect(MustGetFlagInt(options.JOBS)) + } utils.ValidateGPDBVersionCompatibility(connectionPool) } diff --git a/utils/agent_remote.go b/utils/agent_remote.go index 984919481..111c71773 100644 --- a/utils/agent_remote.go +++ b/utils/agent_remote.go @@ -28,7 +28,7 @@ var helperMutex sync.Mutex * possibility that the COPY FROM commands start before gpbackup_helper is done * starting up and setting up the first pipe. 
*/ -func CreateFirstSegmentPipeOnAllHosts(oid string, c *cluster.Cluster, fpInfo filepath.FilePathInfo) { +func CreateSegmentPipeOnAllHosts(oid string, c *cluster.Cluster, fpInfo filepath.FilePathInfo) { remoteOutput := c.GenerateAndExecuteCommand("Creating segment data pipes", cluster.ON_SEGMENTS, func(contentID int) string { pipeName := fpInfo.GetSegmentPipeFilePath(contentID) pipeName = fmt.Sprintf("%s_%s", pipeName, oid) @@ -114,7 +114,7 @@ func VerifyHelperVersionOnSegments(version string, c *cluster.Cluster) { } } -func StartGpbackupHelpers(c *cluster.Cluster, fpInfo filepath.FilePathInfo, operation string, pluginConfigFile string, compressStr string, onErrorContinue bool, isFilter bool, wasTerminated *bool) { +func StartGpbackupHelpers(c *cluster.Cluster, fpInfo filepath.FilePathInfo, operation string, pluginConfigFile string, compressStr string, onErrorContinue bool, isFilter bool, wasTerminated *bool, copyQueue int) { // A mutex lock for cleaning up and starting gpbackup helpers prevents a // race condition that causes gpbackup_helpers to be orphaned if // gpbackup_helper cleanup happens before they are started. @@ -145,7 +145,7 @@ func StartGpbackupHelpers(c *cluster.Cluster, fpInfo filepath.FilePathInfo, oper scriptFile := fpInfo.GetSegmentHelperFilePath(contentID, "script") pipeFile := fpInfo.GetSegmentPipeFilePath(contentID) backupFile := fpInfo.GetTableBackupFilePath(contentID, 0, GetPipeThroughProgram().Extension, true) - helperCmdStr := fmt.Sprintf("gpbackup_helper %s --toc-file %s --oid-file %s --pipe-file %s --data-file %s --content %d%s%s%s%s", operation, tocFile, oidFile, pipeFile, backupFile, contentID, pluginStr, compressStr, onErrorContinueStr, filterStr) + helperCmdStr := fmt.Sprintf("gpbackup_helper %s --toc-file %s --oid-file %s --pipe-file %s --data-file %s --content %d%s%s%s%s --copy-queue-size %d", operation, tocFile, oidFile, pipeFile, backupFile, contentID, pluginStr, compressStr, onErrorContinueStr, filterStr, copyQueue) // we run these commands in sequence to ensure that any failure is critical; the last command ensures the agent process was successfully started return fmt.Sprintf(`cat << HEREDOC > %[1]s && chmod +x %[1]s && ( nohup %[1]s &> /dev/null &) #!/bin/bash From 05b75ec170cb24cf94ae07650a6b9446a030b1ad Mon Sep 17 00:00:00 2001 From: Brent Doil Date: Wed, 17 Nov 2021 13:41:17 -0500 Subject: [PATCH 3/4] Add end-to-end, perf and scale tests for copy-queue-size feature. 
Co-authored-by: Kevin Yeap Co-authored-by: Brent Doil --- ci/scripts/scale-tests.bash | 33 +++++ end_to_end/end_to_end_suite_test.go | 195 +++++++++++++++++++++++++++- end_to_end/incremental_test.go | 45 +++++++ end_to_end/plugin_test.go | 163 +++++++++++++++++++++++ plugins/plugin_test.sh | 2 + plugins/plugin_test_scale.sh | 2 +- restore/validate_test.go | 50 ++++--- utils/agent_remote_test.go | 9 +- 8 files changed, 480 insertions(+), 19 deletions(-) diff --git a/ci/scripts/scale-tests.bash b/ci/scripts/scale-tests.bash index 510b2a9d7..09370a66a 100755 --- a/ci/scripts/scale-tests.bash +++ b/ci/scripts/scale-tests.bash @@ -32,6 +32,39 @@ set -e ### Data scale tests ### log_file=/tmp/gpbackup.log + +echo "## Populating database for copy queue test ##" +createdb copyqueuedb +for j in {1..20000} +do + psql -d copyqueuedb -q -c "CREATE TABLE tbl_1k_\$j(i int) DISTRIBUTED BY (i);" + psql -d copyqueuedb -q -c "INSERT INTO tbl_1k_\$j SELECT generate_series(1,1000)" +done + +echo "## Performing single-data-file, --no-compression, --copy-queue-size 2 backup for copy queue test ##" +time gpbackup --dbname copyqueuedb --backup-dir /data/gpdata/ --single-data-file --no-compression --copy-queue-size 2 | tee "\$log_file" +timestamp=\$(head -10 "\$log_file" | grep "Backup Timestamp " | grep -Eo "[[:digit:]]{14}") +gpbackup_manager display-report \$timestamp + +echo "## Performing single-data-file, --no-compression, --copy-queue-size 4 backup for copy queue test ##" +time gpbackup --dbname copyqueuedb --backup-dir /data/gpdata/ --single-data-file --no-compression --copy-queue-size 4 | tee "\$log_file" +timestamp=\$(head -10 "\$log_file" | grep "Backup Timestamp " | grep -Eo "[[:digit:]]{14}") +gpbackup_manager display-report \$timestamp + +echo "## Performing single-data-file, --no-compression, --copy-queue-size 8 backup for copy queue test ##" +time gpbackup --dbname copyqueuedb --backup-dir /data/gpdata/ --single-data-file --no-compression --copy-queue-size 8 | tee "\$log_file" +timestamp=\$(head -10 "\$log_file" | grep "Backup Timestamp " | grep -Eo "[[:digit:]]{14}") +gpbackup_manager display-report \$timestamp + +echo "## Performing single-data-file, --no-compression, --copy-queue-size 2 restore for copy queue test ##" +time gprestore --timestamp "\$timestamp" --backup-dir /data/gpdata/ --create-db --redirect-db copyqueuerestore2 --copy-queue-size 2 + +echo "## Performing single-data-file, --no-compression, --copy-queue-size 4 restore for copy queue test ##" +time gprestore --timestamp "\$timestamp" --backup-dir /data/gpdata/ --create-db --redirect-db copyqueuerestore4 --copy-queue-size 4 + +echo "## Performing single-data-file, --no-compression, --copy-queue-size 8 restore for copy queue test ##" +time gprestore --timestamp "\$timestamp" --backup-dir /data/gpdata/ --create-db --redirect-db copyqueuerestore8 --copy-queue-size 8 + echo "## Populating database for data scale test ##" createdb datascaledb for j in {1..5000} diff --git a/end_to_end/end_to_end_suite_test.go b/end_to_end/end_to_end_suite_test.go index 98a2a0f69..366b67cf7 100644 --- a/end_to_end/end_to_end_suite_test.go +++ b/end_to_end/end_to_end_suite_test.go @@ -3,7 +3,6 @@ package end_to_end_test import ( "flag" "fmt" - "github.com/pkg/errors" "io/ioutil" "math/rand" "os" @@ -16,6 +15,7 @@ import ( "testing" "time" + "github.com/pkg/errors" "github.com/blang/semver" "github.com/greenplum-db/gp-common-go-libs/cluster" "github.com/greenplum-db/gp-common-go-libs/dbconn" @@ -563,6 +563,34 @@ var _ = Describe("backup and restore end to 
end tests", func() { Expect(stdout).To(ContainSubstring("Cleanup complete")) Expect(stdout).To(Not(ContainSubstring("CRITICAL"))) }) + It("runs gpbackup with copy-queue-size and sends a SIGINT to ensure cleanup functions successfully", func() { + if useOldBackupVersion { + Skip("This test is not needed for old backup versions") + } + args := []string{"--dbname", "testdb", + "--backup-dir", backupDir, + "--single-data-file", + "--copy-queue-size", "4", + "--verbose"} + cmd := exec.Command(gpbackupPath, args...) + go func() { + /* + * We use a random delay for the sleep in this test (between + * 0.5s and 0.8s) so that gpbackup will be interrupted at a + * different point in the backup process every time to help + * catch timing issues with the cleanup. + */ + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + time.Sleep(time.Duration(rng.Intn(300)+500) * time.Millisecond) + _ = cmd.Process.Signal(os.Interrupt) + }() + output, _ := cmd.CombinedOutput() + stdout := string(output) + + Expect(stdout).To(ContainSubstring("Received a termination signal, aborting backup process")) + Expect(stdout).To(ContainSubstring("Cleanup complete")) + Expect(stdout).To(Not(ContainSubstring("CRITICAL"))) + }) It("runs gprestore and sends a SIGINT to ensure cleanup functions successfully", func() { if useOldBackupVersion { Skip("This test is not needed for old backup versions") @@ -595,6 +623,39 @@ var _ = Describe("backup and restore end to end tests", func() { Expect(stdout).To(Not(ContainSubstring("CRITICAL"))) assertArtifactsCleaned(restoreConn, timestamp) }) + It("runs gprestore with copy-queue-size and sends a SIGINT to ensure cleanup functions successfully", func() { + if useOldBackupVersion { + Skip("This test is not needed for old backup versions") + } + timestamp := gpbackup(gpbackupPath, backupHelperPath, + "--backup-dir", backupDir, + "--single-data-file") + args := []string{ + "--timestamp", timestamp, + "--redirect-db", "restoredb", + "--backup-dir", backupDir, + "--verbose", + "--copy-queue-size", "4"} + cmd := exec.Command(gprestorePath, args...) + go func() { + /* + * We use a random delay for the sleep in this test (between + * 0.5s and 0.8s) so that gprestore will be interrupted at a + * different point in the backup process every time to help + * catch timing issues with the cleanup. + */ + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + time.Sleep(time.Duration(rng.Intn(300)+500) * time.Millisecond) + _ = cmd.Process.Signal(os.Interrupt) + }() + output, _ := cmd.CombinedOutput() + stdout := string(output) + + Expect(stdout).To(ContainSubstring("Received a termination signal, aborting restore process")) + Expect(stdout).To(ContainSubstring("Cleanup complete")) + Expect(stdout).To(Not(ContainSubstring("CRITICAL"))) + assertArtifactsCleaned(restoreConn, timestamp) + }) It("runs gpbackup and sends a SIGINT to ensure blocked LOCK TABLE query is canceled", func() { if useOldBackupVersion { Skip("This test is not needed for old backup versions") @@ -1360,7 +1421,6 @@ var _ = Describe("backup and restore end to end tests", func() { "--jobs", "9", "--verbose"} cmd := exec.Command(gpbackupPath, args...) - // Concurrently wait for gpbackup to block when it requests an AccessShareLock on public.foo. Once // that happens, acquire an AccessExclusiveLock on pg_catalog.pg_trigger to block gpbackup during its // trigger metadata dump. 
Then release the initial AccessExclusiveLock on public.foo (from the @@ -1475,6 +1535,137 @@ var _ = Describe("backup and restore end to end tests", func() { Expect(stdout).To(ContainSubstring("Backup completed successfully")) }) + It("runs gpbackup with copy-queue-size flag and COPY deadlock handling occurs", func() { + if useOldBackupVersion { + Skip("This test is not needed for old backup versions") + } + // Acquire AccessExclusiveLock on public.foo to block gpbackup when it attempts + // to grab AccessShareLocks before its metadata dump section. + backupConn.MustExec("BEGIN; LOCK TABLE public.foo IN ACCESS EXCLUSIVE MODE") + + // Execute gpbackup with --copy-queue-size 2 + args := []string{ + "--dbname", "testdb", + "--backup-dir", backupDir, + "--single-data-file", + "--copy-queue-size", "2", + "--verbose"} + cmd := exec.Command(gpbackupPath, args...) + + // Concurrently wait for gpbackup to block when it requests an AccessShareLock on public.foo. Once + // that happens, acquire an AccessExclusiveLock on pg_catalog.pg_trigger to block gpbackup during its + // trigger metadata dump. Then release the initial AccessExclusiveLock on public.foo (from the + // beginning of the test) to unblock gpbackup and let gpbackup move forward to the trigger metadata dump. + anotherConn := testutils.SetupTestDbConn("testdb") + defer anotherConn.Close() + go func() { + // Query to see if gpbackup's AccessShareLock request on public.foo is blocked + checkLockQuery := `SELECT count(*) FROM pg_locks l, pg_class c, pg_namespace n WHERE l.relation = c.oid AND n.oid = c.relnamespace AND n.nspname = 'public' AND c.relname = 'foo' AND l.granted = 'f' AND l.mode = 'AccessShareLock'` + + // Wait up to 10 seconds for gpbackup to block + var gpbackupBlockedLockCount int + iterations := 100 + for iterations > 0 { + _ = anotherConn.Get(&gpbackupBlockedLockCount, checkLockQuery) + if gpbackupBlockedLockCount < 1 { + time.Sleep(100 * time.Millisecond) + iterations-- + } else { + break + } + } + + // Queue AccessExclusiveLock request on pg_catalog.pg_trigger to block gpbackup + // during the trigger metadata dump so that the test can queue a bunch of + // AccessExclusiveLock requests against the test tables. Afterwards, release the + // AccessExclusiveLock on public.foo to let gpbackup go to the trigger metadata dump. + anotherConn.MustExec(`BEGIN; LOCK TABLE pg_catalog.pg_trigger IN ACCESS EXCLUSIVE MODE`) + backupConn.MustExec("COMMIT") + }() + + // Concurrently wait for gpbackup to block on the trigger metadata dump section. Once we + // see gpbackup blocked, request AccessExclusiveLock (to imitate a TRUNCATE or VACUUM + // FULL) on all the test tables. 
+ dataTables := []string{`public."FOObar"`, "public.foo", "public.holds", "public.sales", + "schema2.ao1", "schema2.ao2", "schema2.foo2", "schema2.foo3", "schema2.returns"} + for _, dataTable := range dataTables { + go func(dataTable string) { + accessExclusiveLockConn := testutils.SetupTestDbConn("testdb") + defer accessExclusiveLockConn.Close() + + // Query to see if gpbackup's AccessShareLock request on pg_catalog.pg_trigger is blocked + checkLockQuery := `SELECT count(*) FROM pg_locks l, pg_class c, pg_namespace n WHERE l.relation = c.oid AND n.oid = c.relnamespace AND n.nspname = 'pg_catalog' AND c.relname = 'pg_trigger' AND l.granted = 'f' AND l.mode = 'AccessShareLock'` + + // Wait up to 10 seconds for gpbackup to block + var gpbackupBlockedLockCount int + iterations := 100 + for iterations > 0 { + _ = accessExclusiveLockConn.Get(&gpbackupBlockedLockCount, checkLockQuery) + if gpbackupBlockedLockCount < 1 { + time.Sleep(100 * time.Millisecond) + iterations-- + } else { + break + } + } + + // Queue an AccessExclusiveLock request on a test table which will later + // result in a detected deadlock during the gpbackup data dump section. + accessExclusiveLockConn.MustExec(fmt.Sprintf(`BEGIN; LOCK TABLE %s IN ACCESS EXCLUSIVE MODE; COMMIT`, dataTable)) + }(dataTable) + } + + // Concurrently wait for all AccessExclusiveLock requests on all 9 test tables to block. + // Once that happens, release the AccessExclusiveLock on pg_catalog.pg_trigger to unblock + // gpbackup and let gpbackup move forward to the data dump section. + var accessExclBlockedLockCount int + go func() { + // Query to check for ungranted AccessExclusiveLock requests on our test tables + checkLockQuery := `SELECT count(*) FROM pg_locks WHERE granted = 'f' AND mode = 'AccessExclusiveLock'` + + // Wait up to 10 seconds + iterations := 100 + for iterations > 0 { + _ = backupConn.Get(&accessExclBlockedLockCount, checkLockQuery) + if accessExclBlockedLockCount < 9 { + time.Sleep(100 * time.Millisecond) + iterations-- + } else { + break + } + } + + // Unblock gpbackup by releasing AccessExclusiveLock on pg_catalog.pg_trigger + anotherConn.MustExec("COMMIT") + }() + + // gpbackup has finished + output, _ := cmd.CombinedOutput() + stdout := string(output) + + // Sanity check that 9 deadlock traps were placed during the test + Expect(accessExclBlockedLockCount).To(Equal(9)) + Expect(stdout).To(ContainSubstring("All copy queue workers terminated due to lock issues. 
Falling back to single main worker.")) + // No non-main worker should have been able to run COPY due to deadlock detection + for i := 1; i < 2; i++ { + expectedLockString := fmt.Sprintf("[DEBUG]:-Worker %d: LOCK TABLE ", i) + Expect(stdout).To(ContainSubstring(expectedLockString)) + + expectedWarnString := fmt.Sprintf("[WARNING]:-Worker %d could not acquire AccessShareLock for table", i) + Expect(stdout).To(ContainSubstring(expectedWarnString)) + + unexpectedCopyString := fmt.Sprintf("[DEBUG]:-Worker %d: COPY ", i) + Expect(stdout).ToNot(ContainSubstring(unexpectedCopyString)) + } + + // Only the main worker thread, worker 0, will run COPY on all the test tables + for _, dataTable := range dataTables { + expectedString := fmt.Sprintf(`[DEBUG]:-Worker 0: COPY %s TO PROGRAM `, dataTable) + Expect(stdout).To(ContainSubstring(expectedString)) + } + + Expect(stdout).To(ContainSubstring("Backup completed successfully")) + }) It("successfully backs up foreign tables", func() { if backupConn.Version.Before("6") { Skip("Test does not apply for GPDB versions before 6") diff --git a/end_to_end/incremental_test.go b/end_to_end/incremental_test.go index 81e0f7093..2f6713ff1 100644 --- a/end_to_end/incremental_test.go +++ b/end_to_end/incremental_test.go @@ -363,6 +363,51 @@ var _ = Describe("End to End incremental tests", func() { assertArtifactsCleaned(restoreConn, incremental1Timestamp) assertArtifactsCleaned(restoreConn, incremental2Timestamp) }) + It("Restores from an incremental backup based on a from-timestamp incremental with --copy-queue-size", func() { + fullBackupTimestamp := gpbackup(gpbackupPath, backupHelperPath, + "--leaf-partition-data", + "--single-data-file", + "--copy-queue-size", "4", + "--plugin-config", pluginConfigPath) + forceMetadataFileDownloadFromPlugin(backupConn, fullBackupTimestamp) + testhelper.AssertQueryRuns(backupConn, + "INSERT into schema2.ao1 values(1001)") + defer testhelper.AssertQueryRuns(backupConn, + "DELETE from schema2.ao1 where i=1001") + incremental1Timestamp := gpbackup(gpbackupPath, backupHelperPath, + "--incremental", + "--leaf-partition-data", + "--single-data-file", + "--copy-queue-size", "4", + "--from-timestamp", fullBackupTimestamp, + "--plugin-config", pluginConfigPath) + forceMetadataFileDownloadFromPlugin(backupConn, incremental1Timestamp) + + testhelper.AssertQueryRuns(backupConn, + "INSERT into schema2.ao1 values(1002)") + defer testhelper.AssertQueryRuns(backupConn, + "DELETE from schema2.ao1 where i=1002") + incremental2Timestamp := gpbackup(gpbackupPath, backupHelperPath, + "--incremental", + "--leaf-partition-data", + "--single-data-file", + "--copy-queue-size", "4", + "--plugin-config", pluginConfigPath) + forceMetadataFileDownloadFromPlugin(backupConn, incremental2Timestamp) + + gprestore(gprestorePath, restoreHelperPath, incremental2Timestamp, + "--redirect-db", "restoredb", + "--copy-queue-size", "4", + "--plugin-config", pluginConfigPath) + + assertRelationsCreated(restoreConn, TOTAL_RELATIONS) + assertDataRestored(restoreConn, publicSchemaTupleCounts) + schema2TupleCounts["schema2.ao1"] = 1002 + assertDataRestored(restoreConn, schema2TupleCounts) + assertArtifactsCleaned(restoreConn, fullBackupTimestamp) + assertArtifactsCleaned(restoreConn, incremental1Timestamp) + assertArtifactsCleaned(restoreConn, incremental2Timestamp) + }) It("Runs backup and restore if plugin location changed", func() { pluginExecutablePath := fmt.Sprintf("%s/src/github.com/greenplum-db/gpbackup/plugins/example_plugin.bash", os.Getenv("GOPATH")) 
fullBackupTimestamp := gpbackup(gpbackupPath, backupHelperPath, diff --git a/end_to_end/plugin_test.go b/end_to_end/plugin_test.go index 6fb497c1c..6d4217dca 100644 --- a/end_to_end/plugin_test.go +++ b/end_to_end/plugin_test.go @@ -64,6 +64,23 @@ var _ = Describe("End to End plugin tests", func() { assertDataRestored(restoreConn, schema2TupleCounts) assertArtifactsCleaned(restoreConn, timestamp) + }) + It("runs gpbackup and gprestore with single-data-file flag with copy-queue-size", func() { + skipIfOldBackupVersionBefore("1.23.0") + timestamp := gpbackup(gpbackupPath, backupHelperPath, + "--single-data-file", + "--copy-queue-size", "4", + "--backup-dir", backupDir) + gprestore(gprestorePath, restoreHelperPath, timestamp, + "--redirect-db", "restoredb", + "--copy-queue-size", "4", + "--backup-dir", backupDir) + + assertRelationsCreated(restoreConn, TOTAL_RELATIONS) + assertDataRestored(restoreConn, publicSchemaTupleCounts) + assertDataRestored(restoreConn, schema2TupleCounts) + assertArtifactsCleaned(restoreConn, timestamp) + }) It("runs gpbackup and gprestore with single-data-file flag without compression", func() { timestamp := gpbackup(gpbackupPath, backupHelperPath, @@ -79,6 +96,23 @@ var _ = Describe("End to End plugin tests", func() { assertDataRestored(restoreConn, schema2TupleCounts) assertArtifactsCleaned(restoreConn, timestamp) }) + It("runs gpbackup and gprestore with single-data-file flag without compression with copy-queue-size", func() { + skipIfOldBackupVersionBefore("1.23.0") + timestamp := gpbackup(gpbackupPath, backupHelperPath, + "--single-data-file", + "--copy-queue-size", "4", + "--backup-dir", backupDir, + "--no-compression") + gprestore(gprestorePath, restoreHelperPath, timestamp, + "--redirect-db", "restoredb", + "--copy-queue-size", "4", + "--backup-dir", backupDir) + + assertRelationsCreated(restoreConn, TOTAL_RELATIONS) + assertDataRestored(restoreConn, publicSchemaTupleCounts) + assertDataRestored(restoreConn, schema2TupleCounts) + assertArtifactsCleaned(restoreConn, timestamp) + }) It("runs gpbackup and gprestore on database with all objects", func() { testhelper.AssertQueryRuns(backupConn, "DROP SCHEMA IF EXISTS schema2 CASCADE; DROP SCHEMA public CASCADE; CREATE SCHEMA public; DROP PROCEDURAL LANGUAGE IF EXISTS plpythonu;") @@ -128,6 +162,58 @@ var _ = Describe("End to End plugin tests", func() { "--redirect-db", "restoredb") assertArtifactsCleaned(restoreConn, timestamp) }) + It("runs gpbackup and gprestore on database with all objects with copy-queue-size", func() { + skipIfOldBackupVersionBefore("1.23.0") + testhelper.AssertQueryRuns(backupConn, + "DROP SCHEMA IF EXISTS schema2 CASCADE; DROP SCHEMA public CASCADE; CREATE SCHEMA public; DROP PROCEDURAL LANGUAGE IF EXISTS plpythonu;") + defer testutils.ExecuteSQLFile(backupConn, + "resources/test_tables_data.sql") + defer testutils.ExecuteSQLFile(backupConn, + "resources/test_tables_ddl.sql") + defer testhelper.AssertQueryRuns(backupConn, + "DROP SCHEMA IF EXISTS schema2 CASCADE; DROP SCHEMA public CASCADE; CREATE SCHEMA public; DROP PROCEDURAL LANGUAGE IF EXISTS plpythonu;") + defer testhelper.AssertQueryRuns(restoreConn, + "DROP SCHEMA IF EXISTS schema2 CASCADE; DROP SCHEMA public CASCADE; CREATE SCHEMA public; DROP PROCEDURAL LANGUAGE IF EXISTS plpythonu;") + testhelper.AssertQueryRuns(backupConn, + "CREATE ROLE testrole SUPERUSER") + defer testhelper.AssertQueryRuns(backupConn, + "DROP ROLE testrole") + testutils.ExecuteSQLFile(backupConn, "resources/gpdb4_objects.sql") + if 
backupConn.Version.AtLeast("5") { + testutils.ExecuteSQLFile(backupConn, "resources/gpdb5_objects.sql") + } + if backupConn.Version.AtLeast("6") { + testutils.ExecuteSQLFile(backupConn, "resources/gpdb6_objects.sql") + defer testhelper.AssertQueryRuns(backupConn, + "DROP FOREIGN DATA WRAPPER fdw CASCADE;") + defer testhelper.AssertQueryRuns(restoreConn, + "DROP FOREIGN DATA WRAPPER fdw CASCADE;") + } + if backupConn.Version.AtLeast("6.2") { + testhelper.AssertQueryRuns(backupConn, + "CREATE TABLE mview_table1(i int, j text);") + defer testhelper.AssertQueryRuns(restoreConn, + "DROP TABLE mview_table1;") + testhelper.AssertQueryRuns(backupConn, + "CREATE MATERIALIZED VIEW mview1 (i2) as select i from mview_table1;") + defer testhelper.AssertQueryRuns(restoreConn, + "DROP MATERIALIZED VIEW mview1;") + testhelper.AssertQueryRuns(backupConn, + "CREATE MATERIALIZED VIEW mview2 as select * from mview1;") + defer testhelper.AssertQueryRuns(restoreConn, + "DROP MATERIALIZED VIEW mview2;") + } + + timestamp := gpbackup(gpbackupPath, backupHelperPath, + "--leaf-partition-data", + "--single-data-file", + "--copy-queue-size", "4") + gprestore(gprestorePath, restoreHelperPath, timestamp, + "--metadata-only", + "--redirect-db", "restoredb", + "--copy-queue-size", "4") + assertArtifactsCleaned(restoreConn, timestamp) + }) Context("with include filtering on restore", func() { It("runs gpbackup and gprestore with include-table-file restore flag with a single data file", func() { @@ -147,6 +233,26 @@ var _ = Describe("End to End plugin tests", func() { _ = os.Remove("/tmp/include-tables.txt") }) + It("runs gpbackup and gprestore with include-table-file restore flag with a single data file with copy-queue-size", func() { + skipIfOldBackupVersionBefore("1.23.0") + includeFile := iohelper.MustOpenFileForWriting("/tmp/include-tables.txt") + utils.MustPrintln(includeFile, "public.sales\npublic.foo\npublic.myseq1\npublic.myview1") + timestamp := gpbackup(gpbackupPath, backupHelperPath, + "--backup-dir", backupDir, + "--single-data-file", + "--copy-queue-size", "4") + gprestore(gprestorePath, restoreHelperPath, timestamp, + "--redirect-db", "restoredb", + "--backup-dir", backupDir, + "--include-table-file", "/tmp/include-tables.txt", + "--copy-queue-size", "4") + assertRelationsCreated(restoreConn, 16) + assertDataRestored(restoreConn, map[string]int{ + "public.sales": 13, "public.foo": 40000}) + assertArtifactsCleaned(restoreConn, timestamp) + + _ = os.Remove("/tmp/include-tables.txt") + }) It("runs gpbackup and gprestore with include-schema restore flag with a single data file", func() { timestamp := gpbackup(gpbackupPath, backupHelperPath, "--backup-dir", backupDir, @@ -160,6 +266,22 @@ var _ = Describe("End to End plugin tests", func() { assertDataRestored(restoreConn, schema2TupleCounts) assertArtifactsCleaned(restoreConn, timestamp) }) + It("runs gpbackup and gprestore with include-schema restore flag with a single data file with copy-queue-size", func() { + skipIfOldBackupVersionBefore("1.23.0") + timestamp := gpbackup(gpbackupPath, backupHelperPath, + "--backup-dir", backupDir, + "--single-data-file", + "--copy-queue-size", "4") + gprestore(gprestorePath, restoreHelperPath, timestamp, + "--redirect-db", "restoredb", + "--backup-dir", backupDir, + "--include-schema", "schema2", + "--copy-queue-size", "4") + + assertRelationsCreated(restoreConn, 17) + assertDataRestored(restoreConn, schema2TupleCounts) + assertArtifactsCleaned(restoreConn, timestamp) + }) }) Context("with plugin", func() { @@ -189,6 +311,27 @@ 
var _ = Describe("End to End plugin tests", func() { assertDataRestored(restoreConn, schema2TupleCounts) assertArtifactsCleaned(restoreConn, timestamp) }) + It("runs gpbackup and gprestore with plugin, single-data-file, no-compression, and copy-queue-size", func() { + pluginExecutablePath := fmt.Sprintf("%s/src/github.com/greenplum-db/gpbackup/plugins/example_plugin.bash", os.Getenv("GOPATH")) + copyPluginToAllHosts(backupConn, pluginExecutablePath) + + timestamp := gpbackup(gpbackupPath, backupHelperPath, + "--single-data-file", + "--copy-queue-size", "4", + "--no-compression", + "--plugin-config", pluginConfigPath) + forceMetadataFileDownloadFromPlugin(backupConn, timestamp) + + gprestore(gprestorePath, restoreHelperPath, timestamp, + "--redirect-db", "restoredb", + "--plugin-config", pluginConfigPath, + "--copy-queue-size", "4") + + assertRelationsCreated(restoreConn, TOTAL_RELATIONS) + assertDataRestored(restoreConn, publicSchemaTupleCounts) + assertDataRestored(restoreConn, schema2TupleCounts) + assertArtifactsCleaned(restoreConn, timestamp) + }) It("runs gpbackup and gprestore with plugin and single-data-file", func() { pluginExecutablePath := fmt.Sprintf("%s/src/github.com/greenplum-db/gpbackup/plugins/example_plugin.bash", os.Getenv("GOPATH")) copyPluginToAllHosts(backupConn, pluginExecutablePath) @@ -207,6 +350,26 @@ var _ = Describe("End to End plugin tests", func() { assertDataRestored(restoreConn, schema2TupleCounts) assertArtifactsCleaned(restoreConn, timestamp) }) + It("runs gpbackup and gprestore with plugin, single-data-file, and copy-queue-size", func() { + pluginExecutablePath := fmt.Sprintf("%s/src/github.com/greenplum-db/gpbackup/plugins/example_plugin.bash", os.Getenv("GOPATH")) + copyPluginToAllHosts(backupConn, pluginExecutablePath) + + timestamp := gpbackup(gpbackupPath, backupHelperPath, + "--single-data-file", + "--copy-queue-size", "4", + "--plugin-config", pluginConfigPath) + forceMetadataFileDownloadFromPlugin(backupConn, timestamp) + + gprestore(gprestorePath, restoreHelperPath, timestamp, + "--redirect-db", "restoredb", + "--plugin-config", pluginConfigPath, + "--copy-queue-size", "4") + + assertRelationsCreated(restoreConn, TOTAL_RELATIONS) + assertDataRestored(restoreConn, publicSchemaTupleCounts) + assertDataRestored(restoreConn, schema2TupleCounts) + assertArtifactsCleaned(restoreConn, timestamp) + }) It("runs gpbackup and gprestore with plugin and metadata-only", func() { pluginExecutablePath := fmt.Sprintf("%s/src/github.com/greenplum-db/gpbackup/plugins/example_plugin.bash", os.Getenv("GOPATH")) copyPluginToAllHosts(backupConn, pluginExecutablePath) diff --git a/plugins/plugin_test.sh b/plugins/plugin_test.sh index 81c5e2b7f..5d0371c9f 100755 --- a/plugins/plugin_test.sh +++ b/plugins/plugin_test.sh @@ -486,11 +486,13 @@ test_backup_and_restore_with_plugin() { echo "[PASSED] gpbackup and gprestore (using ${flags})" } +test_backup_and_restore_with_plugin "--single-data-file --no-compression --copy-queue-size 4" "--copy-queue-size 4" test_backup_and_restore_with_plugin "--no-compression --single-data-file" test_backup_and_restore_with_plugin "--no-compression" test_backup_and_restore_with_plugin "--metadata-only" test_backup_and_restore_with_plugin "--no-compression --single-data-file" "restore-filter" + # ---------------------------------------------- # Cleanup test artifacts # ---------------------------------------------- diff --git a/plugins/plugin_test_scale.sh b/plugins/plugin_test_scale.sh index ee8621319..1322db9bb 100755 --- 
a/plugins/plugin_test_scale.sh +++ b/plugins/plugin_test_scale.sh @@ -142,7 +142,7 @@ if [[ "$plugin" == *gpbackup_s3_plugin ]]; then fi test_backup_and_restore_with_plugin "$plugin_config" "--single-data-file --no-compression" "$restore_filter" - +test_backup_and_restore_with_plugin "$plugin_config" "--single-data-file --copy-queue-size 4 --no-compression" "--copy-queue-size 4" if [[ "$plugin" == *gpbackup_ddboost_plugin ]]; then echo echo "DISABLED restore_subset" diff --git a/restore/validate_test.go b/restore/validate_test.go index 8a48cd8f7..990db1e02 100644 --- a/restore/validate_test.go +++ b/restore/validate_test.go @@ -4,7 +4,6 @@ import ( "strings" "github.com/DATA-DOG/go-sqlmock" - "github.com/spf13/cobra" "github.com/greenplum-db/gp-common-go-libs/testhelper" "github.com/greenplum-db/gpbackup/history" "github.com/greenplum-db/gpbackup/options" @@ -12,10 +11,11 @@ import ( "github.com/greenplum-db/gpbackup/testutils" "github.com/greenplum-db/gpbackup/toc" "github.com/greenplum-db/gpbackup/utils" + "github.com/spf13/cobra" . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" . "github.com/onsi/ginkgo/extensions/table" + . "github.com/onsi/gomega" ) var _ = Describe("restore/validate tests", func() { @@ -299,19 +299,20 @@ var _ = Describe("restore/validate tests", func() { DescribeTable("Validate various flag combinations that are required or exclusive", func(argString string, valid bool) { testCmd := &cobra.Command{ - Use: "flag validation", - Args: cobra.NoArgs, - Run: func(cmd *cobra.Command, args []string) { - restore.ValidateFlagCombinations(cmd.Flags()) - }} + Use: "flag validation", + Args: cobra.NoArgs, + Run: func(cmd *cobra.Command, args []string) { + restore.ValidateFlagCombinations(cmd.Flags()) + }} testCmd.SetArgs(strings.Split(argString, " ")) restore.SetCmdFlags(testCmd.Flags()) - if (!valid) { + if !valid { defer testhelper.ShouldPanicWithMessage("CRITICAL") } - err := testCmd.Execute(); if err != nil && valid{ + err := testCmd.Execute() + if err != nil && valid { Fail("Valid flag combination failed validation check") } }, @@ -331,12 +332,12 @@ var _ = Describe("restore/validate tests", func() { Entry("--exclude-schema combos", "--exclude-schema schema1 --exclude-schema-file /tmp/file2", true), // TODO: Verify this. // --exclude-schema-file combinations with other filters - Entry("--exclude-schema-file combos", "--exclude-schema-file /tmp/file --include-table schema.table2", true), // TODO: Verify this. - Entry("--exclude-schema-file combos", "--exclude-schema-file /tmp/file --include-table-file /tmp/file2", true), // TODO: Verify this. - Entry("--exclude-schema-file combos", "--exclude-schema-file /tmp/file --include-schema schema2", true), // TODO: Verify this. + Entry("--exclude-schema-file combos", "--exclude-schema-file /tmp/file --include-table schema.table2", true), // TODO: Verify this. + Entry("--exclude-schema-file combos", "--exclude-schema-file /tmp/file --include-table-file /tmp/file2", true), // TODO: Verify this. + Entry("--exclude-schema-file combos", "--exclude-schema-file /tmp/file --include-schema schema2", true), // TODO: Verify this. Entry("--exclude-schema-file combos", "--exclude-schema-file /tmp/file --include-schema-file /tmp/file2", true), // TODO: Verify this. - Entry("--exclude-schema-file combos", "--exclude-schema-file /tmp/file --exclude-table schema.table2", true), // TODO: Verify this. - Entry("--exclude-schema-file combos", "--exclude-schema-file /tmp/file --exclude-table-file /tmp/file2", true), // TODO: Verify this. 
+ Entry("--exclude-schema-file combos", "--exclude-schema-file /tmp/file --exclude-table schema.table2", true), // TODO: Verify this. + Entry("--exclude-schema-file combos", "--exclude-schema-file /tmp/file --exclude-table-file /tmp/file2", true), // TODO: Verify this. // --exclude-table combinations with other filters Entry("--exclude-table combos", "--exclude-table schema.table --include-table schema.table2", false), @@ -359,7 +360,7 @@ var _ = Describe("restore/validate tests", func() { Entry("--include-schema combos", "--include-schema schema1 --include-schema-file /tmp/file2", true), // TODO: Verify this. // --include-schema-file combinations with other filters - Entry("--include-schema-file combos", "--include-schema-file /tmp/file --include-table schema.table2", true), // TODO: Verify this. + Entry("--include-schema-file combos", "--include-schema-file /tmp/file --include-table schema.table2", true), // TODO: Verify this. Entry("--include-schema-file combos", "--include-schema-file /tmp/file --include-table-file /tmp/file2", true), // TODO: Verify this. // --include-table combinations with other filters @@ -397,4 +398,23 @@ var _ = Describe("restore/validate tests", func() { Entry("--redirect-schema combos", "--redirect-schema schema1 --include-table schema.table2 --data-only", true), ) }) + Describe("ValidateBackupFlagCombinations", func() { + It("restore with copy-queue-size should fatal if backup was not taken with single-data-file", func() { + restore.SetBackupConfig(&history.BackupConfig{SingleDataFile: false}) + testCmd := &cobra.Command{ + Use: "flag validation", + Args: cobra.NoArgs, + Run: func(cmd *cobra.Command, args []string) { + restore.ValidateBackupFlagCombinations() + }} + testCmd.SetArgs([]string{"--copy-queue-size", "4"}) + restore.SetCmdFlags(testCmd.Flags()) + + defer testhelper.ShouldPanicWithMessage("CRITICAL") + err := testCmd.Execute() + if err == nil { + Fail("invalid flag combination passed validation check") + } + }) + }) }) diff --git a/utils/agent_remote_test.go b/utils/agent_remote_test.go index a75f1ab39..a172709f3 100644 --- a/utils/agent_remote_test.go +++ b/utils/agent_remote_test.go @@ -126,11 +126,18 @@ var _ = Describe("agent remote", func() { Describe("StartGpbackupHelpers()", func() { It("Correctly propagates --on-error-continue flag to gpbackup_helper", func() { wasTerminated := false - utils.StartGpbackupHelpers(testCluster, fpInfo, "operation", "/tmp/pluginConfigFile.yml", " compressStr", true, false, &wasTerminated) + utils.StartGpbackupHelpers(testCluster, fpInfo, "operation", "/tmp/pluginConfigFile.yml", " compressStr", true, false, &wasTerminated, 1) cc := testExecutor.ClusterCommands[0] Expect(cc[1].CommandString).To(ContainSubstring(" --on-error-continue")) }) + It("Correctly propagates --copy-queue-size value to gpbackup_helper", func() { + wasTerminated := false + utils.StartGpbackupHelpers(testCluster, fpInfo, "operation", "/tmp/pluginConfigFile.yml", " compressStr", false, false, &wasTerminated, 4) + + cc := testExecutor.ClusterCommands[0] + Expect(cc[1].CommandString).To(ContainSubstring(" --copy-queue-size 4")) + }) }) Describe("CheckAgentErrorsOnSegments", func() { It("constructs the correct ssh call to check for the existance of an error file on each segment", func() { From 39cc1e865d805997fc59c6698a038ebea6987623 Mon Sep 17 00:00:00 2001 From: Vasiliy Ivanov Date: Tue, 14 Dec 2021 13:39:07 +1000 Subject: [PATCH 4/4] remove checks for external tables in queued backup We've moved all external tables check from backup data 
routines in 76830c67075ae5e21afe0b0ce1dee7b0ee91418b. There is new routine to backup into single data file with a connection queue implemented in 5925c1edf3be958e10044c98eff8f037ddc55227. So remove checks from this new routine to as useless. Co-authored-by: Polina Bungina --- backup/data.go | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/backup/data.go b/backup/data.go index cdd913ac3..04e20a7d1 100644 --- a/backup/data.go +++ b/backup/data.go @@ -120,13 +120,7 @@ func BackupSingleTableData(table Table, rowsCopiedMap map[uint32]int64, counters // workers encounter locking issues. Worker 0 already has all locks on the // tables so it will not run into locking issues. func backupDataForAllTablesCopyQueue(tables []Table) []map[uint32]int64 { - var numExtOrForeignTables int64 - for _, table := range tables { - if table.SkipDataBackup() { - numExtOrForeignTables++ - } - } - counters := BackupProgressCounters{NumRegTables: 0, TotalRegTables: int64(len(tables)) - numExtOrForeignTables} + counters := BackupProgressCounters{NumRegTables: 0, TotalRegTables: int64(len(tables))} counters.ProgressBar = utils.NewProgressBar(int(counters.TotalRegTables), "Tables backed up: ", utils.PB_INFO) counters.ProgressBar.Start() rowsCopiedMaps := make([]map[uint32]int64, connectionPool.NumConns) @@ -158,11 +152,6 @@ func backupDataForAllTablesCopyQueue(tables []Table) []map[uint32]int64 { return } - if table.SkipDataBackup() { - gplog.Verbose("Skipping data backup of table %s because it is either an external or foreign table.", table.FQN()) - oidMap.Store(table.Oid, Complete) - continue - } // If a random external SQL command had queued an AccessExclusiveLock acquisition request // against this next table, the --job worker thread would deadlock on the COPY attempt. // To prevent gpbackup from hanging, we attempt to acquire an AccessShareLock on the @@ -261,7 +250,6 @@ func backupDataForAllTablesCopyQueue(tables []Table) []map[uint32]int64 { } counters.ProgressBar.Finish() - printDataBackupWarnings(numExtOrForeignTables) return rowsCopiedMaps }