diff --git a/canal/canal_test.go b/canal/canal_test.go index 879f266bf..14a7056b9 100644 --- a/canal/canal_test.go +++ b/canal/canal_test.go @@ -138,6 +138,34 @@ func (s *canalTestSuite) TestCanal() { require.NoError(s.T(), err) } +func (s *canalTestSuite) TestAnalyzeAdvancesSyncedPos() { + <-s.c.WaitDumpDone() + + // We should not need to use FLUSH BINARY LOGS + // An ANALYZE TABLE statement should advance the saved position. + // There are still cases that don't advance, such as + // statements that won't parse like [CREATE|DROP] TRIGGER. + s.c.cfg.DisableFlushBinlogWhileWaiting = true + defer func() { + s.c.cfg.DisableFlushBinlogWhileWaiting = false + }() + + startingPos, err := s.c.GetMasterPos() + require.NoError(s.T(), err) + + s.execute("ANALYZE TABLE test.canal_test") + err = s.c.CatchMasterPos(10 * time.Second) + require.NoError(s.T(), err) + + // Ensure the ending pos is greater than the starting pos + // but the filename is the same. This ensures that + // FLUSH BINARY LOGS was not used. + endingPos, err := s.c.GetMasterPos() + require.NoError(s.T(), err) + require.Equal(s.T(), startingPos.Name, endingPos.Name) + require.Greater(s.T(), endingPos.Pos, startingPos.Pos) +} + func (s *canalTestSuite) TestCanalFilter() { // included sch, err := s.c.GetTable("test", "canal_test") diff --git a/canal/config.go b/canal/config.go index 1c2cdab6e..e2b848f5e 100644 --- a/canal/config.go +++ b/canal/config.go @@ -91,6 +91,11 @@ type Config struct { // whether disable re-sync for broken connection DisableRetrySync bool `toml:"disable_retry_sync"` + // whether the function WaitUntilPos() can use FLUSH BINARY LOGS + // to ensure we advance past a position. This should not strictly be required, + // and requires additional privileges. + DisableFlushBinlogWhileWaiting bool `toml:"disable_flush_binlog_while_waiting"` + // Set TLS config TLSConfig *tls.Config diff --git a/canal/sync.go b/canal/sync.go index 9fcc676ec..e9a62e76e 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -141,9 +141,14 @@ func (c *Canal) handleEvent(ev *replication.BinlogEvent) error { case *replication.QueryEvent: stmts, _, err := c.parser.Parse(string(e.Query), "", "") if err != nil { + // The parser does not understand all syntax. + // For example, it won't parse [CREATE|DROP] TRIGGER statements. c.cfg.Logger.Errorf("parse query(%s) err %v, will skip this event", e.Query, err) return nil } + if len(stmts) > 0 { + savePos = true + } for _, stmt := range stmts { nodes := parseStmt(stmt) for _, node := range nodes { @@ -155,7 +160,6 @@ func (c *Canal) handleEvent(ev *replication.BinlogEvent) error { } } if len(nodes) > 0 { - savePos = true force = true // Now we only handle Table Changed DDL, maybe we will support more later. if err = c.eventHandler.OnDDL(ev.Header, pos, e); err != nil { @@ -300,9 +304,11 @@ func (c *Canal) WaitUntilPos(pos mysql.Position, timeout time.Duration) error { case <-timer.C: return errors.Errorf("wait position %v too long > %s", pos, timeout) default: - err := c.FlushBinlog() - if err != nil { - return errors.Trace(err) + if !c.cfg.DisableFlushBinlogWhileWaiting { + err := c.FlushBinlog() + if err != nil { + return errors.Trace(err) + } } curPos := c.master.Position() if curPos.Compare(pos) >= 0 {