diff --git a/backup/data.go b/backup/data.go index 54fdcb2fa..5d7606f8e 100644 --- a/backup/data.go +++ b/backup/data.go @@ -185,12 +185,6 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 { ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Make sure it's called to release resources even if no errors - // Launch a checker that polls if the backup helper has ended with an error. It will cancel all pending - // COPY commands that could be hanging on pipes, that the backup helper didn't close before it died. - if MustGetFlagBool(options.SINGLE_DATA_FILE) { - utils.StartHelperChecker(globalCluster, globalFPInfo, cancel) - } - /* * Worker 0 is a special database connection that * 1) Exports the database snapshot if the feature is supported diff --git a/end_to_end/end_to_end_suite_test.go b/end_to_end/end_to_end_suite_test.go index 67628c63f..7a45ffe82 100644 --- a/end_to_end/end_to_end_suite_test.go +++ b/end_to_end/end_to_end_suite_test.go @@ -2579,8 +2579,8 @@ LANGUAGE plpgsql NO SQL;`) "--resize-cluster", "--jobs", "3") output, err := gprestoreCmd.CombinedOutput() Expect(err).To(HaveOccurred()) - Expect(string(output)).To(ContainSubstring(`Error loading data into table public.t1`)) - Expect(string(output)).To(ContainSubstring(`Error loading data into table public.t2`)) + Expect(string(output)).To(MatchRegexp(`Error loading data into table public.t1: COPY t1, line \d+, column i: "\d+": ERROR: value "\d+" is out of range for type smallint`)) + Expect(string(output)).To(ContainSubstring(`Error loading data into table public.t2: timeout: context canceled`)) assertArtifactsCleaned("20240502095933") testhelper.AssertQueryRuns(restoreConn, "DROP TABLE t0; DROP TABLE t1; DROP TABLE t2; DROP TABLE t3; DROP TABLE t4;") }) diff --git a/helper/backup_helper.go b/helper/backup_helper.go index f156ad56f..b72f7252d 100644 --- a/helper/backup_helper.go +++ b/helper/backup_helper.go @@ -33,7 +33,6 @@ func doBackupAgent() error { return err } - preloadCreatedPipesForBackup(oidList, *copyQueue) var currentPipe string var errBuf bytes.Buffer /* diff --git a/helper/helper.go b/helper/helper.go index 56564342b..5c0a30675 100644 --- a/helper/helper.go +++ b/helper/helper.go @@ -10,6 +10,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "golang.org/x/sys/unix" @@ -26,9 +27,9 @@ var ( CleanupGroup *sync.WaitGroup version string wasTerminated bool + wasSigpiped atomic.Bool writeHandle *os.File writer *bufio.Writer - pipesMap map[string]bool ) /* @@ -129,8 +130,6 @@ func InitializeGlobals() { } operating.InitializeSystemFunctions() - pipesMap = make(map[string]bool, 0) - gplog.InitializeLogging("gpbackup_helper", "") gplog.SetLogFileVerbosity(*verbosity) } @@ -151,6 +150,7 @@ func InitializeSignalHandler() { gplog.Warn("Received a termination signal on segment %d: aborting", *content) terminatedChan <- true case unix.SIGPIPE: + wasSigpiped.Store(true) if *onErrorContinue { gplog.Warn("Received a broken pipe signal on segment %d: on-error-continue set, continuing", *content) terminatedChan <- false @@ -183,7 +183,6 @@ func createPipe(pipe string) error { return err } - pipesMap[pipe] = true return nil } @@ -193,23 +192,25 @@ func deletePipe(pipe string) error { return err } - delete(pipesMap, pipe) return nil } -// Gpbackup creates the first n pipes. Record these pipes. -func preloadCreatedPipesForBackup(oidList []int, queuedPipeCount int) { - for i := 0; i < queuedPipeCount; i++ { - pipeName := fmt.Sprintf("%s_%d", *pipeFile, oidList[i]) - pipesMap[pipeName] = true +func openClosePipe(filename string) error { + flag := unix.O_NONBLOCK + if *backupAgent { + flag |= os.O_RDONLY + } else if *restoreAgent { + flag |= os.O_WRONLY } -} - -func preloadCreatedPipesForRestore(oidWithBatchList []oidWithBatch, queuedPipeCount int) { - for i := 0; i < queuedPipeCount; i++ { - pipeName := fmt.Sprintf("%s_%d_%d", *pipeFile, oidWithBatchList[i].oid, oidWithBatchList[i].batch) - pipesMap[pipeName] = true + handle, err := os.OpenFile(filename, flag, os.ModeNamedPipe) + if err != nil { + return err } + err = handle.Close() + if err != nil { + return err + } + return nil } func getOidWithBatchListFromFile(oidFileName string) ([]oidWithBatch, error) { @@ -296,7 +297,19 @@ func DoCleanup() { logVerbose("Encountered error during cleanup: %v", err) } - for pipeName, _ := range pipesMap { + pipeFiles, _ := filepath.Glob(fmt.Sprintf("%s_[0-9]*", *pipeFile)) + for _, pipeName := range pipeFiles { + if !wasSigpiped.Load() { + /* + * The main process doesn't know about the error yet, so it needs to + * open/close pipes so that the COPY commands hanging on them can complete. + */ + logVerbose("Opening/closing pipe %s", pipeName) + err = openClosePipe(pipeName) + if err != nil { + logVerbose("Encountered error opening/closing pipe %s: %v", pipeName, err) + } + } logVerbose("Removing pipe %s", pipeName) err = deletePipe(pipeName) if err != nil { diff --git a/helper/restore_helper.go b/helper/restore_helper.go index 8b893c822..331cf72d0 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -224,8 +224,6 @@ func doRestoreAgent() error { } } - preloadCreatedPipesForRestore(oidWithBatchList, *copyQueue) - var currentPipe string // If skip file is detected for the particular tableOid, will not process batches related to this oid diff --git a/restore/data.go b/restore/data.go index 7f88b2cd7..0f58f4122 100644 --- a/restore/data.go +++ b/restore/data.go @@ -261,12 +261,6 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Make sure it's called to release resources even if no errors - // Launch a checker that polls if the restore helper has ended with an error. It will cancel all pending - // COPY commands that could be hanging on pipes, that the restore helper didn't close before it died. - if backupConfig.SingleDataFile || resizeCluster { - utils.StartHelperChecker(globalCluster, globalFPInfo, cancel) - } - for i := 0; i < connectionPool.NumConns; i++ { workerPool.Add(1) go func(whichConn int) { diff --git a/utils/agent_remote.go b/utils/agent_remote.go index 9d65c0742..436553652 100644 --- a/utils/agent_remote.go +++ b/utils/agent_remote.go @@ -362,19 +362,3 @@ func CreateSkipFileOnSegments(oid string, tableName string, c *cluster.Cluster, return fmt.Sprintf("Could not create skip file %s_skip_%s on segments", fpInfo.GetSegmentPipeFilePath(contentID), oid) }) } - -func StartHelperChecker(cl *cluster.Cluster, fpInfo filepath.FilePathInfo, cancel func()) { - go func() { - for { - time.Sleep(5 * time.Second) - remoteOutput := cl.GenerateAndExecuteCommand("Checking gpbackup_helper agent failure", cluster.ON_SEGMENTS, func(contentID int) string { - helperErrorFileName := fmt.Sprintf("%s_error", fpInfo.GetSegmentPipeFilePath(contentID)) - return fmt.Sprintf("! ls %s", helperErrorFileName) - }) - if remoteOutput.NumErrors != 0 { - gplog.Error("gpbackup_helper failed to start on some segments") - cancel() - } - } - }() -}