Skip to content

Commit

Permalink
Add DisableFlushBinlogWhileWaiting config and almost every query ev…
Browse files Browse the repository at this point in the history
…ent triggers `OnPosSynced` (#900)

* Add ANALYZE TABLE to canal DDL list

* Change to using flush-less WaitUntilPos

* Add a better test

* Include reviewer feedback

* Add greater check

---------

Co-authored-by: lance6716 <[email protected]>
  • Loading branch information
morgo and lance6716 authored Jul 28, 2024
1 parent ee48e78 commit 24fbb5b
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 4 deletions.
28 changes: 28 additions & 0 deletions canal/canal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,34 @@ func (s *canalTestSuite) TestCanal() {
require.NoError(s.T(), err)
}

func (s *canalTestSuite) TestAnalyzeAdvancesSyncedPos() {
<-s.c.WaitDumpDone()

// We should not need to use FLUSH BINARY LOGS
// An ANALYZE TABLE statement should advance the saved position.
// There are still cases that don't advance, such as
// statements that won't parse like [CREATE|DROP] TRIGGER.
s.c.cfg.DisableFlushBinlogWhileWaiting = true
defer func() {
s.c.cfg.DisableFlushBinlogWhileWaiting = false
}()

startingPos, err := s.c.GetMasterPos()
require.NoError(s.T(), err)

s.execute("ANALYZE TABLE test.canal_test")
err = s.c.CatchMasterPos(10 * time.Second)
require.NoError(s.T(), err)

// Ensure the ending pos is greater than the starting pos
// but the filename is the same. This ensures that
// FLUSH BINARY LOGS was not used.
endingPos, err := s.c.GetMasterPos()
require.NoError(s.T(), err)
require.Equal(s.T(), startingPos.Name, endingPos.Name)
require.Greater(s.T(), endingPos.Pos, startingPos.Pos)
}

func (s *canalTestSuite) TestCanalFilter() {
// included
sch, err := s.c.GetTable("test", "canal_test")
Expand Down
5 changes: 5 additions & 0 deletions canal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ type Config struct {
// whether disable re-sync for broken connection
DisableRetrySync bool `toml:"disable_retry_sync"`

// whether the function WaitUntilPos() can use FLUSH BINARY LOGS
// to ensure we advance past a position. This should not strictly be required,
// and requires additional privileges.
DisableFlushBinlogWhileWaiting bool `toml:"disable_flush_binlog_while_waiting"`

// Set TLS config
TLSConfig *tls.Config

Expand Down
14 changes: 10 additions & 4 deletions canal/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,14 @@ func (c *Canal) handleEvent(ev *replication.BinlogEvent) error {
case *replication.QueryEvent:
stmts, _, err := c.parser.Parse(string(e.Query), "", "")
if err != nil {
// The parser does not understand all syntax.
// For example, it won't parse [CREATE|DROP] TRIGGER statements.
c.cfg.Logger.Errorf("parse query(%s) err %v, will skip this event", e.Query, err)
return nil
}
if len(stmts) > 0 {
savePos = true
}
for _, stmt := range stmts {
nodes := parseStmt(stmt)
for _, node := range nodes {
Expand All @@ -155,7 +160,6 @@ func (c *Canal) handleEvent(ev *replication.BinlogEvent) error {
}
}
if len(nodes) > 0 {
savePos = true
force = true
// Now we only handle Table Changed DDL, maybe we will support more later.
if err = c.eventHandler.OnDDL(ev.Header, pos, e); err != nil {
Expand Down Expand Up @@ -300,9 +304,11 @@ func (c *Canal) WaitUntilPos(pos mysql.Position, timeout time.Duration) error {
case <-timer.C:
return errors.Errorf("wait position %v too long > %s", pos, timeout)
default:
err := c.FlushBinlog()
if err != nil {
return errors.Trace(err)
if !c.cfg.DisableFlushBinlogWhileWaiting {
err := c.FlushBinlog()
if err != nil {
return errors.Trace(err)
}
}
curPos := c.master.Position()
if curPos.Compare(pos) >= 0 {
Expand Down

0 comments on commit 24fbb5b

Please sign in to comment.