Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ADBDEV-6641: Rework Fix gprestore/gpbackup hanging in case the helper goes down #113

Merged
merged 28 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions backup/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,12 +185,6 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 {
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // Make sure it's called to release resources even if no errors

// Launch a checker that polls if the backup helper has ended with an error. It will cancel all pending
// COPY commands that could be hanging on pipes, that the backup helper didn't close before it died.
if MustGetFlagBool(options.SINGLE_DATA_FILE) {
utils.StartHelperChecker(globalCluster, globalFPInfo, cancel)
}

/*
* Worker 0 is a special database connection that
* 1) Exports the database snapshot if the feature is supported
Expand Down
4 changes: 2 additions & 2 deletions end_to_end/end_to_end_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2579,8 +2579,8 @@ LANGUAGE plpgsql NO SQL;`)
"--resize-cluster", "--jobs", "3")
output, err := gprestoreCmd.CombinedOutput()
Expect(err).To(HaveOccurred())
Expect(string(output)).To(ContainSubstring(`Error loading data into table public.t1`))
Expect(string(output)).To(ContainSubstring(`Error loading data into table public.t2`))
Expect(string(output)).To(MatchRegexp(`Error loading data into table public.t1: COPY t1, line \d+, column i: "\d+": ERROR: value "\d+" is out of range for type smallint`))
Expect(string(output)).To(ContainSubstring(`Error loading data into table public.t2: timeout: context canceled`))
assertArtifactsCleaned("20240502095933")
testhelper.AssertQueryRuns(restoreConn, "DROP TABLE t0; DROP TABLE t1; DROP TABLE t2; DROP TABLE t3; DROP TABLE t4;")
})
Expand Down
1 change: 0 additions & 1 deletion helper/backup_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ func doBackupAgent() error {
return err
}

preloadCreatedPipesForBackup(oidList, *copyQueue)
var currentPipe string
var errBuf bytes.Buffer
/*
Expand Down
47 changes: 30 additions & 17 deletions helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"

"golang.org/x/sys/unix"

Expand All @@ -26,9 +27,9 @@ var (
CleanupGroup *sync.WaitGroup
version string
wasTerminated bool
wasSigpiped atomic.Bool
writeHandle *os.File
writer *bufio.Writer
pipesMap map[string]bool
)

/*
Expand Down Expand Up @@ -129,8 +130,6 @@ func InitializeGlobals() {
}
operating.InitializeSystemFunctions()

pipesMap = make(map[string]bool, 0)

gplog.InitializeLogging("gpbackup_helper", "")
gplog.SetLogFileVerbosity(*verbosity)
}
Expand All @@ -151,6 +150,7 @@ func InitializeSignalHandler() {
gplog.Warn("Received a termination signal on segment %d: aborting", *content)
terminatedChan <- true
case unix.SIGPIPE:
wasSigpiped.Store(true)
if *onErrorContinue {
gplog.Warn("Received a broken pipe signal on segment %d: on-error-continue set, continuing", *content)
terminatedChan <- false
Expand Down Expand Up @@ -183,7 +183,6 @@ func createPipe(pipe string) error {
return err
}

pipesMap[pipe] = true
return nil
}

Expand All @@ -193,23 +192,25 @@ func deletePipe(pipe string) error {
return err
}

delete(pipesMap, pipe)
return nil
}

// Gpbackup creates the first n pipes. Record these pipes.
func preloadCreatedPipesForBackup(oidList []int, queuedPipeCount int) {
for i := 0; i < queuedPipeCount; i++ {
pipeName := fmt.Sprintf("%s_%d", *pipeFile, oidList[i])
pipesMap[pipeName] = true
func openClosePipe(filename string) error {
flag := unix.O_NONBLOCK
if *backupAgent {
flag |= os.O_RDONLY
} else if *restoreAgent {
flag |= os.O_WRONLY
}
}

func preloadCreatedPipesForRestore(oidWithBatchList []oidWithBatch, queuedPipeCount int) {
for i := 0; i < queuedPipeCount; i++ {
pipeName := fmt.Sprintf("%s_%d_%d", *pipeFile, oidWithBatchList[i].oid, oidWithBatchList[i].batch)
pipesMap[pipeName] = true
handle, err := os.OpenFile(filename, flag, os.ModeNamedPipe)
if err != nil {
return err
}
err = handle.Close()
if err != nil {
return err
}
return nil
}

func getOidWithBatchListFromFile(oidFileName string) ([]oidWithBatch, error) {
Expand Down Expand Up @@ -296,7 +297,19 @@ func DoCleanup() {
logVerbose("Encountered error during cleanup: %v", err)
}

for pipeName, _ := range pipesMap {
pipeFiles, _ := filepath.Glob(fmt.Sprintf("%s_[0-9]*", *pipeFile))
for _, pipeName := range pipeFiles {
if !wasSigpiped.Load() {
/*
* The main process doesn't know about the error yet, so it needs to
* open/close pipes so that the COPY commands hanging on them can complete.
*/
logVerbose("Opening/closing pipe %s", pipeName)
err = openClosePipe(pipeName)
if err != nil {
logVerbose("Encountered error opening/closing pipe %s: %v", pipeName, err)
}
}
logVerbose("Removing pipe %s", pipeName)
err = deletePipe(pipeName)
if err != nil {
Expand Down
2 changes: 0 additions & 2 deletions helper/restore_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,6 @@ func doRestoreAgent() error {
}
}

preloadCreatedPipesForRestore(oidWithBatchList, *copyQueue)

var currentPipe string

// If skip file is detected for the particular tableOid, will not process batches related to this oid
Expand Down
6 changes: 0 additions & 6 deletions restore/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,12 +261,6 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // Make sure it's called to release resources even if no errors

// Launch a checker that polls if the restore helper has ended with an error. It will cancel all pending
// COPY commands that could be hanging on pipes, that the restore helper didn't close before it died.
if backupConfig.SingleDataFile || resizeCluster {
utils.StartHelperChecker(globalCluster, globalFPInfo, cancel)
}

for i := 0; i < connectionPool.NumConns; i++ {
workerPool.Add(1)
go func(whichConn int) {
Expand Down
16 changes: 0 additions & 16 deletions utils/agent_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,19 +362,3 @@ func CreateSkipFileOnSegments(oid string, tableName string, c *cluster.Cluster,
return fmt.Sprintf("Could not create skip file %s_skip_%s on segments", fpInfo.GetSegmentPipeFilePath(contentID), oid)
})
}

func StartHelperChecker(cl *cluster.Cluster, fpInfo filepath.FilePathInfo, cancel func()) {
go func() {
for {
time.Sleep(5 * time.Second)
remoteOutput := cl.GenerateAndExecuteCommand("Checking gpbackup_helper agent failure", cluster.ON_SEGMENTS, func(contentID int) string {
helperErrorFileName := fmt.Sprintf("%s_error", fpInfo.GetSegmentPipeFilePath(contentID))
return fmt.Sprintf("! ls %s", helperErrorFileName)
})
if remoteOutput.NumErrors != 0 {
gplog.Error("gpbackup_helper failed to start on some segments")
cancel()
}
}
}()
}
Loading