From 641d0b6b7f4f309180f8f8cc582c523a7f841f2a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Fri, 8 Nov 2024 15:31:19 +0100 Subject: [PATCH] replication,cmd: improve flavor handling For `go-mysqlbinlog` and `go-canal`: return an error for invalid flavors. For `go-mysqlbinlog`: log a non-critical error if the server version has "MariaDB" in it but the flavor isn't set to "mariadb". --- cmd/go-canal/main.go | 7 +++++++ cmd/go-mysqlbinlog/main.go | 6 ++++++ mysql/mysql_test.go | 22 ++++++++++++++++++++++ mysql/validate.go | 14 ++++++++++++++ replication/binlogsyncer.go | 14 ++++++++++++++ 5 files changed, 63 insertions(+) create mode 100644 mysql/validate.go diff --git a/cmd/go-canal/main.go b/cmd/go-canal/main.go index ced9aabd4..13a34e1fb 100644 --- a/cmd/go-canal/main.go +++ b/cmd/go-canal/main.go @@ -13,6 +13,7 @@ import ( "github.com/go-mysql-org/go-mysql/canal" "github.com/go-mysql-org/go-mysql/mysql" + "github.com/pingcap/errors" ) var ( @@ -41,6 +42,12 @@ var ( func main() { flag.Parse() + err := mysql.ValidateFlavor(*flavor) + if err != nil { + fmt.Printf("Flavor error: %v\n", errors.ErrorStack(err)) + return + } + cfg := canal.NewDefaultConfig() cfg.Addr = net.JoinHostPort(*host, strconv.Itoa(*port)) cfg.User = *user diff --git a/cmd/go-mysqlbinlog/main.go b/cmd/go-mysqlbinlog/main.go index eb97ace9c..3a256480b 100644 --- a/cmd/go-mysqlbinlog/main.go +++ b/cmd/go-mysqlbinlog/main.go @@ -49,6 +49,12 @@ func main() { UseDecimal: true, } + err := mysql.ValidateFlavor(*flavor) + if err != nil { + fmt.Printf("Flavor error: %v\n", errors.ErrorStack(err)) + return + } + b := replication.NewBinlogSyncer(cfg) pos := mysql.Position{Name: *file, Pos: uint32(*pos)} diff --git a/mysql/mysql_test.go b/mysql/mysql_test.go index bf605e162..0d42a4e71 100644 --- a/mysql/mysql_test.go +++ b/mysql/mysql_test.go @@ -337,3 +337,25 @@ func mysqlGTIDfromString(t *testing.T, gtidStr string) MysqlGTIDSet { return *gtid.(*MysqlGTIDSet) } + +func TestValidateFlavor(t *testing.T) { + tbls := []struct { + flavor string + valid bool + }{ + {"mysql", true}, + {"mariadb", true}, + {"maria", false}, + {"MariaDB", false}, + {"msql", false}, + } + + for _, f := range tbls { + err := ValidateFlavor(f.flavor) + if f.valid == true { + require.NoError(t, err) + } else { + require.Error(t, err) + } + } +} diff --git a/mysql/validate.go b/mysql/validate.go new file mode 100644 index 000000000..118de4e37 --- /dev/null +++ b/mysql/validate.go @@ -0,0 +1,14 @@ +package mysql + +import "fmt" + +func ValidateFlavor(flavor string) error { + switch flavor { + case MySQLFlavor: + return nil + case MariaDBFlavor: + return nil + default: + return fmt.Errorf("%s is not a valid flavor", flavor) + } +} diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index aed7f2d77..c3df107bb 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -8,6 +8,7 @@ import ( "net" "os" "strconv" + "strings" "sync" "time" @@ -424,6 +425,15 @@ func (b *BinlogSyncer) GetNextPosition() Position { return b.nextPos } +func (b *BinlogSyncer) checkFlavor() { + serverVersion := b.c.GetServerVersion() + if b.cfg.Flavor != MariaDBFlavor && + strings.Contains(b.c.GetServerVersion(), "MariaDB") { + b.cfg.Logger.Errorf("misconfigured flavor (%s) for server %s", + b.cfg.Flavor, serverVersion) + } +} + // StartSync starts syncing from the `pos` position. func (b *BinlogSyncer) StartSync(pos Position) (*BinlogStreamer, error) { b.cfg.Logger.Infof("begin to sync binlog from position %s", pos) @@ -439,6 +449,8 @@ func (b *BinlogSyncer) StartSync(pos Position) (*BinlogStreamer, error) { return nil, errors.Trace(err) } + b.checkFlavor() + return b.startDumpStream(), nil } @@ -477,6 +489,8 @@ func (b *BinlogSyncer) StartSyncGTID(gset GTIDSet) (*BinlogStreamer, error) { return nil, err } + b.checkFlavor() + return b.startDumpStream(), nil }