diff --git a/replication/backup.go b/replication/backup.go index 86265ae70..f1d7dd28b 100644 --- a/replication/backup.go +++ b/replication/backup.go @@ -5,26 +5,31 @@ import ( "io" "os" "path" + "sync" "time" . "github.com/go-mysql-org/go-mysql/mysql" "github.com/pingcap/errors" ) -// StartBackup: Like mysqlbinlog remote raw backup -// Backup remote binlog from position (filename, offset) and write in backupDir +// StartBackup starts the backup process for the binary log and writes to the backup directory. func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Duration) error { err := os.MkdirAll(backupDir, 0755) if err != nil { return errors.Trace(err) } - return b.StartBackupWithHandler(p, timeout, func(filename string) (io.WriteCloser, error) { - return os.OpenFile(path.Join(backupDir, filename), os.O_CREATE|os.O_WRONLY, 0644) - }) + if b.cfg.SynchronousEventHandler == nil { + return b.StartBackupWithHandler(p, timeout, func(filename string) (io.WriteCloser, error) { + return os.OpenFile(path.Join(backupDir, filename), os.O_CREATE|os.O_WRONLY, 0644) + }) + } else { + return b.StartSynchronousBackup(p, timeout) + } } // StartBackupWithHandler starts the backup process for the binary log using the specified position and handler. // The process will continue until the timeout is reached or an error occurs. +// This method should not be used together with SynchronousEventHandler. // // Parameters: // - p: The starting position in the binlog from which to begin the backup. @@ -37,81 +42,151 @@ func (b *BinlogSyncer) StartBackupWithHandler(p Position, timeout time.Duration, // a very long timeout here timeout = 30 * 3600 * 24 * time.Second } + if b.cfg.SynchronousEventHandler != nil { + return errors.New("StartBackupWithHandler cannot be used when SynchronousEventHandler is set. Use StartSynchronousBackup instead.") + } // Force use raw mode b.parser.SetRawMode(true) + // Set up the backup event handler + backupHandler := &BackupEventHandler{ + handler: handler, + } + s, err := b.StartSync(p) if err != nil { return errors.Trace(err) } - var filename string - var offset uint32 - - var w io.WriteCloser defer func() { - var closeErr error - if w != nil { - closeErr = w.Close() - } - if retErr == nil { - retErr = closeErr + if backupHandler.w != nil { + closeErr := backupHandler.w.Close() + if retErr == nil { + retErr = closeErr + } } }() for { ctx, cancel := context.WithTimeout(context.Background(), timeout) - e, err := s.GetEvent(ctx) - cancel() + defer cancel() - if err == context.DeadlineExceeded { + select { + case <-ctx.Done(): return nil - } - - if err != nil { + case <-b.ctx.Done(): + return nil + case err := <-s.ech: return errors.Trace(err) + case e := <-s.ch: + err = backupHandler.HandleEvent(e) + if err != nil { + return errors.Trace(err) + } } + } +} - offset = e.Header.LogPos +// StartSynchronousBackup starts the backup process using the SynchronousEventHandler in the BinlogSyncerConfig. +func (b *BinlogSyncer) StartSynchronousBackup(p Position, timeout time.Duration) error { + if b.cfg.SynchronousEventHandler == nil { + return errors.New("SynchronousEventHandler must be set in BinlogSyncerConfig to use StartSynchronousBackup") + } - if e.Header.EventType == ROTATE_EVENT { - rotateEvent := e.Event.(*RotateEvent) - filename = string(rotateEvent.NextLogName) + s, err := b.StartSync(p) + if err != nil { + return errors.Trace(err) + } - if e.Header.Timestamp == 0 || offset == 0 { - // fake rotate event - continue - } - } else if e.Header.EventType == FORMAT_DESCRIPTION_EVENT { - // FormateDescriptionEvent is the first event in binlog, we will close old one and create a new - - if w != nil { - if err = w.Close(); err != nil { - w = nil - return errors.Trace(err) - } - } + var ctx context.Context + var cancel context.CancelFunc - if len(filename) == 0 { - return errors.Errorf("empty binlog filename for FormateDescriptionEvent") - } + if timeout > 0 { + ctx, cancel = context.WithTimeout(context.Background(), timeout) + defer cancel() + } else { + ctx = context.Background() + } - w, err = handler(filename) - if err != nil { - return errors.Trace(err) - } + select { + case <-ctx.Done(): + // The timeout has been reached + return nil + case <-b.ctx.Done(): + // The BinlogSyncer has been closed + return nil + case err := <-s.ech: + // An error occurred during streaming + return errors.Trace(err) + } +} + +// BackupEventHandler handles writing events for backup +type BackupEventHandler struct { + handler func(binlogFilename string) (io.WriteCloser, error) + w io.WriteCloser + mutex sync.Mutex + + filename string +} + +func NewBackupEventHandler(handlerFunction func(filename string) (io.WriteCloser, error)) *BackupEventHandler { + return &BackupEventHandler{ + handler: handlerFunction, + } +} + +// HandleEvent processes a single event for the backup. +func (h *BackupEventHandler) HandleEvent(e *BinlogEvent) error { + h.mutex.Lock() + defer h.mutex.Unlock() + + var err error + offset := e.Header.LogPos - // write binlog header fe'bin' - if _, err = w.Write(BinLogFileHeader); err != nil { + if e.Header.EventType == ROTATE_EVENT { + rotateEvent := e.Event.(*RotateEvent) + h.filename = string(rotateEvent.NextLogName) + if e.Header.Timestamp == 0 || offset == 0 { + // fake rotate event + return nil + } + } else if e.Header.EventType == FORMAT_DESCRIPTION_EVENT { + if h.w != nil { + if err = h.w.Close(); err != nil { + h.w = nil return errors.Trace(err) } } - if n, err := w.Write(e.RawData); err != nil { + if len(h.filename) == 0 { + return errors.Errorf("empty binlog filename for FormatDescriptionEvent") + } + + h.w, err = h.handler(h.filename) + if err != nil { return errors.Trace(err) - } else if n != len(e.RawData) { + } + + // Write binlog header 0xfebin + _, err = h.w.Write(BinLogFileHeader) + if err != nil { + return errors.Trace(err) + } + } + + if h.w != nil { + n, err := h.w.Write(e.RawData) + if err != nil { + return errors.Trace(err) + } + if n != len(e.RawData) { return errors.Trace(io.ErrShortWrite) } + } else { + return errors.New("writer is not initialized") } + + return nil } diff --git a/replication/backup_test.go b/replication/backup_test.go index abefd3f8d..769f61e83 100644 --- a/replication/backup_test.go +++ b/replication/backup_test.go @@ -2,7 +2,9 @@ package replication import ( "context" + "io" "os" + "path" "time" "github.com/stretchr/testify/require" @@ -10,6 +12,7 @@ import ( "github.com/go-mysql-org/go-mysql/mysql" ) +// TestStartBackupEndInGivenTime tests the backup process completes within a given time. func (t *testSyncerSuite) TestStartBackupEndInGivenTime() { t.setupTest(mysql.MySQLFlavor) @@ -30,12 +33,12 @@ func (t *testSyncerSuite) TestStartBackupEndInGivenTime() { os.RemoveAll(binlogDir) timeout := 2 * time.Second - done := make(chan bool) + done := make(chan struct{}) go func() { err := t.b.StartBackup(binlogDir, mysql.Position{Name: "", Pos: uint32(0)}, timeout) require.NoError(t.T(), err) - done <- true + close(done) }() failTimeout := 5 * timeout ctx, cancel := context.WithTimeout(context.Background(), failTimeout) @@ -47,3 +50,70 @@ func (t *testSyncerSuite) TestStartBackupEndInGivenTime() { t.T().Fatal("time out error") } } + +// TestAsyncBackup runs the backup process in asynchronous mode and verifies binlog file creation. +func (t *testSyncerSuite) TestAsyncBackup() { + testBackup(t, false) // false indicates asynchronous mode +} + +// TestSyncBackup runs the backup process in synchronous mode and verifies binlog file creation. +func (t *testSyncerSuite) TestSyncBackup() { + testBackup(t, true) // true indicates synchronous mode +} + +// testBackup is a helper function that runs the backup process in the specified mode and checks if binlog files are written correctly. +func testBackup(t *testSyncerSuite, isSynchronous bool) { + t.setupTest(mysql.MySQLFlavor) + t.b.cfg.SemiSyncEnabled = false // Ensure semi-sync is disabled + + binlogDir := "./var" + os.RemoveAll(binlogDir) + timeout := 3 * time.Second + + if isSynchronous { + // Set up a BackupEventHandler for synchronous mode + backupHandler := NewBackupEventHandler( + func(filename string) (io.WriteCloser, error) { + return os.OpenFile(path.Join(binlogDir, filename), os.O_CREATE|os.O_WRONLY, 0644) + }, + ) + t.b.cfg.SynchronousEventHandler = backupHandler + } else { + // Ensure SynchronousEventHandler is nil for asynchronous mode + t.b.cfg.SynchronousEventHandler = nil + } + + done := make(chan bool) + + // Start the backup process in a goroutine + go func() { + err := t.b.StartBackup(binlogDir, mysql.Position{Name: "", Pos: uint32(0)}, timeout) + require.NoError(t.T(), err) + done <- true + }() + + failTimeout := 2 * timeout + ctx, cancel := context.WithTimeout(context.Background(), failTimeout) + defer cancel() + + // Wait for the backup to complete or timeout + select { + case <-done: + // Check if binlog files are written to the specified directory + files, err := os.ReadDir(binlogDir) + require.NoError(t.T(), err, "Failed to read binlog directory") + require.Greater(t.T(), len(files), 0, "Binlog files were not written to the directory") + mode := modeLabel(isSynchronous) + t.T().Logf("Backup completed successfully in %s mode with %d binlog file(s).", mode, len(files)) + case <-ctx.Done(): + mode := modeLabel(isSynchronous) + t.T().Fatalf("Timeout error during backup in %s mode.", mode) + } +} + +func modeLabel(isSynchronous bool) string { + if isSynchronous { + return "synchronous" + } + return "asynchronous" +} diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index 39e5749ea..6763bf3f7 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -11,11 +11,10 @@ import ( "sync" "time" - "github.com/siddontang/go-log/loggers" - "github.com/google/uuid" "github.com/pingcap/errors" "github.com/siddontang/go-log/log" + "github.com/siddontang/go-log/loggers" "github.com/go-mysql-org/go-mysql/client" . "github.com/go-mysql-org/go-mysql/mysql" @@ -58,7 +57,7 @@ type BinlogSyncerConfig struct { TLSConfig *tls.Config // Use replication.Time structure for timestamp and datetime. - // We will use Local location for timestamp and UTC location for datatime. + // We will use Local location for timestamp and UTC location for datetime. ParseTime bool // If ParseTime is false, convert TIMESTAMP into this specified timezone. If @@ -126,9 +125,19 @@ type BinlogSyncerConfig struct { DiscardGTIDSet bool EventCacheCount int + + // SynchronousEventHandler is used for synchronous event handling. + // This should not be used together with StartBackupWithHandler. + // If this is not nil, GetEvent does not need to be called. + SynchronousEventHandler EventHandler +} + +// EventHandler defines the interface for processing binlog events. +type EventHandler interface { + HandleEvent(e *BinlogEvent) error } -// BinlogSyncer syncs binlog event from server. +// BinlogSyncer syncs binlog events from the server. type BinlogSyncer struct { m sync.RWMutex @@ -157,7 +166,7 @@ type BinlogSyncer struct { retryCount int } -// NewBinlogSyncer creates the BinlogSyncer with cfg. +// NewBinlogSyncer creates the BinlogSyncer with the given configuration. func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer { if cfg.Logger == nil { streamHandler, _ := log.NewStreamHandler(os.Stdout) @@ -174,7 +183,7 @@ func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer { cfg.EventCacheCount = 10240 } - // Clear the Password to avoid outputing it in log. + // Clear the Password to avoid outputting it in logs. pass := cfg.Password cfg.Password = "" cfg.Logger.Infof("create BinlogSyncer with config %+v", cfg) @@ -765,7 +774,16 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { switch data[0] { case OK_HEADER: - if err = b.parseEvent(s, data); err != nil { + // Parse the event + e, needACK, err := b.parseEvent(data) + if err != nil { + s.closeWithError(err) + return + } + + // Handle the event and send ACK if necessary + err = b.handleEventAndACK(s, e, needACK) + if err != nil { s.closeWithError(err) return } @@ -786,39 +804,45 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { } } -func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { - //skip OK byte, 0x00 +// parseEvent parses the raw data into a BinlogEvent. +// It only handles parsing and does not perform any side effects. +// Returns the parsed BinlogEvent, a boolean indicating if an ACK is needed, and an error if the +// parsing fails +func (b *BinlogSyncer) parseEvent(data []byte) (event *BinlogEvent, needACK bool, err error) { + // Skip OK byte (0x00) data = data[1:] - needACK := false - if b.cfg.SemiSyncEnabled && (data[0] == SemiSyncIndicator) { + needACK = false + if b.cfg.SemiSyncEnabled && data[0] == SemiSyncIndicator { needACK = data[1] == 0x01 - //skip semi sync header + // Skip semi-sync header data = data[2:] } - e, err := b.parser.Parse(data) + // Parse the event using the BinlogParser + event, err = b.parser.Parse(data) if err != nil { - return errors.Trace(err) + return nil, false, errors.Trace(err) } + return event, needACK, nil +} + +// handleEventAndACK processes an event and sends an ACK if necessary. +func (b *BinlogSyncer) handleEventAndACK(s *BinlogStreamer, e *BinlogEvent, needACK bool) error { + // Update the next position based on the event's LogPos if e.Header.LogPos > 0 { // Some events like FormatDescriptionEvent return 0, ignore. b.nextPos.Pos = e.Header.LogPos } - getCurrentGtidSet := func() GTIDSet { - if b.currGset == nil { - return nil - } - return b.currGset.Clone() - } - + // Handle event types to update positions and GTID sets switch event := e.Event.(type) { case *RotateEvent: b.nextPos.Name = string(event.NextLogName) b.nextPos.Pos = uint32(event.Position) b.cfg.Logger.Infof("rotate to %s", b.nextPos) + case *GTIDEvent: if b.prevGset == nil { break @@ -826,13 +850,20 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { if b.currGset == nil { b.currGset = b.prevGset.Clone() } - u, _ := uuid.FromBytes(event.SID) + u, err := uuid.FromBytes(event.SID) + if err != nil { + return errors.Trace(err) + } b.currGset.(*MysqlGTIDSet).AddGTID(u, event.GNO) if b.prevMySQLGTIDEvent != nil { - u, _ = uuid.FromBytes(b.prevMySQLGTIDEvent.SID) + u, err = uuid.FromBytes(b.prevMySQLGTIDEvent.SID) + if err != nil { + return errors.Trace(err) + } b.prevGset.(*MysqlGTIDSet).AddGTID(u, b.prevMySQLGTIDEvent.GNO) } b.prevMySQLGTIDEvent = event + case *MariadbGTIDEvent: if b.prevGset == nil { break @@ -841,29 +872,39 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { b.currGset = b.prevGset.Clone() } prev := b.currGset.Clone() - err = b.currGset.(*MariadbGTIDSet).AddSet(&event.GTID) + err := b.currGset.(*MariadbGTIDSet).AddSet(&event.GTID) if err != nil { return errors.Trace(err) } - // right after reconnect we will see same gtid as we saw before, thus currGset will not get changed + // Right after reconnect we may see the same GTID as before; update prevGset if currGset changed if !b.currGset.Equal(prev) { b.prevGset = prev } + case *XIDEvent: if !b.cfg.DiscardGTIDSet { - event.GSet = getCurrentGtidSet() + event.GSet = b.getCurrentGtidSet() } + case *QueryEvent: if !b.cfg.DiscardGTIDSet { - event.GSet = getCurrentGtidSet() + event.GSet = b.getCurrentGtidSet() } } - needStop := false - select { - case s.ch <- e: - case <-b.ctx.Done(): - needStop = true + // Use SynchronousEventHandler if it's set + if b.cfg.SynchronousEventHandler != nil { + err := b.cfg.SynchronousEventHandler.HandleEvent(e) + if err != nil { + return errors.Trace(err) + } + } else { + // Asynchronous mode: send the event to the streamer channel + select { + case s.ch <- e: + case <-b.ctx.Done(): + return errors.New("sync is being closed...") + } } if needACK { @@ -873,10 +914,14 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { } } - if needStop { - return errors.New("sync is been closing...") - } + return nil +} +// getCurrentGtidSet returns a clone of the current GTID set. +func (b *BinlogSyncer) getCurrentGtidSet() GTIDSet { + if b.currGset != nil { + return b.currGset.Clone() + } return nil }