Skip to content

Commit

Permalink
Ensure ACKs are sent after backup (#921)
Browse files Browse the repository at this point in the history
* This commit addresses an issue where acknowledgments (ACKs) were sometimes sent to the master before binlog events were fully written and fsynced to disk during backup operations. Sending ACKs prematurely in semi-synchronous replication could lead to data loss if the replica fails after sending the ACK but before persisting the event.

Key changes:

- Introduced an `EventHandler` interface with a `HandleEvent` method for
  processing binlog events. This allows custom event handling logic to
  be injected into the replication stream.

- Added an `eventHandler` field to `BinlogSyncer` and provided a
  `SetEventHandler` method to assign an event handler. This enables
  `BinlogSyncer` to delegate event processing to the assigned handler.

- Implemented `BackupEventHandler` which writes binlog events to disk
  and ensures that each event is fsynced before returning. This ensures
  data durability before ACKs are sent.

- Modified the `onStream` method in `BinlogSyncer` to separate event
  parsing (`parseEvent`) from event handling and ACK sending
  (`handleEventAndACK`). This adheres to the single-responsibility
  principle and makes the code cleaner.

- Moved state updates (e.g., updating `b.nextPos`) and GTID set handling
  from `parseEvent` to `handleEventAndACK` to avoid side effects during
  parsing.

- Ensured that ACKs are sent only after the event has been fully
  processed and fsynced by sending the ACK in `handleEventAndACK` after
  event handling.

* Refactor event handling by replacing SyncMode and EventHandleMode with SynchronousEventHandler. Simplify the event processing in BinlogSyncerConfig by introducing SynchronousEventHandler for synchronous event handling. Update StartBackup, StartBackupWithHandler, and associated tests to reflect these changes.

* Add some comments and remember to remove SetEventHandler and the eventHandler attribute

* Remove the timeout for synchronous backup, revert the timeout move to return the behavior to 30 days _between_ events, restore some comments, use struct instead of bool as recommended, add a note about SynchronousEventHandler and the parseEvent return values

* Make sure to assign the timeout on the syncer so the backup doesn't fail

* Make sure to add NewBackupHandler in order to expose the otherwise private handler outside the package

---------

Co-authored-by: Dylan Terry <[email protected]>
  • Loading branch information
dt8269 and Dylan Terry authored Oct 20, 2024
1 parent a9f9c23 commit c435689
Show file tree
Hide file tree
Showing 3 changed files with 276 additions and 86 deletions.
173 changes: 124 additions & 49 deletions replication/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,31 @@ import (
"io"
"os"
"path"
"sync"
"time"

. "github.com/go-mysql-org/go-mysql/mysql"
"github.com/pingcap/errors"
)

// StartBackup: Like mysqlbinlog remote raw backup
// Backup remote binlog from position (filename, offset) and write in backupDir
// StartBackup starts the backup process for the binary log and writes to the backup directory.
func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Duration) error {
err := os.MkdirAll(backupDir, 0755)
if err != nil {
return errors.Trace(err)
}
return b.StartBackupWithHandler(p, timeout, func(filename string) (io.WriteCloser, error) {
return os.OpenFile(path.Join(backupDir, filename), os.O_CREATE|os.O_WRONLY, 0644)
})
if b.cfg.SynchronousEventHandler == nil {
return b.StartBackupWithHandler(p, timeout, func(filename string) (io.WriteCloser, error) {
return os.OpenFile(path.Join(backupDir, filename), os.O_CREATE|os.O_WRONLY, 0644)
})
} else {
return b.StartSynchronousBackup(p, timeout)
}
}

// StartBackupWithHandler starts the backup process for the binary log using the specified position and handler.
// The process will continue until the timeout is reached or an error occurs.
// This method should not be used together with SynchronousEventHandler.
//
// Parameters:
// - p: The starting position in the binlog from which to begin the backup.
Expand All @@ -37,81 +42,151 @@ func (b *BinlogSyncer) StartBackupWithHandler(p Position, timeout time.Duration,
// a very long timeout here
timeout = 30 * 3600 * 24 * time.Second
}
if b.cfg.SynchronousEventHandler != nil {
return errors.New("StartBackupWithHandler cannot be used when SynchronousEventHandler is set. Use StartSynchronousBackup instead.")
}

