diff --git a/replication/backup.go b/replication/backup.go index d4b1d9eb1..91dd603f4 100644 --- a/replication/backup.go +++ b/replication/backup.go @@ -32,8 +32,11 @@ func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Du // - timeout: The maximum duration to wait for new binlog events before stopping the backup process. // If set to 0, a default very long timeout (30 days) is used instead. // - handler: A function that takes a binlog filename and returns an WriteCloser for writing raw events to. -func (b *BinlogSyncer) StartBackupWithHandler(p Position, timeout time.Duration, - handler func(binlogFilename string) (io.WriteCloser, error)) (retErr error) { +func (b *BinlogSyncer) StartBackupWithHandler( + p Position, + timeout time.Duration, + handler func(binlogFilename string) (io.WriteCloser, error), +) (retErr error) { if timeout == 0 { // a very long timeout here timeout = 30 * 3600 * 24 * time.Second @@ -47,8 +50,10 @@ func (b *BinlogSyncer) StartBackupWithHandler(p Position, timeout time.Duration, handler: handler, } - // Set the event handler in BinlogSyncer - b.SetEventHandler(backupHandler) + if b.cfg.SyncMode == SyncModeSync { + // Set the event handler in BinlogSyncer for synchronous mode + b.SetEventHandler(backupHandler) + } // Start syncing s, err := b.StartSync(p) @@ -70,13 +75,33 @@ func (b *BinlogSyncer) StartBackupWithHandler(p Position, timeout time.Duration, ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() - select { - case <-ctx.Done(): - return nil - case <-b.ctx.Done(): - return nil - case err := <-s.ech: - return errors.Trace(err) + if b.cfg.SyncMode == SyncModeSync { + // Synchronous mode: wait for completion or error + select { + case <-ctx.Done(): + return nil + case <-b.ctx.Done(): + return nil + case err := <-s.ech: + return errors.Trace(err) + } + } else { + // Asynchronous mode: consume events from the streamer + for { + select { + case <-ctx.Done(): + return nil + case <-b.ctx.Done(): + return nil + case err := <-s.ech: + return errors.Trace(err) + case e := <-s.ch: + err = backupHandler.HandleEvent(e) + if err != nil { + return errors.Trace(err) + } + } + } } } @@ -87,6 +112,7 @@ type BackupEventHandler struct { file *os.File mutex sync.Mutex fsyncedChan chan struct{} + eventCount int // eventCount used for testing } func (h *BackupEventHandler) HandleEvent(e *BinlogEvent) error { @@ -148,6 +174,7 @@ func (h *BackupEventHandler) HandleEvent(e *BinlogEvent) error { return errors.Trace(err) } } + h.eventCount++ return nil } diff --git a/replication/backup_test.go b/replication/backup_test.go index 6d6217f54..64b389ba9 100644 --- a/replication/backup_test.go +++ b/replication/backup_test.go @@ -2,6 +2,7 @@ package replication import ( "context" + "fmt" "io" "os" "path" @@ -13,35 +14,60 @@ import ( "github.com/go-mysql-org/go-mysql/mysql" ) -func (t *testSyncerSuite) TestStartBackupEndInGivenTime() { - t.setupTest(mysql.MySQLFlavor) - - resetBinaryLogs := "RESET BINARY LOGS AND GTIDS" - if eq, err := t.c.CompareServerVersion("8.4.0"); (err == nil) && (eq < 0) { - resetBinaryLogs = "RESET MASTER" - } - - t.testExecute(resetBinaryLogs) - - for times := 1; times <= 2; times++ { - t.testSync(nil) - t.testExecute("FLUSH LOGS") - } +// testStartBackupEndInGivenTime tests the backup functionality with the given SyncMode. +func (t *testSyncerSuite) testStartBackupEndInGivenTime(syncMode SyncMode) { + // Setup the test environment with the specified SyncMode + t.setupTest(mysql.MySQLFlavor, syncMode) + t.b.cfg.SyncMode = syncMode + // Define binlogDir and timeout binlogDir := "./var" - os.RemoveAll(binlogDir) + err := os.MkdirAll(binlogDir, 0755) + require.NoError(t.T(), err, "Failed to recreate binlogDir") + timeout := 2 * time.Second done := make(chan bool) - // Start the backup in a goroutine + // Set up the BackupEventHandler + backupHandler := &BackupEventHandler{ + handler: func(binlogFilename string) (io.WriteCloser, error) { + return os.OpenFile(path.Join(binlogDir, binlogFilename), os.O_CREATE|os.O_WRONLY, 0644) + }, + } + + // Start the backup using StartBackupWithHandler in a separate goroutine go func() { - err := t.b.StartBackup(binlogDir, mysql.Position{Name: "", Pos: uint32(0)}, timeout) - require.NoError(t.T(), err) + err := t.b.StartBackupWithHandler(mysql.Position{Name: "", Pos: uint32(0)}, timeout, backupHandler.handler) + require.NoError(t.T(), err, "StartBackupWithHandler failed") done <- true }() + // Wait briefly to ensure the backup process has started + time.Sleep(500 * time.Millisecond) + + // Execute FLUSH LOGS to trigger binlog rotation and create binlog.000001 + _, err = t.c.Execute("FLUSH LOGS") + require.NoError(t.T(), err, "Failed to execute FLUSH LOGS") + + // Generate a binlog event by creating a table and inserting data + _, err = t.c.Execute("CREATE TABLE IF NOT EXISTS test_backup (id INT PRIMARY KEY)") + require.NoError(t.T(), err, "Failed to create table_backup") + + _, err = t.c.Execute("INSERT INTO test_backup (id) VALUES (1)") + require.NoError(t.T(), err, "Failed to insert data into test_backup") + + // Define the expected binlog file path + expectedBinlogFile := path.Join(binlogDir, "binlog.000001") + + // Wait for the binlog file to be created + err = waitForFile(expectedBinlogFile, 2*time.Second) + require.NoError(t.T(), err, "Binlog file was not created in time") + + // Optionally, wait a short duration to ensure events are processed + time.Sleep(500 * time.Millisecond) + // Wait for the backup to complete or timeout failTimeout := 5 * timeout ctx, cancel := context.WithTimeout(context.Background(), failTimeout) @@ -50,18 +76,19 @@ func (t *testSyncerSuite) TestStartBackupEndInGivenTime() { case <-done: // Backup completed; now verify the backup files files, err := os.ReadDir(binlogDir) - require.NoError(t.T(), err) + require.NoError(t.T(), err, "Failed to read binlogDir") require.NotEmpty(t.T(), files, "No binlog files were backed up") for _, file := range files { fileInfo, err := os.Stat(path.Join(binlogDir, file.Name())) - require.NoError(t.T(), err) + require.NoError(t.T(), err, "Failed to stat binlog file") require.NotZero(t.T(), fileInfo.Size(), "Binlog file %s is empty", file.Name()) } - return + // Additionally, verify that events were handled + require.Greater(t.T(), backupHandler.eventCount, 0, "No events were handled by the BackupEventHandler") case <-ctx.Done(): - t.T().Fatal("time out error") + t.T().Fatal("Backup timed out before completion") } } @@ -77,67 +104,138 @@ func (h *CountingEventHandler) HandleEvent(e *BinlogEvent) error { return nil } -func (t *testSyncerSuite) TestBackupEventHandlerInvocation() { - t.setupTest(mysql.MySQLFlavor) +// waitForFile waits until the specified file exists or the timeout is reached. +func waitForFile(filePath string, timeout time.Duration) error { + start := time.Now() + for { + if _, err := os.Stat(filePath); err == nil { + return nil + } + if time.Since(start) > timeout { + return fmt.Errorf("file %s did not appear within %v", filePath, timeout) + } + time.Sleep(100 * time.Millisecond) + } +} + +func (t *testSyncerSuite) testBackupEventHandlerInvocation(syncMode SyncMode) { + // Setup the test environment with the specified SyncMode + t.setupTest(mysql.MySQLFlavor, syncMode) + t.b.cfg.SyncMode = syncMode // Define binlogDir and timeout binlogDir := "./var" os.RemoveAll(binlogDir) - timeout := 2 * time.Second + timeout := 5 * time.Second // Increased timeout to allow for event processing - // Set up the CountingEventHandler - handler := &CountingEventHandler{} - t.b.SetEventHandler(handler) + // Ensure binlogDir exists + err := os.MkdirAll(binlogDir, 0755) + require.NoError(t.T(), err, "Failed to create binlogDir") - // Start the backup - err := t.b.StartBackup(binlogDir, mysql.Position{Name: "", Pos: uint32(0)}, timeout) - require.NoError(t.T(), err) + // Set up the BackupEventHandler + backupHandler := &BackupEventHandler{ + handler: func(binlogFilename string) (io.WriteCloser, error) { + return os.OpenFile(path.Join(binlogDir, binlogFilename), os.O_CREATE|os.O_WRONLY, 0644) + }, + } + + if syncMode == SyncModeSync { + // Set the event handler in BinlogSyncer for synchronous mode + t.b.SetEventHandler(backupHandler) + } + + // Start the backup in a separate goroutine + go func() { + err := t.b.StartBackupWithHandler(mysql.Position{Name: "", Pos: uint32(0)}, timeout, backupHandler.handler) + require.NoError(t.T(), err, "StartBackupWithHandler failed") + }() + + // Wait briefly to ensure the backup process has started + time.Sleep(500 * time.Millisecond) + + // Execute FLUSH LOGS to trigger binlog rotation and create binlog.000001 + _, err = t.c.Execute("FLUSH LOGS") + require.NoError(t.T(), err, "Failed to execute FLUSH LOGS") + + // Generate a binlog event by creating a table and inserting data + _, err = t.c.Execute("CREATE TABLE IF NOT EXISTS test_backup (id INT PRIMARY KEY)") + require.NoError(t.T(), err, "Failed to create table") + + _, err = t.c.Execute("INSERT INTO test_backup (id) VALUES (1)") + require.NoError(t.T(), err, "Failed to insert data") + + // Define the expected binlog file path + expectedBinlogFile := path.Join(binlogDir, "binlog.000001") + + // Wait for the binlog file to be created + err = waitForFile(expectedBinlogFile, 2*time.Second) + require.NoError(t.T(), err, "Binlog file was not created in time") + + // Optionally, wait a short duration to ensure events are processed + time.Sleep(500 * time.Millisecond) // Verify that events were handled - handler.mutex.Lock() - eventCount := handler.count - handler.mutex.Unlock() - require.Greater(t.T(), eventCount, 0, "No events were handled by the EventHandler") -} + require.Greater(t.T(), backupHandler.eventCount, 0, "No events were handled by the BackupEventHandler") -func (t *testSyncerSuite) TestACKSentAfterFsync() { - t.setupTest(mysql.MySQLFlavor) + // Additional verification: Check that the binlog file has content + fileInfo, err := os.Stat(expectedBinlogFile) + require.NoError(t.T(), err, "Failed to stat binlog file") + require.NotZero(t.T(), fileInfo.Size(), "Binlog file is empty") +} - // Define binlogDir and timeout - binlogDir := "./var" +// setupACKAfterFsyncTest sets up the test environment for verifying the relationship +// between fsync completion and ACK sending. It configures the BinlogSyncer based on +// the provided SyncMode, initializes necessary channels and handlers, and returns them +// for use in the test functions. +func (t *testSyncerSuite) setupACKAfterFsyncTest(syncMode SyncMode) ( + binlogDir string, + fsyncedChan chan struct{}, + ackedChan chan struct{}, + handler func(string) (io.WriteCloser, error), +) { + // Initialize the test environment with the specified SyncMode + t.setupTest(mysql.MySQLFlavor, syncMode) + + // Define binlogDir + binlogDir = "./var" os.RemoveAll(binlogDir) err := os.MkdirAll(binlogDir, 0755) require.NoError(t.T(), err) - timeout := 5 * time.Second - // Create channels for signaling - fsyncedChan := make(chan struct{}, 1) - ackedChan := make(chan struct{}, 1) + // Create channels for signaling fsync and ACK events + fsyncedChan = make(chan struct{}, 1) + ackedChan = make(chan struct{}, 1) - // Custom handler returning TestWriteCloser - handler := func(binlogFilename string) (io.WriteCloser, error) { - file, err := os.OpenFile(path.Join(binlogDir, binlogFilename), os.O_CREATE|os.O_WRONLY, 0644) + // Define the handler function to open WriteClosers for binlog files + handler = func(binlogFilename string) (io.WriteCloser, error) { + filePath := path.Join(binlogDir, binlogFilename) + file, err := os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY, 0644) if err != nil { return nil, err } - return &TestWriteCloser{ - file: file, - syncCalled: fsyncedChan, - }, nil + return file, nil } + // Assign the ackedChan to the BinlogSyncer for signaling ACKs + t.b.ackedChan = ackedChan + + return binlogDir, fsyncedChan, ackedChan, handler +} + +func (t *testSyncerSuite) testACKSentAfterFsync(syncMode SyncMode) { + _, fsyncedChan, ackedChan, handler := t.setupACKAfterFsyncTest(syncMode) + timeout := 5 * time.Second + // Set up the BackupEventHandler with fsyncedChan backupHandler := &BackupEventHandler{ handler: handler, fsyncedChan: fsyncedChan, } - // Set the event handler in BinlogSyncer - t.b.SetEventHandler(backupHandler) - - // Set the ackedChan in BinlogSyncer - t.b.ackedChan = ackedChan - t.b.cfg.SemiSyncEnabled = true // Ensure semi-sync is enabled + if syncMode == SyncModeSync { + // Set the event handler in BinlogSyncer + t.b.SetEventHandler(backupHandler) + } // Start syncing pos := mysql.Position{Name: "", Pos: uint32(0)} @@ -153,28 +251,74 @@ func (t *testSyncerSuite) TestACKSentAfterFsync() { // Execute a query to generate an event t.testExecute("FLUSH LOGS") - // Wait for fsync signal - select { - case <-fsyncedChan: - // fsync completed - case <-time.After(2 * time.Second): - t.T().Fatal("fsync did not complete in time") - } + if syncMode == SyncModeSync { + // Wait for fsync signal + select { + case <-fsyncedChan: + // fsync completed + case <-time.After(2 * time.Second): + t.T().Fatal("fsync did not complete in time") + } - // Record the time when fsync completed - fsyncTime := time.Now() + // Record the time when fsync completed + fsyncTime := time.Now() - // Wait for ACK signal - select { - case <-ackedChan: - // ACK sent - case <-time.After(2 * time.Second): - t.T().Fatal("ACK not sent in time") + // Wait for ACK signal + select { + case <-ackedChan: + // ACK sent + case <-time.After(2 * time.Second): + t.T().Fatal("ACK not sent in time") + } + + // Record the time when ACK was sent + ackTime := time.Now() + + // Assert that ACK was sent after fsync + require.True(t.T(), ackTime.After(fsyncTime), "ACK was sent before fsync completed") + } else { + // In asynchronous mode, fsync may not be directly tracked + // Focus on ensuring that ACK is sent + select { + case <-ackedChan: + // ACK sent + case <-time.After(2 * time.Second): + t.T().Fatal("ACK not sent in time") + } + + // Optionally, verify that binlog files are created + binlogDir := "./var" + files, err := os.ReadDir(binlogDir) + require.NoError(t.T(), err) + require.NotEmpty(t.T(), files, "No binlog files were backed up") + for _, file := range files { + fileInfo, err := os.Stat(path.Join(binlogDir, file.Name())) + require.NoError(t.T(), err) + require.NotZero(t.T(), fileInfo.Size(), "Binlog file %s is empty", file.Name()) + } } +} + +func (t *testSyncerSuite) TestStartBackupEndInGivenTimeAsync() { + t.testStartBackupEndInGivenTime(SyncModeAsync) +} + +func (t *testSyncerSuite) TestStartBackupEndInGivenTimeSync() { + t.testStartBackupEndInGivenTime(SyncModeSync) +} - // Record the time when ACK was sent - ackTime := time.Now() +func (t *testSyncerSuite) TestACKSentAfterFsyncSyncMode() { + t.testACKSentAfterFsync(SyncModeSync) +} + +func (t *testSyncerSuite) TestACKSentAfterFsyncAsyncMode() { + t.testACKSentAfterFsync(SyncModeAsync) +} + +func (t *testSyncerSuite) TestBackupEventHandlerInvocationSync() { + t.testBackupEventHandlerInvocation(SyncModeSync) +} - // Assert that ACK was sent after fsync - require.True(t.T(), ackTime.After(fsyncTime), "ACK was sent before fsync completed") +func (t *testSyncerSuite) TestBackupEventHandlerInvocationAsync() { + t.testBackupEventHandlerInvocation(SyncModeAsync) } diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index 03290c53a..2a3fc89e8 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -25,6 +25,13 @@ var ( errSyncRunning = errors.New("Sync is running, must Close first") ) +type SyncMode int + +const ( + SyncModeAsync SyncMode = iota // Asynchronous mode (default) + SyncModeSync // Synchronous mode +) + // BinlogSyncerConfig is the configuration for BinlogSyncer. type BinlogSyncerConfig struct { // ServerID is the unique ID in cluster. @@ -126,6 +133,11 @@ type BinlogSyncerConfig struct { DiscardGTIDSet bool EventCacheCount int + + // SyncMode specifies whether to operate in synchronous or asynchronous mode. + // - SyncModeAsync (default): Events are sent to the BinlogStreamer and can be consumed via GetEvent(). + // - SyncModeSync: Events are processed synchronously using the EventHandler. + SyncMode SyncMode } // EventHandler defines the interface for processing binlog events. @@ -895,11 +907,24 @@ func (b *BinlogSyncer) handleEventAndACK(s *BinlogStreamer, e *BinlogEvent, rawD } } - // If an EventHandler is set, handle the event synchronously - if b.eventHandler != nil { - err := b.eventHandler.HandleEvent(e) - if err != nil { - return errors.Trace(err) + // Process the event based on the configured SyncMode + switch b.cfg.SyncMode { + case SyncModeSync: + // Synchronous mode: use EventHandler + if b.eventHandler != nil { + err := b.eventHandler.HandleEvent(e) + if err != nil { + return errors.Trace(err) + } + } else { + return errors.New("no EventHandler set for synchronous mode") + } + case SyncModeAsync: + // Asynchronous mode: send the event to the streamer channel + select { + case s.ch <- e: + case <-b.ctx.Done(): + return errors.New("sync is being closed...") } } @@ -911,18 +936,6 @@ func (b *BinlogSyncer) handleEventAndACK(s *BinlogStreamer, e *BinlogEvent, rawD } } - // Send event to the streamer channel - needStop := false - select { - case s.ch <- e: - case <-b.ctx.Done(): - needStop = true - } - - if needStop { - return errors.New("sync is being closed...") - } - return nil } diff --git a/replication/replication_test.go b/replication/replication_test.go index 441b6198d..f8dd88edc 100644 --- a/replication/replication_test.go +++ b/replication/replication_test.go @@ -270,7 +270,9 @@ func (t *testSyncerSuite) testSync(s *BinlogStreamer) { require.Greater(t.T(), eventCount, 0, "No events were handled by the EventHandler") } -func (t *testSyncerSuite) setupTest(flavor string) { +// setupTest initializes the test environment with the specified flavor and SyncMode. +// It configures the BinlogSyncer accordingly. +func (t *testSyncerSuite) setupTest(flavor string, syncMode SyncMode) { var port uint16 = 3306 switch flavor { case mysql.MariaDBFlavor: @@ -307,6 +309,7 @@ func (t *testSyncerSuite) setupTest(flavor string) { User: "root", Password: "", UseDecimal: true, + SyncMode: syncMode, // Set SyncMode directly } t.b = NewBinlogSyncer(cfg) @@ -352,12 +355,12 @@ func (t *testSyncerSuite) testPositionSync() { } func (t *testSyncerSuite) TestMysqlPositionSync() { - t.setupTest(mysql.MySQLFlavor) + t.setupTest(mysql.MySQLFlavor, SyncModeSync) t.testPositionSync() } func (t *testSyncerSuite) TestMysqlGTIDSync() { - t.setupTest(mysql.MySQLFlavor) + t.setupTest(mysql.MySQLFlavor, SyncModeSync) r, err := t.c.Execute("SELECT @@gtid_mode") require.NoError(t.T(), err) @@ -384,13 +387,13 @@ func (t *testSyncerSuite) TestMysqlGTIDSync() { } func (t *testSyncerSuite) TestMariadbPositionSync() { - t.setupTest(mysql.MariaDBFlavor) + t.setupTest(mysql.MariaDBFlavor, SyncModeSync) t.testPositionSync() } func (t *testSyncerSuite) TestMariadbGTIDSync() { - t.setupTest(mysql.MariaDBFlavor) + t.setupTest(mysql.MariaDBFlavor, SyncModeSync) // get current master gtid binlog pos r, err := t.c.Execute("SELECT @@gtid_binlog_pos") @@ -406,13 +409,13 @@ func (t *testSyncerSuite) TestMariadbGTIDSync() { } func (t *testSyncerSuite) TestMariadbAnnotateRows() { - t.setupTest(mysql.MariaDBFlavor) + t.setupTest(mysql.MariaDBFlavor, SyncModeSync) t.b.cfg.DumpCommandFlag = BINLOG_SEND_ANNOTATE_ROWS_EVENT t.testPositionSync() } func (t *testSyncerSuite) TestMysqlSemiPositionSync() { - t.setupTest(mysql.MySQLFlavor) + t.setupTest(mysql.MySQLFlavor, SyncModeSync) t.b.cfg.SemiSyncEnabled = true @@ -420,7 +423,7 @@ func (t *testSyncerSuite) TestMysqlSemiPositionSync() { } func (t *testSyncerSuite) TestMysqlBinlogCodec() { - t.setupTest(mysql.MySQLFlavor) + t.setupTest(mysql.MySQLFlavor, SyncModeSync) resetBinaryLogs := "RESET BINARY LOGS AND GTIDS" if eq, err := t.c.CompareServerVersion("8.4.0"); (err == nil) && (eq < 0) { @@ -475,7 +478,7 @@ func (t *testSyncerSuite) TestMysqlBinlogCodec() { } func (t *testSyncerSuite) TestGTIDSetHandling() { - t.setupTest(mysql.MySQLFlavor) + t.setupTest(mysql.MySQLFlavor, SyncModeSync) // Ensure GTID mode is enabled r, err := t.c.Execute("SELECT @@gtid_mode")