Skip to content

Commit

Permalink
Ensure ACKs are sent after fsync in backup and refactor event handling
Browse files Browse the repository at this point in the history
This commit addresses an issue where acknowledgments (ACKs) were sometimes sent to the master before binlog events were fully written and fsynced to disk during backup operations. Sending ACKs prematurely in semi-synchronous replication could lead to data loss if the replica fails after sending the ACK but before persisting the event.

Key changes:

- Introduced an `EventHandler` interface with a `HandleEvent` method for
  processing binlog events. This allows custom event handling logic to
  be injected into the replication stream.

- Added an `eventHandler` field to `BinlogSyncer` and provided a
  `SetEventHandler` method to assign an event handler. This enables
  `BinlogSyncer` to delegate event processing to the assigned handler.

- Implemented `BackupEventHandler` which writes binlog events to disk
  and ensures that each event is fsynced before returning. This ensures
  data durability before ACKs are sent.

- Modified the `onStream` method in `BinlogSyncer` to separate event
  parsing (`parseEvent`) from event handling and ACK sending
  (`handleEventAndACK`). This adheres to the single-responsibility
  principle and makes the code cleaner.

- Moved state updates (e.g., updating `b.nextPos`) and GTID set handling
  from `parseEvent` to `handleEventAndACK` to avoid side effects during
  parsing.

- Ensured that ACKs are sent only after the event has been fully
  processed and fsynced by sending the ACK in `handleEventAndACK` after
  event handling.
  • Loading branch information
Dylan Terry committed Sep 30, 2024
1 parent 3a665d0 commit 9d327ec
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 78 deletions.
126 changes: 78 additions & 48 deletions replication/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io"
"os"
"path"
"sync"
"time"