// Force use raw mode
b.parser.SetRawMode(true)

// Set up the backup event handler
backupHandler := &BackupEventHandler{
handler: handler,
}

s, err := b.StartSync(p)
if err != nil {
return errors.Trace(err)
}

var filename string
var offset uint32

var w io.WriteCloser
defer func() {
var closeErr error
if w != nil {
closeErr = w.Close()
}
if retErr == nil {
retErr = closeErr
if backupHandler.w != nil {
closeErr := backupHandler.w.Close()
if retErr == nil {
retErr = closeErr
}
}
}()

for {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
e, err := s.GetEvent(ctx)
cancel()
defer cancel()

if err == context.DeadlineExceeded {
select {
case <-ctx.Done():
return nil
}

if err != nil {
case <-b.ctx.Done():
return nil
case err := <-s.ech:
return errors.Trace(err)
case e := <-s.ch:
err = backupHandler.HandleEvent(e)
if err != nil {
return errors.Trace(err)
}
}
}
}

offset = e.Header.LogPos
// StartSynchronousBackup starts the backup process using the SynchronousEventHandler in the BinlogSyncerConfig.
func (b *BinlogSyncer) StartSynchronousBackup(p Position, timeout time.Duration) error {
if b.cfg.SynchronousEventHandler == nil {
return errors.New("SynchronousEventHandler must be set in BinlogSyncerConfig to use StartSynchronousBackup")
}

if e.Header.EventType == ROTATE_EVENT {
rotateEvent := e.Event.(*RotateEvent)
filename = string(rotateEvent.NextLogName)
s, err := b.StartSync(p)
if err != nil {
return errors.Trace(err)
}

if e.Header.Timestamp == 0 || offset == 0 {
// fake rotate event
continue
}
} else if e.Header.EventType == FORMAT_DESCRIPTION_EVENT {
// FormateDescriptionEvent is the first event in binlog, we will close old one and create a new

if w != nil {
if err = w.Close(); err != nil {
w = nil
return errors.Trace(err)
}
}
var ctx context.Context
var cancel context.CancelFunc

if len(filename) == 0 {
return errors.Errorf("empty binlog filename for FormateDescriptionEvent")
}
if timeout > 0 {
ctx, cancel = context.WithTimeout(context.Background(), timeout)
defer cancel()
} else {
ctx = context.Background()
}

w, err = handler(filename)
if err != nil {
return errors.Trace(err)
}
select {
case <-ctx.Done():
// The timeout has been reached
return nil
case <-b.ctx.Done():
// The BinlogSyncer has been closed
return nil
case err := <-s.ech:
// An error occurred during streaming
return errors.Trace(err)
}
}

// BackupEventHandler handles writing events for backup
type BackupEventHandler struct {
handler func(binlogFilename string) (io.WriteCloser, error)
w io.WriteCloser
mutex sync.Mutex

filename string
}

func NewBackupEventHandler(handlerFunction func(filename string) (io.WriteCloser, error)) *BackupEventHandler {
return &BackupEventHandler{
handler: handlerFunction,
}
}

// HandleEvent processes a single event for the backup.
func (h *BackupEventHandler) HandleEvent(e *BinlogEvent) error {
h.mutex.Lock()
defer h.mutex.Unlock()

var err error
offset := e.Header.LogPos

// write binlog header fe'bin'
if _, err = w.Write(BinLogFileHeader); err != nil {
if e.Header.EventType == ROTATE_EVENT {
rotateEvent := e.Event.(*RotateEvent)
h.filename = string(rotateEvent.NextLogName)
if e.Header.Timestamp == 0 || offset == 0 {
// fake rotate event
return nil
}
} else if e.Header.EventType == FORMAT_DESCRIPTION_EVENT {
if h.w != nil {
if err = h.w.Close(); err != nil {
h.w = nil
return errors.Trace(err)
}
}

if n, err := w.Write(e.RawData); err != nil {
if len(h.filename) == 0 {
return errors.Errorf("empty binlog filename for FormatDescriptionEvent")
}

h.w, err = h.handler(h.filename)
if err != nil {
return errors.Trace(err)
} else if n != len(e.RawData) {
}

// Write binlog header 0xfebin
_, err = h.w.Write(BinLogFileHeader)
if err != nil {
return errors.Trace(err)
}
}

if h.w != nil {
n, err := h.w.Write(e.RawData)
if err != nil {
return errors.Trace(err)
}
if n != len(e.RawData) {
return errors.Trace(io.ErrShortWrite)
}
} else {
return errors.New("writer is not initialized")
}

return nil
}
74 changes: 72 additions & 2 deletions replication/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ package replication

import (
"context"
"io"
"os"
"path"
"time"

"github.com/stretchr/testify/require"

"github.com/go-mysql-org/go-mysql/mysql"
)

// TestStartBackupEndInGivenTime tests the backup process completes within a given time.
func (t *testSyncerSuite) TestStartBackupEndInGivenTime() {
t.setupTest(mysql.MySQLFlavor)

Expand All @@ -30,12 +33,12 @@ func (t *testSyncerSuite) TestStartBackupEndInGivenTime() {
os.RemoveAll(binlogDir)
timeout := 2 * time.Second

done := make(chan bool)
done := make(chan struct{})

go func() {
err := t.b.StartBackup(binlogDir, mysql.Position{Name: "", Pos: uint32(0)}, timeout)
require.NoError(t.T(), err)
done <- true
close(done)
}()
failTimeout := 5 * timeout
ctx, cancel := context.WithTimeout(context.Background(), failTimeout)
Expand All @@ -47,3 +50,70 @@ func (t *testSyncerSuite) TestStartBackupEndInGivenTime() {
t.T().Fatal("time out error")
}
}

// TestAsyncBackup runs the backup process in asynchronous mode and verifies binlog file creation.
func (t *testSyncerSuite) TestAsyncBackup() {
testBackup(t, false) // false indicates asynchronous mode
}

// TestSyncBackup runs the backup process in synchronous mode and verifies binlog file creation.
func (t *testSyncerSuite) TestSyncBackup() {
testBackup(t, true) // true indicates synchronous mode
}

// testBackup is a helper function that runs the backup process in the specified mode and checks if binlog files are written correctly.
func testBackup(t *testSyncerSuite, isSynchronous bool) {
t.setupTest(mysql.MySQLFlavor)
t.b.cfg.SemiSyncEnabled = false // Ensure semi-sync is disabled

binlogDir := "./var"
os.RemoveAll(binlogDir)
timeout := 3 * time.Second

if isSynchronous {
// Set up a BackupEventHandler for synchronous mode
backupHandler := NewBackupEventHandler(
func(filename string) (io.WriteCloser, error) {
return os.OpenFile(path.Join(binlogDir, filename), os.O_CREATE|os.O_WRONLY, 0644)
},
)
t.b.cfg.SynchronousEventHandler = backupHandler
} else {
// Ensure SynchronousEventHandler is nil for asynchronous mode
t.b.cfg.SynchronousEventHandler = nil
}

done := make(chan bool)

// Start the backup process in a goroutine
go func() {
err := t.b.StartBackup(binlogDir, mysql.Position{Name: "", Pos: uint32(0)}, timeout)
require.NoError(t.T(), err)
done <- true
}()

failTimeout := 2 * timeout
ctx, cancel := context.WithTimeout(context.Background(), failTimeout)
defer cancel()

// Wait for the backup to complete or timeout
select {
case <-done:
// Check if binlog files are written to the specified directory
files, err := os.ReadDir(binlogDir)
require.NoError(t.T(), err, "Failed to read binlog directory")
require.Greater(t.T(), len(files), 0, "Binlog files were not written to the directory")
mode := modeLabel(isSynchronous)
t.T().Logf("Backup completed successfully in %s mode with %d binlog file(s).", mode, len(files))
case <-ctx.Done():
mode := modeLabel(isSynchronous)
t.T().Fatalf("Timeout error during backup in %s mode.", mode)
}
}

func modeLabel(isSynchronous bool) string {
if isSynchronous {
return "synchronous"
}
return "asynchronous"
}
Loading

0 comments on commit c435689

Please sign in to comment.