From fbba35b34c8db7e1a35cb8796ac01be9d952e30b Mon Sep 17 00:00:00 2001 From: Georgy Shelkovy Date: Tue, 12 Nov 2024 13:30:09 +0500 Subject: [PATCH 01/25] rework --- backup/data.go | 6 ------ helper/backup_helper.go | 1 + helper/helper.go | 23 ++++++++++++++++++++--- helper/restore_helper.go | 2 ++ restore/data.go | 6 ------ utils/agent_remote.go | 16 ---------------- 6 files changed, 23 insertions(+), 31 deletions(-) diff --git a/backup/data.go b/backup/data.go index 54fdcb2fa..5d7606f8e 100644 --- a/backup/data.go +++ b/backup/data.go @@ -185,12 +185,6 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 { ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Make sure it's called to release resources even if no errors - // Launch a checker that polls if the backup helper has ended with an error. It will cancel all pending - // COPY commands that could be hanging on pipes, that the backup helper didn't close before it died. - if MustGetFlagBool(options.SINGLE_DATA_FILE) { - utils.StartHelperChecker(globalCluster, globalFPInfo, cancel) - } - /* * Worker 0 is a special database connection that * 1) Exports the database snapshot if the feature is supported diff --git a/helper/backup_helper.go b/helper/backup_helper.go index f156ad56f..8f5e1541a 100644 --- a/helper/backup_helper.go +++ b/helper/backup_helper.go @@ -123,6 +123,7 @@ func getBackupPipeReader(currentPipe string) (io.Reader, io.ReadCloser, error) { // Once this bug is fixed, the call to Fd() can be removed readHandle.Fd() reader := bufio.NewReader(readHandle) + pipesMap[currentPipe] = true return reader, readHandle, nil } diff --git a/helper/helper.go b/helper/helper.go index 56564342b..6d7b381f1 100644 --- a/helper/helper.go +++ b/helper/helper.go @@ -183,11 +183,28 @@ func createPipe(pipe string) error { return err } - pipesMap[pipe] = true + pipesMap[pipe] = false return nil } func deletePipe(pipe string) error { + if utils.FileExists(pipe) && !pipesMap[pipe] { + var err 
error + var handle *os.File + if *backupAgent { + handle, err = os.OpenFile(pipe, os.O_RDONLY, os.ModeNamedPipe) + } else { + handle, err = os.OpenFile(pipe, os.O_WRONLY|unix.O_NONBLOCK, os.ModeNamedPipe) + } + if err != nil { + logVerbose("Encountered error creating pipe file: %v", err) + } + err = handle.Close() + if err != nil { + logVerbose("Encountered error closing pipe file: %v", err) + } + } + err := utils.RemoveFileIfExists(pipe) if err != nil { return err @@ -201,14 +218,14 @@ func deletePipe(pipe string) error { func preloadCreatedPipesForBackup(oidList []int, queuedPipeCount int) { for i := 0; i < queuedPipeCount; i++ { pipeName := fmt.Sprintf("%s_%d", *pipeFile, oidList[i]) - pipesMap[pipeName] = true + pipesMap[pipeName] = false } } func preloadCreatedPipesForRestore(oidWithBatchList []oidWithBatch, queuedPipeCount int) { for i := 0; i < queuedPipeCount; i++ { pipeName := fmt.Sprintf("%s_%d_%d", *pipeFile, oidWithBatchList[i].oid, oidWithBatchList[i].batch) - pipesMap[pipeName] = true + pipesMap[pipeName] = false } } diff --git a/helper/restore_helper.go b/helper/restore_helper.go index 5aaa8987a..5a50c2119 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -512,6 +512,8 @@ func getRestorePipeWriter(currentPipe string) (*bufio.Writer, *os.File, error) { // scenarios with --on-error-continue. pipeWriter := bufio.NewWriter(struct{ io.WriteCloser }{fileHandle}) + pipesMap[currentPipe] = true + return pipeWriter, fileHandle, nil } diff --git a/restore/data.go b/restore/data.go index a843f4ac8..25301501a 100644 --- a/restore/data.go +++ b/restore/data.go @@ -258,12 +258,6 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Make sure it's called to release resources even if no errors - // Launch a checker that polls if the restore helper has ended with an error. 
It will cancel all pending - // COPY commands that could be hanging on pipes, that the restore helper didn't close before it died. - if backupConfig.SingleDataFile || resizeCluster { - utils.StartHelperChecker(globalCluster, globalFPInfo, cancel) - } - for i := 0; i < connectionPool.NumConns; i++ { workerPool.Add(1) go func(whichConn int) { diff --git a/utils/agent_remote.go b/utils/agent_remote.go index 3ee401afd..af43f42d4 100644 --- a/utils/agent_remote.go +++ b/utils/agent_remote.go @@ -361,19 +361,3 @@ func CreateSkipFileOnSegments(oid string, tableName string, c *cluster.Cluster, return fmt.Sprintf("Could not create skip file %s_skip_%s on segments", fpInfo.GetSegmentPipeFilePath(contentID), oid) }) } - -func StartHelperChecker(cl *cluster.Cluster, fpInfo filepath.FilePathInfo, cancel func()) { - go func() { - for { - time.Sleep(5 * time.Second) - remoteOutput := cl.GenerateAndExecuteCommand("Checking gpbackup_helper agent failure", cluster.ON_SEGMENTS, func(contentID int) string { - helperErrorFileName := fmt.Sprintf("%s_error", fpInfo.GetSegmentPipeFilePath(contentID)) - return fmt.Sprintf("! 
ls %s", helperErrorFileName) - }) - if remoteOutput.NumErrors != 0 { - gplog.Error("gpbackup_helper failed to start on some segments") - cancel() - } - } - }() -} From 934657ee2e348796a66a6d203abcf6cd87947fab Mon Sep 17 00:00:00 2001 From: Georgy Shelkovy Date: Tue, 12 Nov 2024 15:30:59 +0500 Subject: [PATCH 02/25] fix --- helper/restore_helper.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/helper/restore_helper.go b/helper/restore_helper.go index 5a50c2119..4655d2f71 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -181,6 +181,8 @@ func doRestoreAgent() error { } } + preloadCreatedPipesForRestore(oidWithBatchList, *copyQueue) + if *singleDataFile { contentToRestore := *content segmentTOC = make(map[int]*toc.SegmentTOC) @@ -224,8 +226,6 @@ func doRestoreAgent() error { } } - preloadCreatedPipesForRestore(oidWithBatchList, *copyQueue) - var currentPipe string // If skip file is detected for the particular tableOid, will not process batches related to this oid From 844838175e365d2487fde56bdcbee2d7a6832b5a Mon Sep 17 00:00:00 2001 From: Georgy Shelkovy Date: Tue, 12 Nov 2024 17:07:18 +0500 Subject: [PATCH 03/25] fix --- helper/helper.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helper/helper.go b/helper/helper.go index 6d7b381f1..37fb5edf9 100644 --- a/helper/helper.go +++ b/helper/helper.go @@ -192,7 +192,7 @@ func deletePipe(pipe string) error { var err error var handle *os.File if *backupAgent { - handle, err = os.OpenFile(pipe, os.O_RDONLY, os.ModeNamedPipe) + handle, err = os.OpenFile(pipe, os.O_RDONLY|unix.O_NONBLOCK, os.ModeNamedPipe) } else { handle, err = os.OpenFile(pipe, os.O_WRONLY|unix.O_NONBLOCK, os.ModeNamedPipe) } From 6d6d64dae8753b28ebbd425ea79bc8987868ec54 Mon Sep 17 00:00:00 2001 From: Georgy Shelkovy Date: Tue, 12 Nov 2024 17:34:20 +0500 Subject: [PATCH 04/25] fix --- end_to_end/end_to_end_suite_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/end_to_end/end_to_end_suite_test.go b/end_to_end/end_to_end_suite_test.go index 494c73869..127aa81f6 100644 --- a/end_to_end/end_to_end_suite_test.go +++ b/end_to_end/end_to_end_suite_test.go @@ -2579,8 +2579,8 @@ LANGUAGE plpgsql NO SQL;`) "--resize-cluster", "--jobs", "3") output, err := gprestoreCmd.CombinedOutput() Expect(err).To(HaveOccurred()) + Expect(string(output)).To(ContainSubstring(`Error loading data into table public.t0`)) Expect(string(output)).To(ContainSubstring(`Error loading data into table public.t1`)) - Expect(string(output)).To(ContainSubstring(`Error loading data into table public.t2`)) assertArtifactsCleaned("20240502095933") testhelper.AssertQueryRuns(restoreConn, "DROP TABLE t0; DROP TABLE t1; DROP TABLE t2; DROP TABLE t3; DROP TABLE t4;") }) From d7d9dcd2867601cdad5d23cbad766d26f46839e3 Mon Sep 17 00:00:00 2001 From: Georgy Shelkovy Date: Wed, 13 Nov 2024 09:49:42 +0500 Subject: [PATCH 05/25] optimize --- helper/helper.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/helper/helper.go b/helper/helper.go index 37fb5edf9..336aec79d 100644 --- a/helper/helper.go +++ b/helper/helper.go @@ -189,15 +189,15 @@ func createPipe(pipe string) error { func deletePipe(pipe string) error { if utils.FileExists(pipe) && !pipesMap[pipe] { - var err error - var handle *os.File + flag := unix.O_NONBLOCK if *backupAgent { - handle, err = os.OpenFile(pipe, os.O_RDONLY|unix.O_NONBLOCK, os.ModeNamedPipe) + flag |= os.O_RDONLY } else { - handle, err = os.OpenFile(pipe, os.O_WRONLY|unix.O_NONBLOCK, os.ModeNamedPipe) + flag |= os.O_WRONLY } + handle, err := os.OpenFile(pipe, flag, os.ModeNamedPipe) if err != nil { - logVerbose("Encountered error creating pipe file: %v", err) + logVerbose("Encountered error opening pipe file: %v", err) } err = handle.Close() if err != nil { From c3cbd733e48c8d4c735acde360e72e246b8fca50 Mon Sep 17 00:00:00 2001 From: Georgy Shelkovy Date: Wed, 13 Nov 2024 09:56:41 +0500 Subject: [PATCH 06/25] move --- 
helper/backup_helper.go | 2 +- helper/restore_helper.go | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/helper/backup_helper.go b/helper/backup_helper.go index 8f5e1541a..0b1b0cb16 100644 --- a/helper/backup_helper.go +++ b/helper/backup_helper.go @@ -119,11 +119,11 @@ func getBackupPipeReader(currentPipe string) (io.Reader, io.ReadCloser, error) { // error logging handled by calling functions return nil, nil, err } + pipesMap[currentPipe] = true // This is a workaround for https://github.com/golang/go/issues/24164. // Once this bug is fixed, the call to Fd() can be removed readHandle.Fd() reader := bufio.NewReader(readHandle) - pipesMap[currentPipe] = true return reader, readHandle, nil } diff --git a/helper/restore_helper.go b/helper/restore_helper.go index 4655d2f71..7d08f5d81 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -503,6 +503,7 @@ func getRestorePipeWriter(currentPipe string) (*bufio.Writer, *os.File, error) { // error logging handled by calling functions return nil, nil, err } + pipesMap[currentPipe] = true // At the moment (Golang 1.15), the copy_file_range system call from the os.File // ReadFrom method is only supported for Linux platforms. Furthermore, cross-filesystem @@ -512,8 +513,6 @@ func getRestorePipeWriter(currentPipe string) (*bufio.Writer, *os.File, error) { // scenarios with --on-error-continue. 
pipeWriter := bufio.NewWriter(struct{ io.WriteCloser }{fileHandle}) - pipesMap[currentPipe] = true - return pipeWriter, fileHandle, nil } From 99885ffbc6cf11d9a514aacde18adfd7cc8465bc Mon Sep 17 00:00:00 2001 From: Georgy Shelkovy Date: Wed, 13 Nov 2024 13:38:52 +0500 Subject: [PATCH 07/25] partial restore --- helper/backup_helper.go | 1 - helper/helper.go | 23 +++-------------------- helper/restore_helper.go | 5 ++--- 3 files changed, 5 insertions(+), 24 deletions(-) diff --git a/helper/backup_helper.go b/helper/backup_helper.go index 0b1b0cb16..f156ad56f 100644 --- a/helper/backup_helper.go +++ b/helper/backup_helper.go @@ -119,7 +119,6 @@ func getBackupPipeReader(currentPipe string) (io.Reader, io.ReadCloser, error) { // error logging handled by calling functions return nil, nil, err } - pipesMap[currentPipe] = true // This is a workaround for https://github.com/golang/go/issues/24164. // Once this bug is fixed, the call to Fd() can be removed readHandle.Fd() diff --git a/helper/helper.go b/helper/helper.go index 336aec79d..56564342b 100644 --- a/helper/helper.go +++ b/helper/helper.go @@ -183,28 +183,11 @@ func createPipe(pipe string) error { return err } - pipesMap[pipe] = false + pipesMap[pipe] = true return nil } func deletePipe(pipe string) error { - if utils.FileExists(pipe) && !pipesMap[pipe] { - flag := unix.O_NONBLOCK - if *backupAgent { - flag |= os.O_RDONLY - } else { - flag |= os.O_WRONLY - } - handle, err := os.OpenFile(pipe, flag, os.ModeNamedPipe) - if err != nil { - logVerbose("Encountered error opening pipe file: %v", err) - } - err = handle.Close() - if err != nil { - logVerbose("Encountered error closing pipe file: %v", err) - } - } - err := utils.RemoveFileIfExists(pipe) if err != nil { return err @@ -218,14 +201,14 @@ func deletePipe(pipe string) error { func preloadCreatedPipesForBackup(oidList []int, queuedPipeCount int) { for i := 0; i < queuedPipeCount; i++ { pipeName := fmt.Sprintf("%s_%d", *pipeFile, oidList[i]) - pipesMap[pipeName] = 
false + pipesMap[pipeName] = true } } func preloadCreatedPipesForRestore(oidWithBatchList []oidWithBatch, queuedPipeCount int) { for i := 0; i < queuedPipeCount; i++ { pipeName := fmt.Sprintf("%s_%d_%d", *pipeFile, oidWithBatchList[i].oid, oidWithBatchList[i].batch) - pipesMap[pipeName] = false + pipesMap[pipeName] = true } } diff --git a/helper/restore_helper.go b/helper/restore_helper.go index 7d08f5d81..5aaa8987a 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -181,8 +181,6 @@ func doRestoreAgent() error { } } - preloadCreatedPipesForRestore(oidWithBatchList, *copyQueue) - if *singleDataFile { contentToRestore := *content segmentTOC = make(map[int]*toc.SegmentTOC) @@ -226,6 +224,8 @@ func doRestoreAgent() error { } } + preloadCreatedPipesForRestore(oidWithBatchList, *copyQueue) + var currentPipe string // If skip file is detected for the particular tableOid, will not process batches related to this oid @@ -503,7 +503,6 @@ func getRestorePipeWriter(currentPipe string) (*bufio.Writer, *os.File, error) { // error logging handled by calling functions return nil, nil, err } - pipesMap[currentPipe] = true // At the moment (Golang 1.15), the copy_file_range system call from the os.File // ReadFrom method is only supported for Linux platforms. 
Furthermore, cross-filesystem From 62a594c6176d494967d31e8ad8d11af140fd39b8 Mon Sep 17 00:00:00 2001 From: Georgy Shelkovy Date: Wed, 13 Nov 2024 14:07:10 +0500 Subject: [PATCH 08/25] rework solution --- helper/helper.go | 8 ++++++++ utils/util.go | 20 ++++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/helper/helper.go b/helper/helper.go index 56564342b..86e68aeb1 100644 --- a/helper/helper.go +++ b/helper/helper.go @@ -296,6 +296,14 @@ func DoCleanup() { logVerbose("Encountered error during cleanup: %v", err) } + pipeFiles, _ := filepath.Glob(fmt.Sprintf("%s_[0-9]*", *pipeFile)) + for _, pipe := range pipeFiles { + err = utils.OpenClosePipeIfExists(pipe, *backupAgent, *restoreAgent) + if err != nil { + logVerbose("Encountered error during cleanup pipe files: %v", err) + } + } + for pipeName, _ := range pipesMap { logVerbose("Removing pipe %s", pipeName) err = deletePipe(pipeName) diff --git a/utils/util.go b/utils/util.go index 89b9a961a..9927218a0 100644 --- a/utils/util.go +++ b/utils/util.go @@ -56,6 +56,26 @@ func RemoveFileIfExists(filename string) error { return nil } +func OpenClosePipeIfExists(filename string, backupAgent bool, restoreAgent bool) error { + if FileExists(filename) { + flag := unix.O_NONBLOCK + if backupAgent { + flag |= os.O_RDONLY + } else if restoreAgent { + flag |= os.O_WRONLY + } + handle, err := os.OpenFile(filename, flag, os.ModeNamedPipe) + if err != nil { + gplog.Debug("Encountered error opening pipe file: %v", err) + } + err = handle.Close() + if err != nil { + gplog.Debug("Encountered error closing pipe file: %v", err) + } + } + return nil +} + func OpenFileForWrite(filename string) (*os.File, error) { baseFilename := path.Base(filename) file, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644) From 9a9db2eba03f4be1f0f50dc82dcf95780ad7a416 Mon Sep 17 00:00:00 2001 From: Georgy Shelkovy Date: Thu, 14 Nov 2024 09:23:00 +0500 Subject: [PATCH 09/25] optimize and simplify --- helper/backup_helper.go
| 1 - helper/helper.go | 40 ++++++++++++++++++---------------------- helper/restore_helper.go | 2 -- utils/util.go | 20 -------------------- 4 files changed, 18 insertions(+), 45 deletions(-) diff --git a/helper/backup_helper.go b/helper/backup_helper.go index f156ad56f..b72f7252d 100644 --- a/helper/backup_helper.go +++ b/helper/backup_helper.go @@ -33,7 +33,6 @@ func doBackupAgent() error { return err } - preloadCreatedPipesForBackup(oidList, *copyQueue) var currentPipe string var errBuf bytes.Buffer /* diff --git a/helper/helper.go b/helper/helper.go index 86e68aeb1..82de00e34 100644 --- a/helper/helper.go +++ b/helper/helper.go @@ -28,7 +28,6 @@ var ( wasTerminated bool writeHandle *os.File writer *bufio.Writer - pipesMap map[string]bool ) /* @@ -129,8 +128,6 @@ func InitializeGlobals() { } operating.InitializeSystemFunctions() - pipesMap = make(map[string]bool, 0) - gplog.InitializeLogging("gpbackup_helper", "") gplog.SetLogFileVerbosity(*verbosity) } @@ -183,7 +180,6 @@ func createPipe(pipe string) error { return err } - pipesMap[pipe] = true return nil } @@ -193,23 +189,25 @@ func deletePipe(pipe string) error { return err } - delete(pipesMap, pipe) return nil } -// Gpbackup creates the first n pipes. Record these pipes. 
-func preloadCreatedPipesForBackup(oidList []int, queuedPipeCount int) { - for i := 0; i < queuedPipeCount; i++ { - pipeName := fmt.Sprintf("%s_%d", *pipeFile, oidList[i]) - pipesMap[pipeName] = true +func openClosePipe(filename string) error { + flag := unix.O_NONBLOCK + if *backupAgent { + flag |= os.O_RDONLY + } else if *restoreAgent { + flag |= os.O_WRONLY } -} - -func preloadCreatedPipesForRestore(oidWithBatchList []oidWithBatch, queuedPipeCount int) { - for i := 0; i < queuedPipeCount; i++ { - pipeName := fmt.Sprintf("%s_%d_%d", *pipeFile, oidWithBatchList[i].oid, oidWithBatchList[i].batch) - pipesMap[pipeName] = true + handle, err := os.OpenFile(filename, flag, os.ModeNamedPipe) + if err != nil { + gplog.Debug("Encountered error opening pipe file: %v", err) } + err = handle.Close() + if err != nil { + gplog.Debug("Encountered error closing pipe file: %v", err) + } + return nil } func getOidWithBatchListFromFile(oidFileName string) ([]oidWithBatch, error) { @@ -297,14 +295,12 @@ func DoCleanup() { } pipeFiles, _ := filepath.Glob(fmt.Sprintf("%s_[0-9]*", *pipeFile)) - for _, pipe := range pipeFiles { - err = utils.OpenClosePipeIfExists(pipe, *backupAgent, *restoreAgent) + for _, pipeName := range pipeFiles { + logVerbose("Opening/closing pipe %s", pipeName) + err = openClosePipe(pipeName) if err != nil { - logVerbose("Encountered error during cleanup pipe files: %v", err) + logVerbose("Encountered error opening/closing pipe %s: %v", pipeName, err) } - } - - for pipeName, _ := range pipesMap { logVerbose("Removing pipe %s", pipeName) err = deletePipe(pipeName) if err != nil { diff --git a/helper/restore_helper.go b/helper/restore_helper.go index 5aaa8987a..152320863 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -224,8 +224,6 @@ func doRestoreAgent() error { } } - preloadCreatedPipesForRestore(oidWithBatchList, *copyQueue) - var currentPipe string // If skip file is detected for the particular tableOid, will not process batches related 
to this oid diff --git a/utils/util.go b/utils/util.go index 9927218a0..89b9a961a 100644 --- a/utils/util.go +++ b/utils/util.go @@ -56,26 +56,6 @@ func RemoveFileIfExists(filename string) error { return nil } -func OpenClosePipeIfExists(filename string, backupAgent bool, restoreAgent bool) error { - if FileExists(filename) { - flag := unix.O_NONBLOCK - if backupAgent { - flag |= os.O_RDONLY - } else if restoreAgent { - flag |= os.O_WRONLY - } - handle, err := os.OpenFile(filename, flag, os.ModeNamedPipe) - if err != nil { - gplog.Debug("Encountered error opening pipe file: %v", err) - } - err = handle.Close() - if err != nil { - gplog.Debug("Encountered error closing pipe file: %v", err) - } - } - return nil -} - func OpenFileForWrite(filename string) (*os.File, error) { baseFilename := path.Base(filename) file, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644) From e19a15f4cbbd1a49c591aadb1e891238ec5dc080 Mon Sep 17 00:00:00 2001 From: Georgy Shelkovy Date: Thu, 14 Nov 2024 17:02:52 +0500 Subject: [PATCH 10/25] remove context --- backup/data.go | 32 ++++++-------------------------- restore/data.go | 22 +++++----------------- 2 files changed, 11 insertions(+), 43 deletions(-) diff --git a/backup/data.go b/backup/data.go index 5d7606f8e..e3d4c37d4 100644 --- a/backup/data.go +++ b/backup/data.go @@ -5,7 +5,6 @@ package backup */ import ( - "context" "errors" "fmt" "strings" @@ -63,7 +62,7 @@ type BackupProgressCounters struct { ProgressBar utils.ProgressBar } -func CopyTableOut(queryContext context.Context, connectionPool *dbconn.DBConn, table Table, destinationToWrite string, connNum int) (int64, error) { +func CopyTableOut(connectionPool *dbconn.DBConn, table Table, destinationToWrite string, connNum int) (int64, error) { if wasTerminated { return -1, nil } @@ -113,7 +112,7 @@ func CopyTableOut(queryContext context.Context, connectionPool *dbconn.DBConn, t } else { utils.LogProgress(`%sExecuting "%s" on master`, workerInfo, query) } - result, 
err := connectionPool.ExecContext(queryContext, query, connNum) + result, err := connectionPool.Exec(query, connNum) if err != nil { return 0, err } @@ -122,7 +121,7 @@ func CopyTableOut(queryContext context.Context, connectionPool *dbconn.DBConn, t return numRows, nil } -func BackupSingleTableData(queryContext context.Context, table Table, rowsCopiedMap map[uint32]int64, counters *BackupProgressCounters, whichConn int) error { +func BackupSingleTableData(table Table, rowsCopiedMap map[uint32]int64, counters *BackupProgressCounters, whichConn int) error { workerInfo := "" if gplog.GetVerbosity() >= gplog.LOGVERBOSE { workerInfo = fmt.Sprintf("Worker %d: ", whichConn) @@ -138,7 +137,7 @@ func BackupSingleTableData(queryContext context.Context, table Table, rowsCopied } else { destinationToWrite = globalFPInfo.GetTableBackupFilePathForCopyCommand(table.Oid, utils.GetPipeThroughProgram().Extension, false) } - rowsCopied, err := CopyTableOut(queryContext, connectionPool, table, destinationToWrite, whichConn) + rowsCopied, err := CopyTableOut(connectionPool, table, destinationToWrite, whichConn) if err != nil { return err } @@ -182,9 +181,6 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 { tasks <- table } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() // Make sure it's called to release resources even if no errors - /* * Worker 0 is a special database connection that * 1) Exports the database snapshot if the feature is supported @@ -200,7 +196,6 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 { go func(whichConn int) { defer func() { if panicErr := recover(); panicErr != nil { - cancel() panicChan <- fmt.Errorf("%v", panicErr) } }() @@ -217,15 +212,8 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 { * transaction commits and the locks are released. 
*/ for table := range tasks { - // Check if any error occurred in any other goroutines: - select { - case <-ctx.Done(): - return // Error somewhere, terminate - default: // Default is must to avoid blocking - } if wasTerminated || isErroredBackup.Load() { counters.ProgressBar.(*pb.ProgressBar).NotPrint = true - cancel() return } if backupSnapshot != "" && connectionPool.Tx[whichConn] == nil { @@ -273,7 +261,7 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 { break } } - err = BackupSingleTableData(ctx, table, rowsCopiedMaps[whichConn], &counters, whichConn) + err = BackupSingleTableData(table, rowsCopiedMaps[whichConn], &counters, whichConn) if err != nil { // if copy isn't working, skip remaining backups, and let downstream panic // handling deal with it @@ -301,27 +289,19 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 { go func() { defer func() { if panicErr := recover(); panicErr != nil { - cancel() panicChan <- fmt.Errorf("%v", panicErr) } }() for _, table := range tables { for { - // Check if any error occurred in any other goroutines: - select { - case <-ctx.Done(): - return // Error somewhere, terminate - default: // Default is must to avoid blocking - } if wasTerminated || isErroredBackup.Load() { - cancel() return } state, _ := oidMap.Load(table.Oid) if state.(int) == Unknown { time.Sleep(time.Millisecond * 50) } else if state.(int) == Deferred { - err := BackupSingleTableData(ctx, table, rowsCopiedMaps[0], &counters, 0) + err := BackupSingleTableData(table, rowsCopiedMaps[0], &counters, 0) if err != nil { isErroredBackup.Store(true) gplog.Fatal(err, "") diff --git a/restore/data.go b/restore/data.go index 25301501a..ac7349d80 100644 --- a/restore/data.go +++ b/restore/data.go @@ -5,7 +5,6 @@ package restore */ import ( - "context" "fmt" "sync" "sync/atomic" @@ -26,7 +25,7 @@ var ( tableDelim = "," ) -func CopyTableIn(queryContext context.Context, connectionPool *dbconn.DBConn, tableName string, tableAttributes string, 
destinationToRead string, singleDataFile bool, whichConn int) (int64, error) { +func CopyTableIn(connectionPool *dbconn.DBConn, tableName string, tableAttributes string, destinationToRead string, singleDataFile bool, whichConn int) (int64, error) { if wasTerminated { return -1, nil } @@ -58,7 +57,7 @@ func CopyTableIn(queryContext context.Context, connectionPool *dbconn.DBConn, ta } else { utils.LogProgress(`Executing "%s" on master`, query) } - result, err := connectionPool.ExecContext(queryContext, query, whichConn) + result, err := connectionPool.Exec(query, whichConn) if err != nil { errStr := fmt.Sprintf("Error loading data into table %s", tableName) @@ -77,7 +76,7 @@ func CopyTableIn(queryContext context.Context, connectionPool *dbconn.DBConn, ta return rowsLoaded, nil } -func restoreSingleTableData(queryContext context.Context, fpInfo *filepath.FilePathInfo, entry toc.CoordinatorDataEntry, tableName string, whichConn int) error { +func restoreSingleTableData(fpInfo *filepath.FilePathInfo, entry toc.CoordinatorDataEntry, tableName string, whichConn int) error { origSize, destSize, resizeCluster, batches := GetResizeClusterInfo() var numRowsRestored int64 @@ -110,7 +109,7 @@ func restoreSingleTableData(queryContext context.Context, fpInfo *filepath.FileP gplog.FatalOnError(agentErr) } - partialRowsRestored, copyErr := CopyTableIn(queryContext, connectionPool, tableName, entry.AttributeString, destinationToRead, backupConfig.SingleDataFile, whichConn) + partialRowsRestored, copyErr := CopyTableIn(connectionPool, tableName, entry.AttributeString, destinationToRead, backupConfig.SingleDataFile, whichConn) if copyErr != nil { gplog.Error(copyErr.Error()) @@ -255,15 +254,12 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co var numErrors int32 var mutex = &sync.Mutex{} panicChan := make(chan error) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() // Make sure it's called to release resources even if no 
errors for i := 0; i < connectionPool.NumConns; i++ { workerPool.Add(1) go func(whichConn int) { defer func() { if panicErr := recover(); panicErr != nil { - cancel() panicChan <- fmt.Errorf("%v", panicErr) } }() @@ -271,15 +267,8 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co setGUCsForConnection(gucStatements, whichConn) for entry := range tasks { - // Check if any error occurred in any other goroutines: - select { - case <-ctx.Done(): - return // Error somewhere, terminate - default: // Default is must to avoid blocking - } if wasTerminated { dataProgressBar.(*pb.ProgressBar).NotPrint = true - cancel() return } tableName := utils.MakeFQN(entry.Schema, entry.Name) @@ -296,14 +285,13 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co } } if err == nil { - err = restoreSingleTableData(ctx, &fpInfo, entry, tableName, whichConn) + err = restoreSingleTableData(&fpInfo, entry, tableName, whichConn) } if err != nil { atomic.AddInt32(&numErrors, 1) if !MustGetFlagBool(options.ON_ERROR_CONTINUE) { dataProgressBar.(*pb.ProgressBar).NotPrint = true - cancel() return } mutex.Lock() From a30092de4590cf573fa8f7bf982519c653ba41e2 Mon Sep 17 00:00:00 2001 From: Georgy Shelkovy Date: Thu, 14 Nov 2024 17:09:14 +0500 Subject: [PATCH 11/25] fix tests --- backup/data_test.go | 19 +++++++++---------- end_to_end/end_to_end_suite_test.go | 3 ++- restore/data_test.go | 17 ++++++++--------- 3 files changed, 19 insertions(+), 20 deletions(-) diff --git a/backup/data_test.go b/backup/data_test.go index 3fd8fe038..2743d1328 100644 --- a/backup/data_test.go +++ b/backup/data_test.go @@ -1,7 +1,6 @@ package backup_test import ( - "context" "fmt" "regexp" @@ -80,7 +79,7 @@ var _ = Describe("backup/data tests", func() { mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0)) filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_3456.gz" - _, err := backup.CopyTableOut(context.Background(), 
connectionPool, testTable, filename, defaultConnNum) + _, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum) Expect(err).ShouldNot(HaveOccurred()) }) @@ -93,7 +92,7 @@ var _ = Describe("backup/data tests", func() { mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0)) filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_3456" - _, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum) + _, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum) Expect(err).ShouldNot(HaveOccurred()) }) @@ -103,7 +102,7 @@ var _ = Describe("backup/data tests", func() { mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0)) filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_3456.zst" - _, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum) + _, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum) Expect(err).ShouldNot(HaveOccurred()) }) @@ -116,7 +115,7 @@ var _ = Describe("backup/data tests", func() { mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0)) filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_3456" - _, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum) + _, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum) Expect(err).ShouldNot(HaveOccurred()) }) @@ -126,7 +125,7 @@ var _ = Describe("backup/data tests", func() { mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0)) filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_3456" - _, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum) + _, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum) Expect(err).ShouldNot(HaveOccurred()) }) @@ -139,7 +138,7 @@ var _ = 
Describe("backup/data tests", func() { mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0)) filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_3456" - _, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum) + _, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum) Expect(err).ShouldNot(HaveOccurred()) }) @@ -149,7 +148,7 @@ var _ = Describe("backup/data tests", func() { mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0)) filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_3456" - _, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum) + _, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum) Expect(err).ShouldNot(HaveOccurred()) }) @@ -179,7 +178,7 @@ var _ = Describe("backup/data tests", func() { backupFile := fmt.Sprintf("/gpbackup__20170101010101_pipe_(.*)_%d", testTable.Oid) copyCmd := fmt.Sprintf(copyFmtStr, backupFile) mock.ExpectExec(copyCmd).WillReturnResult(sqlmock.NewResult(0, 10)) - err := backup.BackupSingleTableData(context.Background(), testTable, rowsCopiedMap, &counters, 0) + err := backup.BackupSingleTableData(testTable, rowsCopiedMap, &counters, 0) Expect(err).ShouldNot(HaveOccurred()) Expect(rowsCopiedMap[0]).To(Equal(int64(10))) @@ -191,7 +190,7 @@ var _ = Describe("backup/data tests", func() { backupFile := fmt.Sprintf("/backups/20170101/20170101010101/gpbackup__20170101010101_%d", testTable.Oid) copyCmd := fmt.Sprintf(copyFmtStr, backupFile) mock.ExpectExec(copyCmd).WillReturnResult(sqlmock.NewResult(0, 10)) - err := backup.BackupSingleTableData(context.Background(), testTable, rowsCopiedMap, &counters, 0) + err := backup.BackupSingleTableData(testTable, rowsCopiedMap, &counters, 0) Expect(err).ShouldNot(HaveOccurred()) Expect(rowsCopiedMap[0]).To(Equal(int64(10))) diff --git a/end_to_end/end_to_end_suite_test.go 
b/end_to_end/end_to_end_suite_test.go index 127aa81f6..31b691944 100644 --- a/end_to_end/end_to_end_suite_test.go +++ b/end_to_end/end_to_end_suite_test.go @@ -2579,8 +2579,9 @@ LANGUAGE plpgsql NO SQL;`) "--resize-cluster", "--jobs", "3") output, err := gprestoreCmd.CombinedOutput() Expect(err).To(HaveOccurred()) - Expect(string(output)).To(ContainSubstring(`Error loading data into table public.t0`)) + // Expect(string(output)).To(ContainSubstring(`Restored data to table public.t0 from file`)) Expect(string(output)).To(ContainSubstring(`Error loading data into table public.t1`)) + Expect(string(output)).To(ContainSubstring(`Expected to restore 1000000 rows to table public.t2, but restored 0 instead`)) assertArtifactsCleaned("20240502095933") testhelper.AssertQueryRuns(restoreConn, "DROP TABLE t0; DROP TABLE t1; DROP TABLE t2; DROP TABLE t3; DROP TABLE t4;") }) diff --git a/restore/data_test.go b/restore/data_test.go index b1667c8dc..6f2fd1241 100644 --- a/restore/data_test.go +++ b/restore/data_test.go @@ -1,7 +1,6 @@ package restore_test import ( - "context" "fmt" "os" "regexp" @@ -35,7 +34,7 @@ var _ = Describe("restore/data tests", func() { execStr := regexp.QuoteMeta("COPY public.foo(i,j) FROM PROGRAM 'cat /backups/20170101/20170101010101/gpbackup__20170101010101_3456.gz | gzip -d -c' WITH CSV DELIMITER ',' ON SEGMENT") mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0)) filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_3456.gz" - _, err := restore.CopyTableIn(context.Background(), connectionPool, "public.foo", "(i,j)", filename, false, 0) + _, err := restore.CopyTableIn(connectionPool, "public.foo", "(i,j)", filename, false, 0) Expect(err).ShouldNot(HaveOccurred()) }) @@ -44,7 +43,7 @@ var _ = Describe("restore/data tests", func() { execStr := regexp.QuoteMeta("COPY public.foo(i,j) FROM PROGRAM 'cat /backups/20170101/20170101010101/gpbackup__20170101010101_3456.zst | zstd --decompress -c' WITH CSV DELIMITER ',' ON 
SEGMENT") mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0)) filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_3456.zst" - _, err := restore.CopyTableIn(context.Background(), connectionPool, "public.foo", "(i,j)", filename, false, 0) + _, err := restore.CopyTableIn(connectionPool, "public.foo", "(i,j)", filename, false, 0) Expect(err).ShouldNot(HaveOccurred()) }) @@ -52,7 +51,7 @@ var _ = Describe("restore/data tests", func() { execStr := regexp.QuoteMeta("COPY public.foo(i,j) FROM PROGRAM 'cat /backups/20170101/20170101010101/gpbackup__20170101010101_3456' WITH CSV DELIMITER ',' ON SEGMENT") mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0)) filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_3456" - _, err := restore.CopyTableIn(context.Background(), connectionPool, "public.foo", "(i,j)", filename, false, 0) + _, err := restore.CopyTableIn(connectionPool, "public.foo", "(i,j)", filename, false, 0) Expect(err).ShouldNot(HaveOccurred()) }) @@ -60,7 +59,7 @@ var _ = Describe("restore/data tests", func() { execStr := regexp.QuoteMeta(fmt.Sprintf("COPY public.foo(i,j) FROM PROGRAM '(timeout --foreground 300 bash -c \"while [[ ! -p \"/backups/20170101/20170101010101/gpbackup__20170101010101_pipe_3456\" && ! 
-f \"/gpbackup__20170101010101_pipe_%d_error\" ]]; do sleep 1; done\" || (echo \"Pipe not found /backups/20170101/20170101010101/gpbackup__20170101010101_pipe_3456\">&2; exit 1)) && cat /backups/20170101/20170101010101/gpbackup__20170101010101_pipe_3456' WITH CSV DELIMITER ',' ON SEGMENT", os.Getpid())) mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0)) filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_pipe_3456" - _, err := restore.CopyTableIn(context.Background(), connectionPool, "public.foo", "(i,j)", filename, true, 0) + _, err := restore.CopyTableIn(connectionPool, "public.foo", "(i,j)", filename, true, 0) Expect(err).ShouldNot(HaveOccurred()) }) @@ -73,7 +72,7 @@ var _ = Describe("restore/data tests", func() { mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0)) filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_pipe_3456.gz" - _, err := restore.CopyTableIn(context.Background(), connectionPool, "public.foo", "(i,j)", filename, false, 0) + _, err := restore.CopyTableIn(connectionPool, "public.foo", "(i,j)", filename, false, 0) Expect(err).ShouldNot(HaveOccurred()) }) @@ -86,7 +85,7 @@ var _ = Describe("restore/data tests", func() { mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0)) filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_pipe_3456.zst" - _, err := restore.CopyTableIn(context.Background(), connectionPool, "public.foo", "(i,j)", filename, false, 0) + _, err := restore.CopyTableIn(connectionPool, "public.foo", "(i,j)", filename, false, 0) Expect(err).ShouldNot(HaveOccurred()) }) @@ -98,7 +97,7 @@ var _ = Describe("restore/data tests", func() { mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0)) filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_pipe_3456.gz" - _, err := restore.CopyTableIn(context.Background(), connectionPool, "public.foo", "(i,j)", filename, false, 0) + _, err := 
restore.CopyTableIn(connectionPool, "public.foo", "(i,j)", filename, false, 0) Expect(err).ShouldNot(HaveOccurred()) }) @@ -112,7 +111,7 @@ var _ = Describe("restore/data tests", func() { } mock.ExpectExec(execStr).WillReturnError(pgErr) filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_3456" - _, err := restore.CopyTableIn(context.Background(), connectionPool, "public.foo", "(i,j)", filename, false, 0) + _, err := restore.CopyTableIn(connectionPool, "public.foo", "(i,j)", filename, false, 0) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(Equal("Error loading data into table public.foo: " + From c5dd6db041a6b2f1b5eb23042e5023fb8b142f80 Mon Sep 17 00:00:00 2001 From: Georgy Shelkovy Date: Thu, 14 Nov 2024 17:14:50 +0500 Subject: [PATCH 12/25] rm --- end_to_end/end_to_end_suite_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/end_to_end/end_to_end_suite_test.go b/end_to_end/end_to_end_suite_test.go index 31b691944..1fc283293 100644 --- a/end_to_end/end_to_end_suite_test.go +++ b/end_to_end/end_to_end_suite_test.go @@ -2579,7 +2579,6 @@ LANGUAGE plpgsql NO SQL;`) "--resize-cluster", "--jobs", "3") output, err := gprestoreCmd.CombinedOutput() Expect(err).To(HaveOccurred()) - // Expect(string(output)).To(ContainSubstring(`Restored data to table public.t0 from file`)) Expect(string(output)).To(ContainSubstring(`Error loading data into table public.t1`)) Expect(string(output)).To(ContainSubstring(`Expected to restore 1000000 rows to table public.t2, but restored 0 instead`)) assertArtifactsCleaned("20240502095933") From 5181a107bf3afa59dec6cbe7ee939230b23c2a40 Mon Sep 17 00:00:00 2001 From: Georgy Shelkovy Date: Fri, 15 Nov 2024 08:45:31 +0500 Subject: [PATCH 13/25] stabellize test --- end_to_end/end_to_end_suite_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/end_to_end/end_to_end_suite_test.go b/end_to_end/end_to_end_suite_test.go index 1fc283293..62de50448 100644 --- a/end_to_end/end_to_end_suite_test.go 
+++ b/end_to_end/end_to_end_suite_test.go @@ -2579,8 +2579,9 @@ LANGUAGE plpgsql NO SQL;`) "--resize-cluster", "--jobs", "3") output, err := gprestoreCmd.CombinedOutput() Expect(err).To(HaveOccurred()) - Expect(string(output)).To(ContainSubstring(`Error loading data into table public.t1`)) + Expect(string(output)).To(MatchRegexp(`Error loading data into table public.t1: COPY t1, line \d+, column i: "\d+": ERROR: value "\d+" is out of range for type smallint`)) Expect(string(output)).To(ContainSubstring(`Expected to restore 1000000 rows to table public.t2, but restored 0 instead`)) + assertDataRestored(restoreConn, map[string]int{"public.t0": 1000000}) assertArtifactsCleaned("20240502095933") testhelper.AssertQueryRuns(restoreConn, "DROP TABLE t0; DROP TABLE t1; DROP TABLE t2; DROP TABLE t3; DROP TABLE t4;") }) From 1907c3cd0202da4cb236d386ecba2e1cbc8ee614 Mon Sep 17 00:00:00 2001 From: Georgy Shelkovy Date: Fri, 15 Nov 2024 10:23:03 +0500 Subject: [PATCH 14/25] signal pipe instead open/close --- end_to_end/end_to_end_suite_test.go | 4 ++-- helper/helper.go | 25 +++++++++---------------- 2 files changed, 11 insertions(+), 18 deletions(-) diff --git a/end_to_end/end_to_end_suite_test.go b/end_to_end/end_to_end_suite_test.go index 62de50448..ad282d593 100644 --- a/end_to_end/end_to_end_suite_test.go +++ b/end_to_end/end_to_end_suite_test.go @@ -2579,9 +2579,9 @@ LANGUAGE plpgsql NO SQL;`) "--resize-cluster", "--jobs", "3") output, err := gprestoreCmd.CombinedOutput() Expect(err).To(HaveOccurred()) - Expect(string(output)).To(MatchRegexp(`Error loading data into table public.t1: COPY t1, line \d+, column i: "\d+": ERROR: value "\d+" is out of range for type smallint`)) - Expect(string(output)).To(ContainSubstring(`Expected to restore 1000000 rows to table public.t2, but restored 0 instead`)) assertDataRestored(restoreConn, map[string]int{"public.t0": 1000000}) + Expect(string(output)).To(MatchRegexp(`Error loading data into table public.t1: COPY t1, line \d+, column i: 
"\d+": ERROR: value "\d+" is out of range for type smallint`)) + Expect(string(output)).To(ContainSubstring(`Error loading data into table public.t2: ERROR: command error message`)) assertArtifactsCleaned("20240502095933") testhelper.AssertQueryRuns(restoreConn, "DROP TABLE t0; DROP TABLE t1; DROP TABLE t2; DROP TABLE t3; DROP TABLE t4;") }) diff --git a/helper/helper.go b/helper/helper.go index 82de00e34..bd3da7dcd 100644 --- a/helper/helper.go +++ b/helper/helper.go @@ -5,6 +5,7 @@ import ( "flag" "fmt" "os" + "os/exec" "os/signal" "path/filepath" "strconv" @@ -192,20 +193,12 @@ func deletePipe(pipe string) error { return nil } -func openClosePipe(filename string) error { - flag := unix.O_NONBLOCK - if *backupAgent { - flag |= os.O_RDONLY - } else if *restoreAgent { - flag |= os.O_WRONLY - } - handle, err := os.OpenFile(filename, flag, os.ModeNamedPipe) - if err != nil { - gplog.Debug("Encountered error opening pipe file: %v", err) - } - err = handle.Close() +func signalPipe(filename string) error { + out, err := exec.Command("pkill", "-SIGPIPE", "-efx", fmt.Sprintf("cat %s", filename)).CombinedOutput() if err != nil { - gplog.Debug("Encountered error closing pipe file: %v", err) + gplog.Debug("Cannot pkill %s: %v: %v", filename, string(out), err) + } else { + gplog.Debug("Can pkill %s: %v", filename, string(out)) } return nil } @@ -296,10 +289,10 @@ func DoCleanup() { pipeFiles, _ := filepath.Glob(fmt.Sprintf("%s_[0-9]*", *pipeFile)) for _, pipeName := range pipeFiles { - logVerbose("Opening/closing pipe %s", pipeName) - err = openClosePipe(pipeName) + logVerbose("Signaling pipe %s", pipeName) + err = signalPipe(pipeName) if err != nil { - logVerbose("Encountered error opening/closing pipe %s: %v", pipeName, err) + logVerbose("Encountered error signaling pipe %s: %v", pipeName, err) } logVerbose("Removing pipe %s", pipeName) err = deletePipe(pipeName) From 2a83c5b25c98dc612aad4e8ad052ac32f564ba23 Mon Sep 17 00:00:00 2001 From: Georgy Shelkovy Date: Fri, 15 Nov 
2024 10:54:02 +0500 Subject: [PATCH 15/25] update log messages --- helper/helper.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/helper/helper.go b/helper/helper.go index bd3da7dcd..86a373c31 100644 --- a/helper/helper.go +++ b/helper/helper.go @@ -196,9 +196,9 @@ func deletePipe(pipe string) error { func signalPipe(filename string) error { out, err := exec.Command("pkill", "-SIGPIPE", "-efx", fmt.Sprintf("cat %s", filename)).CombinedOutput() if err != nil { - gplog.Debug("Cannot pkill %s: %v: %v", filename, string(out), err) + gplog.Debug("Cannot signal %s: %s: %v", filename, string(out), err) } else { - gplog.Debug("Can pkill %s: %v", filename, string(out)) + gplog.Debug("Can signal %s: %s", filename, string(out)) } return nil } From a1628ea26a7d041f600d4a2fa8c8cbdec9adbdf3 Mon Sep 17 00:00:00 2001 From: Georgy Shelkovy Date: Fri, 15 Nov 2024 13:42:38 +0500 Subject: [PATCH 16/25] restore contexts --- backup/data.go | 32 +++++++++++++++++++++++------ backup/data_test.go | 19 +++++++++-------- end_to_end/end_to_end_suite_test.go | 4 ++-- restore/data.go | 22 +++++++++++++++----- restore/data_test.go | 17 +++++++-------- 5 files changed, 64 insertions(+), 30 deletions(-) diff --git a/backup/data.go b/backup/data.go index e3d4c37d4..5d7606f8e 100644 --- a/backup/data.go +++ b/backup/data.go @@ -5,6 +5,7 @@ package backup */ import ( + "context" "errors" "fmt" "strings" @@ -62,7 +63,7 @@ type BackupProgressCounters struct { ProgressBar utils.ProgressBar } -func CopyTableOut(connectionPool *dbconn.DBConn, table Table, destinationToWrite string, connNum int) (int64, error) { +func CopyTableOut(queryContext context.Context, connectionPool *dbconn.DBConn, table Table, destinationToWrite string, connNum int) (int64, error) { if wasTerminated { return -1, nil } @@ -112,7 +113,7 @@ func CopyTableOut(connectionPool *dbconn.DBConn, table Table, destinationToWrite } else { utils.LogProgress(`%sExecuting "%s" on master`, workerInfo, query) } - result, 
err := connectionPool.Exec(query, connNum) + result, err := connectionPool.ExecContext(queryContext, query, connNum) if err != nil { return 0, err } @@ -121,7 +122,7 @@ func CopyTableOut(connectionPool *dbconn.DBConn, table Table, destinationToWrite return numRows, nil } -func BackupSingleTableData(table Table, rowsCopiedMap map[uint32]int64, counters *BackupProgressCounters, whichConn int) error { +func BackupSingleTableData(queryContext context.Context, table Table, rowsCopiedMap map[uint32]int64, counters *BackupProgressCounters, whichConn int) error { workerInfo := "" if gplog.GetVerbosity() >= gplog.LOGVERBOSE { workerInfo = fmt.Sprintf("Worker %d: ", whichConn) @@ -137,7 +138,7 @@ func BackupSingleTableData(table Table, rowsCopiedMap map[uint32]int64, counters } else { destinationToWrite = globalFPInfo.GetTableBackupFilePathForCopyCommand(table.Oid, utils.GetPipeThroughProgram().Extension, false) } - rowsCopied, err := CopyTableOut(connectionPool, table, destinationToWrite, whichConn) + rowsCopied, err := CopyTableOut(queryContext, connectionPool, table, destinationToWrite, whichConn) if err != nil { return err } @@ -181,6 +182,9 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 { tasks <- table } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() // Make sure it's called to release resources even if no errors + /* * Worker 0 is a special database connection that * 1) Exports the database snapshot if the feature is supported @@ -196,6 +200,7 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 { go func(whichConn int) { defer func() { if panicErr := recover(); panicErr != nil { + cancel() panicChan <- fmt.Errorf("%v", panicErr) } }() @@ -212,8 +217,15 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 { * transaction commits and the locks are released. 
*/ for table := range tasks { + // Check if any error occurred in any other goroutines: + select { + case <-ctx.Done(): + return // Error somewhere, terminate + default: // Default is must to avoid blocking + } if wasTerminated || isErroredBackup.Load() { counters.ProgressBar.(*pb.ProgressBar).NotPrint = true + cancel() return } if backupSnapshot != "" && connectionPool.Tx[whichConn] == nil { @@ -261,7 +273,7 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 { break } } - err = BackupSingleTableData(table, rowsCopiedMaps[whichConn], &counters, whichConn) + err = BackupSingleTableData(ctx, table, rowsCopiedMaps[whichConn], &counters, whichConn) if err != nil { // if copy isn't working, skip remaining backups, and let downstream panic // handling deal with it @@ -289,19 +301,27 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 { go func() { defer func() { if panicErr := recover(); panicErr != nil { + cancel() panicChan <- fmt.Errorf("%v", panicErr) } }() for _, table := range tables { for { + // Check if any error occurred in any other goroutines: + select { + case <-ctx.Done(): + return // Error somewhere, terminate + default: // Default is must to avoid blocking + } if wasTerminated || isErroredBackup.Load() { + cancel() return } state, _ := oidMap.Load(table.Oid) if state.(int) == Unknown { time.Sleep(time.Millisecond * 50) } else if state.(int) == Deferred { - err := BackupSingleTableData(table, rowsCopiedMaps[0], &counters, 0) + err := BackupSingleTableData(ctx, table, rowsCopiedMaps[0], &counters, 0) if err != nil { isErroredBackup.Store(true) gplog.Fatal(err, "") diff --git a/backup/data_test.go b/backup/data_test.go index 2743d1328..3fd8fe038 100644 --- a/backup/data_test.go +++ b/backup/data_test.go @@ -1,6 +1,7 @@ package backup_test import ( + "context" "fmt" "regexp" @@ -79,7 +80,7 @@ var _ = Describe("backup/data tests", func() { mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0)) filename := 
"/backups/20170101/20170101010101/gpbackup__20170101010101_3456.gz" - _, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum) + _, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum) Expect(err).ShouldNot(HaveOccurred()) }) @@ -92,7 +93,7 @@ var _ = Describe("backup/data tests", func() { mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0)) filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_3456" - _, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum) + _, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum) Expect(err).ShouldNot(HaveOccurred()) }) @@ -102,7 +103,7 @@ var _ = Describe("backup/data tests", func() { mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0)) filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_3456.zst" - _, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum) + _, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum) Expect(err).ShouldNot(HaveOccurred()) }) @@ -115,7 +116,7 @@ var _ = Describe("backup/data tests", func() { mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0)) filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_3456" - _, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum) + _, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum) Expect(err).ShouldNot(HaveOccurred()) }) @@ -125,7 +126,7 @@ var _ = Describe("backup/data tests", func() { mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0)) filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_3456" - _, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum) + _, err := backup.CopyTableOut(context.Background(), 
connectionPool, testTable, filename, defaultConnNum) Expect(err).ShouldNot(HaveOccurred()) }) @@ -138,7 +139,7 @@ var _ = Describe("backup/data tests", func() { mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0)) filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_3456" - _, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum) + _, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum) Expect(err).ShouldNot(HaveOccurred()) }) @@ -148,7 +149,7 @@ var _ = Describe("backup/data tests", func() { mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0)) filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_3456" - _, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum) + _, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum) Expect(err).ShouldNot(HaveOccurred()) }) @@ -178,7 +179,7 @@ var _ = Describe("backup/data tests", func() { backupFile := fmt.Sprintf("/gpbackup__20170101010101_pipe_(.*)_%d", testTable.Oid) copyCmd := fmt.Sprintf(copyFmtStr, backupFile) mock.ExpectExec(copyCmd).WillReturnResult(sqlmock.NewResult(0, 10)) - err := backup.BackupSingleTableData(testTable, rowsCopiedMap, &counters, 0) + err := backup.BackupSingleTableData(context.Background(), testTable, rowsCopiedMap, &counters, 0) Expect(err).ShouldNot(HaveOccurred()) Expect(rowsCopiedMap[0]).To(Equal(int64(10))) @@ -190,7 +191,7 @@ var _ = Describe("backup/data tests", func() { backupFile := fmt.Sprintf("/backups/20170101/20170101010101/gpbackup__20170101010101_%d", testTable.Oid) copyCmd := fmt.Sprintf(copyFmtStr, backupFile) mock.ExpectExec(copyCmd).WillReturnResult(sqlmock.NewResult(0, 10)) - err := backup.BackupSingleTableData(testTable, rowsCopiedMap, &counters, 0) + err := backup.BackupSingleTableData(context.Background(), testTable, rowsCopiedMap, &counters, 0) 
Expect(err).ShouldNot(HaveOccurred()) Expect(rowsCopiedMap[0]).To(Equal(int64(10))) diff --git a/end_to_end/end_to_end_suite_test.go b/end_to_end/end_to_end_suite_test.go index ad282d593..0758f312b 100644 --- a/end_to_end/end_to_end_suite_test.go +++ b/end_to_end/end_to_end_suite_test.go @@ -2579,9 +2579,9 @@ LANGUAGE plpgsql NO SQL;`) "--resize-cluster", "--jobs", "3") output, err := gprestoreCmd.CombinedOutput() Expect(err).To(HaveOccurred()) - assertDataRestored(restoreConn, map[string]int{"public.t0": 1000000}) + Expect(string(output)).To(ContainSubstring(`Error loading data into table public.t0: timeout: context canceled`)) Expect(string(output)).To(MatchRegexp(`Error loading data into table public.t1: COPY t1, line \d+, column i: "\d+": ERROR: value "\d+" is out of range for type smallint`)) - Expect(string(output)).To(ContainSubstring(`Error loading data into table public.t2: ERROR: command error message`)) + Expect(string(output)).To(ContainSubstring(`Error loading data into table public.t2: timeout: context canceled`)) assertArtifactsCleaned("20240502095933") testhelper.AssertQueryRuns(restoreConn, "DROP TABLE t0; DROP TABLE t1; DROP TABLE t2; DROP TABLE t3; DROP TABLE t4;") }) diff --git a/restore/data.go b/restore/data.go index ac7349d80..25301501a 100644 --- a/restore/data.go +++ b/restore/data.go @@ -5,6 +5,7 @@ package restore */ import ( + "context" "fmt" "sync" "sync/atomic" @@ -25,7 +26,7 @@ var ( tableDelim = "," ) -func CopyTableIn(connectionPool *dbconn.DBConn, tableName string, tableAttributes string, destinationToRead string, singleDataFile bool, whichConn int) (int64, error) { +func CopyTableIn(queryContext context.Context, connectionPool *dbconn.DBConn, tableName string, tableAttributes string, destinationToRead string, singleDataFile bool, whichConn int) (int64, error) { if wasTerminated { return -1, nil } @@ -57,7 +58,7 @@ func CopyTableIn(connectionPool *dbconn.DBConn, tableName string, tableAttribute } else { utils.LogProgress(`Executing 
"%s" on master`, query) } - result, err := connectionPool.Exec(query, whichConn) + result, err := connectionPool.ExecContext(queryContext, query, whichConn) if err != nil { errStr := fmt.Sprintf("Error loading data into table %s", tableName) @@ -76,7 +77,7 @@ func CopyTableIn(connectionPool *dbconn.DBConn, tableName string, tableAttribute return rowsLoaded, nil } -func restoreSingleTableData(fpInfo *filepath.FilePathInfo, entry toc.CoordinatorDataEntry, tableName string, whichConn int) error { +func restoreSingleTableData(queryContext context.Context, fpInfo *filepath.FilePathInfo, entry toc.CoordinatorDataEntry, tableName string, whichConn int) error { origSize, destSize, resizeCluster, batches := GetResizeClusterInfo() var numRowsRestored int64 @@ -109,7 +110,7 @@ func restoreSingleTableData(fpInfo *filepath.FilePathInfo, entry toc.Coordinator gplog.FatalOnError(agentErr) } - partialRowsRestored, copyErr := CopyTableIn(connectionPool, tableName, entry.AttributeString, destinationToRead, backupConfig.SingleDataFile, whichConn) + partialRowsRestored, copyErr := CopyTableIn(queryContext, connectionPool, tableName, entry.AttributeString, destinationToRead, backupConfig.SingleDataFile, whichConn) if copyErr != nil { gplog.Error(copyErr.Error()) @@ -254,12 +255,15 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co var numErrors int32 var mutex = &sync.Mutex{} panicChan := make(chan error) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() // Make sure it's called to release resources even if no errors for i := 0; i < connectionPool.NumConns; i++ { workerPool.Add(1) go func(whichConn int) { defer func() { if panicErr := recover(); panicErr != nil { + cancel() panicChan <- fmt.Errorf("%v", panicErr) } }() @@ -267,8 +271,15 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co setGUCsForConnection(gucStatements, whichConn) for entry := range tasks { + // Check if any error occurred in 
any other goroutines: + select { + case <-ctx.Done(): + return // Error somewhere, terminate + default: // Default is must to avoid blocking + } if wasTerminated { dataProgressBar.(*pb.ProgressBar).NotPrint = true + cancel() return } tableName := utils.MakeFQN(entry.Schema, entry.Name) @@ -285,13 +296,14 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co } } if err == nil { - err = restoreSingleTableData(&fpInfo, entry, tableName, whichConn) + err = restoreSingleTableData(ctx, &fpInfo, entry, tableName, whichConn) } if err != nil { atomic.AddInt32(&numErrors, 1) if !MustGetFlagBool(options.ON_ERROR_CONTINUE) { dataProgressBar.(*pb.ProgressBar).NotPrint = true + cancel() return } mutex.Lock() diff --git a/restore/data_test.go b/restore/data_test.go index 6f2fd1241..b1667c8dc 100644 --- a/restore/data_test.go +++ b/restore/data_test.go @@ -1,6 +1,7 @@ package restore_test import ( + "context" "fmt" "os" "regexp" @@ -34,7 +35,7 @@ var _ = Describe("restore/data tests", func() { execStr := regexp.QuoteMeta("COPY public.foo(i,j) FROM PROGRAM 'cat /backups/20170101/20170101010101/gpbackup__20170101010101_3456.gz | gzip -d -c' WITH CSV DELIMITER ',' ON SEGMENT") mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0)) filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_3456.gz" - _, err := restore.CopyTableIn(connectionPool, "public.foo", "(i,j)", filename, false, 0) + _, err := restore.CopyTableIn(context.Background(), connectionPool, "public.foo", "(i,j)", filename, false, 0) Expect(err).ShouldNot(HaveOccurred()) }) @@ -43,7 +44,7 @@ var _ = Describe("restore/data tests", func() { execStr := regexp.QuoteMeta("COPY public.foo(i,j) FROM PROGRAM 'cat /backups/20170101/20170101010101/gpbackup__20170101010101_3456.zst | zstd --decompress -c' WITH CSV DELIMITER ',' ON SEGMENT") mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0)) filename := 
"/backups/20170101/20170101010101/gpbackup__20170101010101_3456.zst" - _, err := restore.CopyTableIn(connectionPool, "public.foo", "(i,j)", filename, false, 0) + _, err := restore.CopyTableIn(context.Background(), connectionPool, "public.foo", "(i,j)", filename, false, 0) Expect(err).ShouldNot(HaveOccurred()) }) @@ -51,7 +52,7 @@ var _ = Describe("restore/data tests", func() { execStr := regexp.QuoteMeta("COPY public.foo(i,j) FROM PROGRAM 'cat /backups/20170101/20170101010101/gpbackup__20170101010101_3456' WITH CSV DELIMITER ',' ON SEGMENT") mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0)) filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_3456" - _, err := restore.CopyTableIn(connectionPool, "public.foo", "(i,j)", filename, false, 0) + _, err := restore.CopyTableIn(context.Background(), connectionPool, "public.foo", "(i,j)", filename, false, 0) Expect(err).ShouldNot(HaveOccurred()) }) @@ -59,7 +60,7 @@ var _ = Describe("restore/data tests", func() { execStr := regexp.QuoteMeta(fmt.Sprintf("COPY public.foo(i,j) FROM PROGRAM '(timeout --foreground 300 bash -c \"while [[ ! -p \"/backups/20170101/20170101010101/gpbackup__20170101010101_pipe_3456\" && ! 
-f \"/gpbackup__20170101010101_pipe_%d_error\" ]]; do sleep 1; done\" || (echo \"Pipe not found /backups/20170101/20170101010101/gpbackup__20170101010101_pipe_3456\">&2; exit 1)) && cat /backups/20170101/20170101010101/gpbackup__20170101010101_pipe_3456' WITH CSV DELIMITER ',' ON SEGMENT", os.Getpid())) mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0)) filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_pipe_3456" - _, err := restore.CopyTableIn(connectionPool, "public.foo", "(i,j)", filename, true, 0) + _, err := restore.CopyTableIn(context.Background(), connectionPool, "public.foo", "(i,j)", filename, true, 0) Expect(err).ShouldNot(HaveOccurred()) }) @@ -72,7 +73,7 @@ var _ = Describe("restore/data tests", func() { mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0)) filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_pipe_3456.gz" - _, err := restore.CopyTableIn(connectionPool, "public.foo", "(i,j)", filename, false, 0) + _, err := restore.CopyTableIn(context.Background(), connectionPool, "public.foo", "(i,j)", filename, false, 0) Expect(err).ShouldNot(HaveOccurred()) }) @@ -85,7 +86,7 @@ var _ = Describe("restore/data tests", func() { mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0)) filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_pipe_3456.zst" - _, err := restore.CopyTableIn(connectionPool, "public.foo", "(i,j)", filename, false, 0) + _, err := restore.CopyTableIn(context.Background(), connectionPool, "public.foo", "(i,j)", filename, false, 0) Expect(err).ShouldNot(HaveOccurred()) }) @@ -97,7 +98,7 @@ var _ = Describe("restore/data tests", func() { mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0)) filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_pipe_3456.gz" - _, err := restore.CopyTableIn(connectionPool, "public.foo", "(i,j)", filename, false, 0) + _, err := restore.CopyTableIn(context.Background(), 
connectionPool, "public.foo", "(i,j)", filename, false, 0) Expect(err).ShouldNot(HaveOccurred()) }) @@ -111,7 +112,7 @@ var _ = Describe("restore/data tests", func() { } mock.ExpectExec(execStr).WillReturnError(pgErr) filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_3456" - _, err := restore.CopyTableIn(connectionPool, "public.foo", "(i,j)", filename, false, 0) + _, err := restore.CopyTableIn(context.Background(), connectionPool, "public.foo", "(i,j)", filename, false, 0) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(Equal("Error loading data into table public.foo: " + From 8a94b59a4d4114d31a5a6c2f930f9f409710e096 Mon Sep 17 00:00:00 2001 From: Georgy Shelkovy Date: Fri, 15 Nov 2024 13:46:12 +0500 Subject: [PATCH 17/25] simplify --- helper/helper.go | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/helper/helper.go b/helper/helper.go index 86a373c31..923b244ea 100644 --- a/helper/helper.go +++ b/helper/helper.go @@ -193,16 +193,6 @@ func deletePipe(pipe string) error { return nil } -func signalPipe(filename string) error { - out, err := exec.Command("pkill", "-SIGPIPE", "-efx", fmt.Sprintf("cat %s", filename)).CombinedOutput() - if err != nil { - gplog.Debug("Cannot signal %s: %s: %v", filename, string(out), err) - } else { - gplog.Debug("Can signal %s: %s", filename, string(out)) - } - return nil -} - func getOidWithBatchListFromFile(oidFileName string) ([]oidWithBatch, error) { oidStr, err := operating.System.ReadFile(oidFileName) if err != nil { @@ -289,10 +279,11 @@ func DoCleanup() { pipeFiles, _ := filepath.Glob(fmt.Sprintf("%s_[0-9]*", *pipeFile)) for _, pipeName := range pipeFiles { - logVerbose("Signaling pipe %s", pipeName) - err = signalPipe(pipeName) + out, err := exec.Command("pkill", "-SIGPIPE", "-efx", fmt.Sprintf("cat %s", pipeName)).CombinedOutput() if err != nil { - logVerbose("Encountered error signaling pipe %s: %v", pipeName, err) + gplog.Debug("Cannot signal to pipe %s: %s: %v", 
pipeName, string(out), err) + } else { + gplog.Debug("Pipe %s signalled: %s", pipeName, string(out)) } logVerbose("Removing pipe %s", pipeName) err = deletePipe(pipeName) From 9fb4727b433da735f3cf5f0f131546e946116039 Mon Sep 17 00:00:00 2001 From: Georgy Shelkovy Date: Fri, 15 Nov 2024 15:25:37 +0500 Subject: [PATCH 18/25] stabellize test --- end_to_end/end_to_end_suite_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/end_to_end/end_to_end_suite_test.go b/end_to_end/end_to_end_suite_test.go index 0758f312b..d8ead853a 100644 --- a/end_to_end/end_to_end_suite_test.go +++ b/end_to_end/end_to_end_suite_test.go @@ -2579,7 +2579,6 @@ LANGUAGE plpgsql NO SQL;`) "--resize-cluster", "--jobs", "3") output, err := gprestoreCmd.CombinedOutput() Expect(err).To(HaveOccurred()) - Expect(string(output)).To(ContainSubstring(`Error loading data into table public.t0: timeout: context canceled`)) Expect(string(output)).To(MatchRegexp(`Error loading data into table public.t1: COPY t1, line \d+, column i: "\d+": ERROR: value "\d+" is out of range for type smallint`)) Expect(string(output)).To(ContainSubstring(`Error loading data into table public.t2: timeout: context canceled`)) assertArtifactsCleaned("20240502095933") From 484b24276617692865320f25d02eb0be6151b0cd Mon Sep 17 00:00:00 2001 From: Georgy Shelkovy Date: Fri, 15 Nov 2024 20:28:31 +0500 Subject: [PATCH 19/25] open/close --- helper/helper.go | 26 +++++++++++++++++++++----- restore/data.go | 2 ++ 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/helper/helper.go b/helper/helper.go index 923b244ea..82de00e34 100644 --- a/helper/helper.go +++ b/helper/helper.go @@ -5,7 +5,6 @@ import ( "flag" "fmt" "os" - "os/exec" "os/signal" "path/filepath" "strconv" @@ -193,6 +192,24 @@ func deletePipe(pipe string) error { return nil } +func openClosePipe(filename string) error { + flag := unix.O_NONBLOCK + if *backupAgent { + flag |= os.O_RDONLY + } else if *restoreAgent { + flag |= os.O_WRONLY + } + handle, err := 
os.OpenFile(filename, flag, os.ModeNamedPipe) + if err != nil { + gplog.Debug("Encountered error opening pipe file: %v", err) + } + err = handle.Close() + if err != nil { + gplog.Debug("Encountered error closing pipe file: %v", err) + } + return nil +} + func getOidWithBatchListFromFile(oidFileName string) ([]oidWithBatch, error) { oidStr, err := operating.System.ReadFile(oidFileName) if err != nil { @@ -279,11 +296,10 @@ func DoCleanup() { pipeFiles, _ := filepath.Glob(fmt.Sprintf("%s_[0-9]*", *pipeFile)) for _, pipeName := range pipeFiles { - out, err := exec.Command("pkill", "-SIGPIPE", "-efx", fmt.Sprintf("cat %s", pipeName)).CombinedOutput() + logVerbose("Opening/closing pipe %s", pipeName) + err = openClosePipe(pipeName) if err != nil { - gplog.Debug("Cannot signal to pipe %s: %s: %v", pipeName, string(out), err) - } else { - gplog.Debug("Pipe %s signalled: %s", pipeName, string(out)) + logVerbose("Encountered error opening/closing pipe %s: %v", pipeName, err) } logVerbose("Removing pipe %s", pipeName) err = deletePipe(pipeName) diff --git a/restore/data.go b/restore/data.go index 25301501a..e267f9d55 100644 --- a/restore/data.go +++ b/restore/data.go @@ -9,6 +9,7 @@ import ( "fmt" "sync" "sync/atomic" + "time" "github.com/greenplum-db/gp-common-go-libs/cluster" "github.com/greenplum-db/gp-common-go-libs/dbconn" @@ -303,6 +304,7 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co atomic.AddInt32(&numErrors, 1) if !MustGetFlagBool(options.ON_ERROR_CONTINUE) { dataProgressBar.(*pb.ProgressBar).NotPrint = true + time.Sleep(1 * time.Second) cancel() return } From 5fe1f3cf40e3b8a223ea18aa4fbeca4cb2cbfe01 Mon Sep 17 00:00:00 2001 From: Georgy Shelkovy Date: Fri, 15 Nov 2024 21:12:08 +0500 Subject: [PATCH 20/25] timeout --- helper/helper.go | 2 ++ restore/data.go | 2 -- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/helper/helper.go b/helper/helper.go index 82de00e34..c3d9dc941 100644 --- a/helper/helper.go +++ 
b/helper/helper.go @@ -10,6 +10,7 @@ import ( "strconv" "strings" "sync" + "time" "golang.org/x/sys/unix" @@ -294,6 +295,7 @@ func DoCleanup() { logVerbose("Encountered error during cleanup: %v", err) } + time.Sleep(1 * time.Second) pipeFiles, _ := filepath.Glob(fmt.Sprintf("%s_[0-9]*", *pipeFile)) for _, pipeName := range pipeFiles { logVerbose("Opening/closing pipe %s", pipeName) diff --git a/restore/data.go b/restore/data.go index e267f9d55..25301501a 100644 --- a/restore/data.go +++ b/restore/data.go @@ -9,7 +9,6 @@ import ( "fmt" "sync" "sync/atomic" - "time" "github.com/greenplum-db/gp-common-go-libs/cluster" "github.com/greenplum-db/gp-common-go-libs/dbconn" @@ -304,7 +303,6 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co atomic.AddInt32(&numErrors, 1) if !MustGetFlagBool(options.ON_ERROR_CONTINUE) { dataProgressBar.(*pb.ProgressBar).NotPrint = true - time.Sleep(1 * time.Second) cancel() return } From 0c9f756be1cb5f4568e0316ef4175ac31ef9577a Mon Sep 17 00:00:00 2001 From: Georgy Shelkovy Date: Wed, 20 Nov 2024 14:49:00 +0500 Subject: [PATCH 21/25] open/close pipe inly if not terminated --- helper/helper.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/helper/helper.go b/helper/helper.go index c3d9dc941..25d78c2d1 100644 --- a/helper/helper.go +++ b/helper/helper.go @@ -10,7 +10,6 @@ import ( "strconv" "strings" "sync" - "time" "golang.org/x/sys/unix" @@ -202,11 +201,11 @@ func openClosePipe(filename string) error { } handle, err := os.OpenFile(filename, flag, os.ModeNamedPipe) if err != nil { - gplog.Debug("Encountered error opening pipe file: %v", err) + return err } err = handle.Close() if err != nil { - gplog.Debug("Encountered error closing pipe file: %v", err) + return err } return nil } @@ -295,13 +294,14 @@ func DoCleanup() { logVerbose("Encountered error during cleanup: %v", err) } - time.Sleep(1 * time.Second) pipeFiles, _ := filepath.Glob(fmt.Sprintf("%s_[0-9]*", *pipeFile)) 
for _, pipeName := range pipeFiles { - logVerbose("Opening/closing pipe %s", pipeName) - err = openClosePipe(pipeName) - if err != nil { - logVerbose("Encountered error opening/closing pipe %s: %v", pipeName, err) + if !wasTerminated { + logVerbose("Opening/closing pipe %s", pipeName) + err = openClosePipe(pipeName) + if err != nil { + logVerbose("Encountered error opening/closing pipe %s: %v", pipeName, err) + } } logVerbose("Removing pipe %s", pipeName) err = deletePipe(pipeName) From a5e1a93ffdb70b614f387610372fbe99016b8903 Mon Sep 17 00:00:00 2001 From: Georgy Shelkovy Date: Wed, 20 Nov 2024 15:19:51 +0500 Subject: [PATCH 22/25] sigpiped --- helper/helper.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/helper/helper.go b/helper/helper.go index 25d78c2d1..656da7317 100644 --- a/helper/helper.go +++ b/helper/helper.go @@ -26,6 +26,7 @@ var ( CleanupGroup *sync.WaitGroup version string wasTerminated bool + wasSigpiped bool writeHandle *os.File writer *bufio.Writer ) @@ -136,6 +137,7 @@ func InitializeSignalHandler() { signalChan := make(chan os.Signal, 1) signal.Notify(signalChan, unix.SIGINT, unix.SIGTERM, unix.SIGPIPE, unix.SIGUSR1) terminatedChan := make(chan bool, 1) + sigpipedChan := make(chan bool, 1) for { go func() { sig := <-signalChan @@ -143,11 +145,14 @@ func InitializeSignalHandler() { switch sig { case unix.SIGINT: gplog.Warn("Received an interrupt signal on segment %d: aborting", *content) + sigpipedChan <- false terminatedChan <- true case unix.SIGTERM: gplog.Warn("Received a termination signal on segment %d: aborting", *content) + sigpipedChan <- false terminatedChan <- true case unix.SIGPIPE: + sigpipedChan <- true if *onErrorContinue { gplog.Warn("Received a broken pipe signal on segment %d: on-error-continue set, continuing", *content) terminatedChan <- false @@ -157,9 +162,11 @@ func InitializeSignalHandler() { } case unix.SIGUSR1: gplog.Warn("Received shutdown request on segment %d: beginning cleanup", *content) + 
sigpipedChan <- false terminatedChan <- true } }() + wasSigpiped = <-sigpipedChan wasTerminated = <-terminatedChan if wasTerminated { DoCleanup() @@ -296,7 +303,7 @@ func DoCleanup() { pipeFiles, _ := filepath.Glob(fmt.Sprintf("%s_[0-9]*", *pipeFile)) for _, pipeName := range pipeFiles { - if !wasTerminated { + if !wasSigpiped { logVerbose("Opening/closing pipe %s", pipeName) err = openClosePipe(pipeName) if err != nil { From 5ddd474d94b1f976b20a782fd17eb2a191972996 Mon Sep 17 00:00:00 2001 From: Georgy Shelkovy Date: Fri, 22 Nov 2024 12:49:12 +0500 Subject: [PATCH 23/25] comment --- helper/helper.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/helper/helper.go b/helper/helper.go index 656da7317..b5035ef29 100644 --- a/helper/helper.go +++ b/helper/helper.go @@ -304,6 +304,10 @@ func DoCleanup() { pipeFiles, _ := filepath.Glob(fmt.Sprintf("%s_[0-9]*", *pipeFile)) for _, pipeName := range pipeFiles { if !wasSigpiped { + /* + * The main process doesn't know about the error yet, so it needs to + * open/close pipes so that the COPY commands hanging on them can complete. 
+ */ logVerbose("Opening/closing pipe %s", pipeName) err = openClosePipe(pipeName) if err != nil { From 8fa7ff06704e083a82933da33f9e0d78231f9e8b Mon Sep 17 00:00:00 2001 From: Georgy Shelkovy Date: Fri, 29 Nov 2024 20:42:38 +0500 Subject: [PATCH 24/25] use atomic --- helper/helper.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/helper/helper.go b/helper/helper.go index b5035ef29..3b5ed000b 100644 --- a/helper/helper.go +++ b/helper/helper.go @@ -10,6 +10,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "golang.org/x/sys/unix" @@ -26,7 +27,7 @@ var ( CleanupGroup *sync.WaitGroup version string wasTerminated bool - wasSigpiped bool + wasSigpiped atomic.Bool writeHandle *os.File writer *bufio.Writer ) @@ -137,7 +138,6 @@ func InitializeSignalHandler() { signalChan := make(chan os.Signal, 1) signal.Notify(signalChan, unix.SIGINT, unix.SIGTERM, unix.SIGPIPE, unix.SIGUSR1) terminatedChan := make(chan bool, 1) - sigpipedChan := make(chan bool, 1) for { go func() { sig := <-signalChan @@ -145,14 +145,14 @@ func InitializeSignalHandler() { switch sig { case unix.SIGINT: gplog.Warn("Received an interrupt signal on segment %d: aborting", *content) - sigpipedChan <- false + wasSigpiped.Store(false) terminatedChan <- true case unix.SIGTERM: gplog.Warn("Received a termination signal on segment %d: aborting", *content) - sigpipedChan <- false + wasSigpiped.Store(false) terminatedChan <- true case unix.SIGPIPE: - sigpipedChan <- true + wasSigpiped.Store(true) if *onErrorContinue { gplog.Warn("Received a broken pipe signal on segment %d: on-error-continue set, continuing", *content) terminatedChan <- false @@ -162,11 +162,10 @@ func InitializeSignalHandler() { } case unix.SIGUSR1: gplog.Warn("Received shutdown request on segment %d: beginning cleanup", *content) - sigpipedChan <- false + wasSigpiped.Store(false) terminatedChan <- true } }() - wasSigpiped = <-sigpipedChan wasTerminated = <-terminatedChan if wasTerminated { DoCleanup() @@ 
-303,7 +302,7 @@ func DoCleanup() { pipeFiles, _ := filepath.Glob(fmt.Sprintf("%s_[0-9]*", *pipeFile)) for _, pipeName := range pipeFiles { - if !wasSigpiped { + if !wasSigpiped.Load() { /* * The main process doesn't know about the error yet, so it needs to * open/close pipes so that the COPY commands hanging on them can complete. From e1d0fad0ce2369cea2a0649adb06515e90b1af94 Mon Sep 17 00:00:00 2001 From: Georgy Shelkovy Date: Sat, 30 Nov 2024 08:14:40 +0500 Subject: [PATCH 25/25] simplify --- helper/helper.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/helper/helper.go b/helper/helper.go index 3b5ed000b..5c0a30675 100644 --- a/helper/helper.go +++ b/helper/helper.go @@ -145,11 +145,9 @@ func InitializeSignalHandler() { switch sig { case unix.SIGINT: gplog.Warn("Received an interrupt signal on segment %d: aborting", *content) - wasSigpiped.Store(false) terminatedChan <- true case unix.SIGTERM: gplog.Warn("Received a termination signal on segment %d: aborting", *content) - wasSigpiped.Store(false) terminatedChan <- true case unix.SIGPIPE: wasSigpiped.Store(true) @@ -162,7 +160,6 @@ func InitializeSignalHandler() { } case unix.SIGUSR1: gplog.Warn("Received shutdown request on segment %d: beginning cleanup", *content) - wasSigpiped.Store(false) terminatedChan <- true } }()