Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

replication,cmd: improve flavor handling #946

Merged
merged 2 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cmd/go-canal/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/go-mysql-org/go-mysql/canal"
"github.com/go-mysql-org/go-mysql/mysql"
"github.com/pingcap/errors"
)

var (
Expand Down Expand Up @@ -41,6 +42,12 @@ var (
func main() {
flag.Parse()

err := mysql.ValidateFlavor(*flavor)
if err != nil {
fmt.Printf("Flavor error: %v\n", errors.ErrorStack(err))
return
}

cfg := canal.NewDefaultConfig()
cfg.Addr = net.JoinHostPort(*host, strconv.Itoa(*port))
cfg.User = *user
Expand Down
6 changes: 6 additions & 0 deletions cmd/go-mysqlbinlog/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ func main() {
MaxReconnectAttempts: 10,
}

err := mysql.ValidateFlavor(*flavor)
if err != nil {
fmt.Printf("Flavor error: %v\n", errors.ErrorStack(err))
return
}

b := replication.NewBinlogSyncer(cfg)

pos := mysql.Position{Name: *file, Pos: uint32(*pos)}
Expand Down
23 changes: 23 additions & 0 deletions mysql/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,3 +337,26 @@ func mysqlGTIDfromString(t *testing.T, gtidStr string) MysqlGTIDSet {

return *gtid.(*MysqlGTIDSet)
}

func TestValidateFlavor(t *testing.T) {
tbls := []struct {
flavor string
valid bool
}{
{"mysql", true},
{"mariadb", true},
{"maria", false},
{"MariaDB", true},
{"msql", false},
{"mArIAdb", true},
}

for _, f := range tbls {
err := ValidateFlavor(f.flavor)
if f.valid == true {
require.NoError(t, err)
} else {
require.Error(t, err)
}
}
}
17 changes: 17 additions & 0 deletions mysql/validate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package mysql

import (
"fmt"
"strings"
)

func ValidateFlavor(flavor string) error {
switch strings.ToLower(flavor) {
case MySQLFlavor:
return nil
case MariaDBFlavor:
return nil
default:
return fmt.Errorf("%s is not a valid flavor", flavor)
}
}
17 changes: 17 additions & 0 deletions replication/binlogsyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net"
"os"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -424,6 +425,18 @@ func (b *BinlogSyncer) GetNextPosition() Position {
return b.nextPos
}

func (b *BinlogSyncer) checkFlavor() {
serverVersion := b.c.GetServerVersion()
if b.cfg.Flavor != MariaDBFlavor &&
strings.Contains(b.c.GetServerVersion(), "MariaDB") {
dveeden marked this conversation as resolved.
Show resolved Hide resolved
// Setting the flavor to `mysql` causes MariaDB to try and behave
// in a MySQL compatible way. In this mode MariaDB won't use
// MariaDB specific binlog event types, but may used dummy events instead.
b.cfg.Logger.Errorf("misconfigured flavor (%s) for server %s",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And I think we can add comment to explain why this is only a log not a failure to caller, like "MariaDB still sends binlog events in a compatible way although some functionalities are lost, so we just log the problem"

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a code comment. Or did you want to put this in the message instead?

b.cfg.Flavor, serverVersion)
}
}

// StartSync starts syncing from the `pos` position.
func (b *BinlogSyncer) StartSync(pos Position) (*BinlogStreamer, error) {
b.cfg.Logger.Infof("begin to sync binlog from position %s", pos)
Expand All @@ -439,6 +452,8 @@ func (b *BinlogSyncer) StartSync(pos Position) (*BinlogStreamer, error) {
return nil, errors.Trace(err)
}

b.checkFlavor()

return b.startDumpStream(), nil
}

Expand Down Expand Up @@ -477,6 +492,8 @@ func (b *BinlogSyncer) StartSyncGTID(gset GTIDSet) (*BinlogStreamer, error) {
return nil, err
}

b.checkFlavor()

return b.startDumpStream(), nil
}

Expand Down
Loading