. "github.com/go-mysql-org/go-mysql/mysql"
Expand Down Expand Up @@ -41,77 +42,106 @@ func (b *BinlogSyncer) StartBackupWithHandler(p Position, timeout time.Duration,
// Force use raw mode
b.parser.SetRawMode(true)

// Set up the backup event handler
backupHandler := &BackupEventHandler{
handler: handler,
}

// Set the event handler in BinlogSyncer
b.SetEventHandler(backupHandler)

// Start syncing
s, err := b.StartSync(p)
if err != nil {
return errors.Trace(err)
}

var filename string
var offset uint32

var w io.WriteCloser
defer func() {
var closeErr error
if w != nil {
closeErr = w.Close()
}
if retErr == nil {
retErr = closeErr
b.SetEventHandler(nil) // Reset the event handler
if backupHandler.w != nil {
closeErr := backupHandler.w.Close()
if retErr == nil {
retErr = closeErr
}
}
}()

for {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
e, err := s.GetEvent(ctx)
cancel()
// Wait until the context is done or an error occurs
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

if err == context.DeadlineExceeded {
return nil
}
select {
case <-ctx.Done():
return nil
case <-b.ctx.Done():
return nil
case err := <-s.ech:
return errors.Trace(err)
}
}

if err != nil {
return errors.Trace(err)
}
// BackupEventHandler handles writing events for backup
type BackupEventHandler struct {
handler func(binlogFilename string) (io.WriteCloser, error)
w io.WriteCloser
file *os.File
mutex sync.Mutex
}

offset = e.Header.LogPos
func (h *BackupEventHandler) HandleEvent(e *BinlogEvent) error {
h.mutex.Lock()
defer h.mutex.Unlock()

if e.Header.EventType == ROTATE_EVENT {
rotateEvent := e.Event.(*RotateEvent)
filename = string(rotateEvent.NextLogName)
switch e.Header.EventType {
case ROTATE_EVENT:
rotateEvent := e.Event.(*RotateEvent)
filename := string(rotateEvent.NextLogName)

if e.Header.Timestamp == 0 || offset == 0 {
// fake rotate event
continue
}
} else if e.Header.EventType == FORMAT_DESCRIPTION_EVENT {
// FormateDescriptionEvent is the first event in binlog, we will close old one and create a new

if w != nil {
if err = w.Close(); err != nil {
w = nil
return errors.Trace(err)
}
// Close existing file if open
if h.w != nil {
if err := h.w.Close(); err != nil {
h.w = nil
return errors.Trace(err)
}
}

if len(filename) == 0 {
return errors.Errorf("empty binlog filename for FormateDescriptionEvent")
}
// Open new file
var err error
h.w, err = h.handler(filename)
if err != nil {
return errors.Trace(err)
}

w, err = handler(filename)
if err != nil {
return errors.Trace(err)
}
// Ensure w is an *os.File to call Sync
if f, ok := h.w.(*os.File); ok {
h.file = f
} else {
return errors.New("handler did not return *os.File, cannot fsync")
}

// write binlog header fe'bin'
if _, err = w.Write(BinLogFileHeader); err != nil {
return errors.Trace(err)
}
// Write binlog header
if _, err := h.w.Write(BinLogFileHeader); err != nil {
return errors.Trace(err)
}

// fsync after writing header
if err := h.file.Sync(); err != nil {
return errors.Trace(err)
}

if n, err := w.Write(e.RawData); err != nil {
default:
// Write raw event data
if n, err := h.w.Write(e.RawData); err != nil {
return errors.Trace(err)
} else if n != len(e.RawData) {
return errors.Trace(io.ErrShortWrite)
}

// fsync after writing event
if err := h.file.Sync(); err != nil {
return errors.Trace(err)
}
}

return nil
}
111 changes: 81 additions & 30 deletions replication/binlogsyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ type BinlogSyncerConfig struct {
EventCacheCount int
}

// EventHandler defines the interface for processing binlog events.
type EventHandler interface {
HandleEvent(e *BinlogEvent) error
}

// BinlogSyncer syncs binlog event from server.
type BinlogSyncer struct {
m sync.RWMutex
Expand Down Expand Up @@ -155,6 +160,8 @@ type BinlogSyncer struct {
lastConnectionID uint32

retryCount int

eventHandler EventHandler
}

// NewBinlogSyncer creates the BinlogSyncer with cfg.
Expand Down Expand Up @@ -382,6 +389,12 @@ func (b *BinlogSyncer) enableSemiSync() error {
return nil
}

func (b *BinlogSyncer) SetEventHandler(handler EventHandler) {
b.m.Lock()
defer b.m.Unlock()
b.eventHandler = handler
}

func (b *BinlogSyncer) prepare() error {
if b.isClosed() {
return errors.Trace(ErrSyncClosed)
Expand Down Expand Up @@ -765,7 +778,16 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) {

switch data[0] {
case OK_HEADER:
if err = b.parseEvent(s, data); err != nil {
// Parse the event
e, err := b.parseEvent(data)
if err != nil {
s.closeWithError(err)
return
}

// Handle the event and send ACK if necessary
err = b.handleEventAndACK(s, e, data)
if err != nil {
s.closeWithError(err)
return
}
Expand All @@ -786,53 +808,62 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) {
}
}

func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error {
//skip OK byte, 0x00
// parseEvent parses the raw data into a BinlogEvent.
// It only handles parsing and does not perform any side effects.
func (b *BinlogSyncer) parseEvent(data []byte) (*BinlogEvent, error) {
// Skip OK byte (0x00)
data = data[1:]

needACK := false
if b.cfg.SemiSyncEnabled && (data[0] == SemiSyncIndicator) {
needACK = data[1] == 0x01
//skip semi sync header
data = data[2:]
}

// Parse the event using the BinlogParser
e, err := b.parser.Parse(data)
if err != nil {
return errors.Trace(err)
return nil, errors.Trace(err)
}

return e, nil
}

func (b *BinlogSyncer) handleEventAndACK(s *BinlogStreamer, e *BinlogEvent, rawData []byte) error {
// Check if we need to send an ACK
needACK := false
if b.cfg.SemiSyncEnabled && rawData[1] == SemiSyncIndicator && rawData[2] == 0x01 {
needACK = true
}

// Update the next position based on the event's LogPos
if e.Header.LogPos > 0 {
// Some events like FormatDescriptionEvent return 0, ignore.
b.nextPos.Pos = e.Header.LogPos
}

getCurrentGtidSet := func() GTIDSet {
if b.currGset == nil {
return nil
}
return b.currGset.Clone()
}

// Handle event types to update positions and GTID sets
switch event := e.Event.(type) {
case *RotateEvent:
b.nextPos.Name = string(event.NextLogName)
b.nextPos.Pos = uint32(event.Position)
b.cfg.Logger.Infof("rotate to %s", b.nextPos)

case *GTIDEvent:
if b.prevGset == nil {
break
}
if b.currGset == nil {
b.currGset = b.prevGset.Clone()
}
u, _ := uuid.FromBytes(event.SID)
u, err := uuid.FromBytes(event.SID)
if err != nil {
return errors.Trace(err)
}
b.currGset.(*MysqlGTIDSet).AddGTID(u, event.GNO)
if b.prevMySQLGTIDEvent != nil {
u, _ = uuid.FromBytes(b.prevMySQLGTIDEvent.SID)
u, err = uuid.FromBytes(b.prevMySQLGTIDEvent.SID)
if err != nil {
return errors.Trace(err)
}
b.prevGset.(*MysqlGTIDSet).AddGTID(u, b.prevMySQLGTIDEvent.GNO)
}
b.prevMySQLGTIDEvent = event

case *MariadbGTIDEvent:
if b.prevGset == nil {
break
Expand All @@ -841,45 +872,65 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error {
b.currGset = b.prevGset.Clone()
}
prev := b.currGset.Clone()
err = b.currGset.(*MariadbGTIDSet).AddSet(&event.GTID)
err := b.currGset.(*MariadbGTIDSet).AddSet(&event.GTID)
if err != nil {
return errors.Trace(err)
}
// right after reconnect we will see same gtid as we saw before, thus currGset will not get changed
// Right after reconnect we may see the same GTID as before; update prevGset if currGset changed
if !b.currGset.Equal(prev) {
b.prevGset = prev
}

case *XIDEvent:
if !b.cfg.DiscardGTIDSet {
event.GSet = getCurrentGtidSet()
event.GSet = b.getCurrentGtidSet()
}

case *QueryEvent:
if !b.cfg.DiscardGTIDSet {
event.GSet = getCurrentGtidSet()
event.GSet = b.getCurrentGtidSet()
}
}

needStop := false
select {
case s.ch <- e:
case <-b.ctx.Done():
needStop = true
// If an EventHandler is set, handle the event synchronously
if b.eventHandler != nil {
err := b.eventHandler.HandleEvent(e)
if err != nil {
return errors.Trace(err)
}
}

// Send the ACK if needed
if needACK {
err := b.replySemiSyncACK(b.nextPos)
if err != nil {
return errors.Trace(err)
}
}

// Send event to the streamer channel
needStop := false
select {
case s.ch <- e:
case <-b.ctx.Done():
needStop = true
}

if needStop {
return errors.New("sync is been closing...")
return errors.New("sync is being closed...")
}

return nil
}

// getCurrentGtidSet returns a clone of the current GTID set.
func (b *BinlogSyncer) getCurrentGtidSet() GTIDSet {
if b.currGset != nil {
return b.currGset.Clone()
}
return nil
}

// LastConnectionID returns last connectionID.
func (b *BinlogSyncer) LastConnectionID() uint32 {
return b.lastConnectionID
Expand Down

0 comments on commit 9d327ec

Please sign in to comment.