Skip to content

Commit

Permalink
add SyncMode and support for synchronous and asynchronous backup hand…
Browse files Browse the repository at this point in the history
…ling
  • Loading branch information
Dylan Terry committed Oct 2, 2024
1 parent 4fa8f67 commit 1ff1b33
Show file tree
Hide file tree
Showing 4 changed files with 246 additions and 91 deletions.
49 changes: 38 additions & 11 deletions replication/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@ func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Du
// - timeout: The maximum duration to wait for new binlog events before stopping the backup process.
// If set to 0, a default very long timeout (30 days) is used instead.
// - handler: A function that takes a binlog filename and returns an WriteCloser for writing raw events to.
func (b *BinlogSyncer) StartBackupWithHandler(p Position, timeout time.Duration,
handler func(binlogFilename string) (io.WriteCloser, error)) (retErr error) {
func (b *BinlogSyncer) StartBackupWithHandler(
p Position,
timeout time.Duration,
handler func(binlogFilename string) (io.WriteCloser, error),
) (retErr error) {
if timeout == 0 {
// a very long timeout here
timeout = 30 * 3600 * 24 * time.Second
Expand All @@ -47,8 +50,10 @@ func (b *BinlogSyncer) StartBackupWithHandler(p Position, timeout time.Duration,
handler: handler,
}

// Set the event handler in BinlogSyncer
b.SetEventHandler(backupHandler)
if b.cfg.SyncMode == SyncModeSync {
// Set the event handler in BinlogSyncer for synchronous mode
b.SetEventHandler(backupHandler)
}

// Start syncing
s, err := b.StartSync(p)
Expand All @@ -70,13 +75,33 @@ func (b *BinlogSyncer) StartBackupWithHandler(p Position, timeout time.Duration,
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

select {
case <-ctx.Done():
return nil
case <-b.ctx.Done():
return nil
case err := <-s.ech:
return errors.Trace(err)
if b.cfg.SyncMode == SyncModeSync {
// Synchronous mode: wait for completion or error
select {
case <-ctx.Done():
return nil
case <-b.ctx.Done():
return nil
case err := <-s.ech:
return errors.Trace(err)
}
} else {
// Asynchronous mode: consume events from the streamer
for {
select {
case <-ctx.Done():
return nil
case <-b.ctx.Done():
return nil
case err := <-s.ech:
return errors.Trace(err)
case e := <-s.ch:
err = backupHandler.HandleEvent(e)
if err != nil {
return errors.Trace(err)
}
}
}
}
}

Expand All @@ -87,6 +112,7 @@ type BackupEventHandler struct {
file *os.File
mutex sync.Mutex
fsyncedChan chan struct{}
eventCount int // eventCount used for testing
}

func (h *BackupEventHandler) HandleEvent(e *BinlogEvent) error {
Expand Down Expand Up @@ -148,6 +174,7 @@ func (h *BackupEventHandler) HandleEvent(e *BinlogEvent) error {
return errors.Trace(err)
}
}
h.eventCount++

return nil
}
220 changes: 166 additions & 54 deletions replication/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ import (
"github.com/go-mysql-org/go-mysql/mysql"
)

func (t *testSyncerSuite) TestStartBackupEndInGivenTime() {
t.setupTest(mysql.MySQLFlavor)
// testStartBackupEndInGivenTime tests the backup functionality with the given SyncMode.
func (t *testSyncerSuite) testStartBackupEndInGivenTime(syncMode SyncMode) {
t.setupTest(mysql.MySQLFlavor, syncMode)

t.b.cfg.SyncMode = syncMode

resetBinaryLogs := "RESET BINARY LOGS AND GTIDS"
if eq, err := t.c.CompareServerVersion("8.4.0"); (err == nil) && (eq < 0) {
Expand Down Expand Up @@ -77,67 +80,130 @@ func (h *CountingEventHandler) HandleEvent(e *BinlogEvent) error {
return nil
}

func (t *testSyncerSuite) TestBackupEventHandlerInvocation() {
t.setupTest(mysql.MySQLFlavor)
// waitForFile waits until the specified file exists or the timeout is reached.
func waitForFile(path string, timeout time.Duration) error {
start := time.Now()
for {
if _, err := os.Stat(path); err == nil {
return nil
}
if time.Since(start) > timeout {
return fmt.Errorf("file %s did not appear within %v", path, timeout)
}
time.Sleep(100 * time.Millisecond)
}
}

func (t *testSyncerSuite) testBackupEventHandlerInvocation(syncMode SyncMode) {
t.setupTest(mysql.MySQLFlavor, syncMode)
t.b.cfg.SyncMode = syncMode

// Define binlogDir and timeout
binlogDir := "./var"
os.RemoveAll(binlogDir)
timeout := 2 * time.Second

// Set up the CountingEventHandler
handler := &CountingEventHandler{}
t.b.SetEventHandler(handler)
// Ensure binlogDir exists
err := os.MkdirAll(binlogDir, 0755)
require.NoError(t.T(), err)

// Set up the BackupEventHandler
backupHandler := &BackupEventHandler{
handler: func(binlogFilename string) (io.WriteCloser, error) {
return os.OpenFile(path.Join(binlogDir, binlogFilename), os.O_CREATE|os.O_WRONLY, 0644)
},
}

if syncMode == SyncModeSync {
// Set the event handler in BinlogSyncer
t.b.SetEventHandler(backupHandler)
}

// Start the backup
err := t.b.StartBackup(binlogDir, mysql.Position{Name: "", Pos: uint32(0)}, timeout)
// Start the backup in a separate goroutine to allow interaction
go func() {
err := t.b.StartBackupWithHandler(mysql.Position{Name: "", Pos: uint32(0)}, timeout, backupHandler.handler)
require.NoError(t.T(), err)
}()

// Wait briefly to ensure the backup process has started
time.Sleep(500 * time.Millisecond)

// Execute FLUSH LOGS to trigger binlog rotation and create binlog.000001
_, err = t.c.Execute("FLUSH LOGS")
require.NoError(t.T(), err)

// Define the expected binlog file path
expectedBinlogFile := path.Join(binlogDir, "binlog.000001")

// Wait for the binlog file to be created
err = waitForFile(expectedBinlogFile, 2*time.Second)
require.NoError(t.T(), err, "Binlog file was not created in time")

// Optionally, wait a short duration to ensure events are processed
time.Sleep(500 * time.Millisecond)

// Verify that events were handled
handler.mutex.Lock()
eventCount := handler.count
handler.mutex.Unlock()
require.Greater(t.T(), eventCount, 0, "No events were handled by the EventHandler")
}
require.Greater(t.T(), backupHandler.eventCount, 0, "No events were handled by the BackupEventHandler")

func (t *testSyncerSuite) TestACKSentAfterFsync() {
t.setupTest(mysql.MySQLFlavor)
// Additional verification: Check that the binlog file has content
fileInfo, err := os.Stat(expectedBinlogFile)
require.NoError(t.T(), err, "Failed to stat binlog file")
require.NotZero(t.T(), fileInfo.Size(), "Binlog file is empty")
}

// Define binlogDir and timeout
binlogDir := "./var"
// setupACKAfterFsyncTest sets up the test environment for verifying the relationship
// between fsync completion and ACK sending. It configures the BinlogSyncer based on
// the provided SyncMode, initializes necessary channels and handlers, and returns them
// for use in the test functions.
func (t *testSyncerSuite) setupACKAfterFsyncTest(syncMode SyncMode) (
binlogDir string,
fsyncedChan chan struct{},
ackedChan chan struct{},
handler func(string) (io.WriteCloser, error),
) {
// Initialize the test environment with the specified SyncMode
t.setupTest(mysql.MySQLFlavor, syncMode)

// Define binlogDir
binlogDir = "./var"
os.RemoveAll(binlogDir)
err := os.MkdirAll(binlogDir, 0755)
require.NoError(t.T(), err)
timeout := 5 * time.Second

// Create channels for signaling
fsyncedChan := make(chan struct{}, 1)
ackedChan := make(chan struct{}, 1)
// Create channels for signaling fsync and ACK events
fsyncedChan = make(chan struct{}, 1)
ackedChan = make(chan struct{}, 1)

// Custom handler returning TestWriteCloser
handler := func(binlogFilename string) (io.WriteCloser, error) {
file, err := os.OpenFile(path.Join(binlogDir, binlogFilename), os.O_CREATE|os.O_WRONLY, 0644)
// Define the handler function to open WriteClosers for binlog files
handler = func(binlogFilename string) (io.WriteCloser, error) {
filePath := path.Join(binlogDir, binlogFilename)
file, err := os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return nil, err
}
return &TestWriteCloser{
file: file,
syncCalled: fsyncedChan,
}, nil
return file, nil
}

// Assign the ackedChan to the BinlogSyncer for signaling ACKs
t.b.ackedChan = ackedChan

return binlogDir, fsyncedChan, ackedChan, handler
}

func (t *testSyncerSuite) testACKSentAfterFsync(syncMode SyncMode) {
_, fsyncedChan, ackedChan, handler := t.setupACKAfterFsyncTest(syncMode)
timeout := 5 * time.Second

// Set up the BackupEventHandler with fsyncedChan
backupHandler := &BackupEventHandler{
handler: handler,
fsyncedChan: fsyncedChan,
}

// Set the event handler in BinlogSyncer
t.b.SetEventHandler(backupHandler)

// Set the ackedChan in BinlogSyncer
t.b.ackedChan = ackedChan
t.b.cfg.SemiSyncEnabled = true // Ensure semi-sync is enabled
if syncMode == SyncModeSync {
// Set the event handler in BinlogSyncer
t.b.SetEventHandler(backupHandler)
}

// Start syncing
pos := mysql.Position{Name: "", Pos: uint32(0)}
Expand All @@ -153,28 +219,74 @@ func (t *testSyncerSuite) TestACKSentAfterFsync() {
// Execute a query to generate an event
t.testExecute("FLUSH LOGS")

// Wait for fsync signal
select {
case <-fsyncedChan:
// fsync completed
case <-time.After(2 * time.Second):
t.T().Fatal("fsync did not complete in time")
}
if syncMode == SyncModeSync {
// Wait for fsync signal
select {
case <-fsyncedChan:
// fsync completed
case <-time.After(2 * time.Second):
t.T().Fatal("fsync did not complete in time")
}

// Record the time when fsync completed
fsyncTime := time.Now()
// Record the time when fsync completed
fsyncTime := time.Now()

// Wait for ACK signal
select {
case <-ackedChan:
// ACK sent
case <-time.After(2 * time.Second):
t.T().Fatal("ACK not sent in time")
// Wait for ACK signal
select {
case <-ackedChan:
// ACK sent
case <-time.After(2 * time.Second):
t.T().Fatal("ACK not sent in time")
}

// Record the time when ACK was sent
ackTime := time.Now()

// Assert that ACK was sent after fsync
require.True(t.T(), ackTime.After(fsyncTime), "ACK was sent before fsync completed")
} else {
// In asynchronous mode, fsync may not be directly tracked
// Focus on ensuring that ACK is sent
select {
case <-ackedChan:
// ACK sent
case <-time.After(2 * time.Second):
t.T().Fatal("ACK not sent in time")
}

// Optionally, verify that binlog files are created
binlogDir := "./var"
files, err := os.ReadDir(binlogDir)
require.NoError(t.T(), err)
require.NotEmpty(t.T(), files, "No binlog files were backed up")
for _, file := range files {
fileInfo, err := os.Stat(path.Join(binlogDir, file.Name()))
require.NoError(t.T(), err)
require.NotZero(t.T(), fileInfo.Size(), "Binlog file %s is empty", file.Name())
}
}
}

func (t *testSyncerSuite) TestStartBackupEndInGivenTimeAsync() {
t.testStartBackupEndInGivenTime(SyncModeAsync)
}

// Record the time when ACK was sent
ackTime := time.Now()
func (t *testSyncerSuite) TestStartBackupEndInGivenTimeSync() {
t.testStartBackupEndInGivenTime(SyncModeSync)
}

func (t *testSyncerSuite) TestACKSentAfterFsyncSyncMode() {
t.testACKSentAfterFsync(SyncModeSync)
}

func (t *testSyncerSuite) TestACKSentAfterFsyncAsyncMode() {
t.testACKSentAfterFsync(SyncModeAsync)
}

func (t *testSyncerSuite) TestBackupEventHandlerInvocationSync() {
t.testBackupEventHandlerInvocation(SyncModeSync)
}

// Assert that ACK was sent after fsync
require.True(t.T(), ackTime.After(fsyncTime), "ACK was sent before fsync completed")
func (t *testSyncerSuite) TestBackupEventHandlerInvocationAsync() {
t.testBackupEventHandlerInvocation(SyncModeAsync)
}
Loading

0 comments on commit 1ff1b33

Please sign in to comment.