diff --git a/README.md b/README.md index 8d9dc1777..c36e9b172 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,6 @@ This repo uses [Changelog](CHANGELOG.md). * [Incremental dumping](#canal) * [Client](#client) * [Fake server](#server) -* [Failover](#failover) * [database/sql like driver](#driver) ## Replication @@ -327,18 +326,6 @@ MySQL [(none)]> > > To customize server configurations, use ```NewServer()``` and create connection via ```NewCustomizedConn()```. - -## Failover - -Failover supports to promote a new master and let replicas replicate from it automatically when the old master was down. - -Failover supports MySQL >= 5.6.9 with GTID mode, if you use lower version, e.g, MySQL 5.0 - 5.5, please use [MHA](http://code.google.com/p/mysql-master-ha/) or [orchestrator](https://github.com/outbrain/orchestrator). - -At the same time, Failover supports MariaDB >= 10.0.9 with GTID mode too. - -Why only GTID? Supporting failover with no GTID mode is very hard, because replicas can not find the proper binlog filename and position with the new master. -Although there are many companies use MySQL 5.0 - 5.5, I think upgrade MySQL to 5.6 or higher is easy. - ## Driver Driver is the package that you can use go-mysql with go database/sql like other drivers. A simple example: diff --git a/failover/const.go b/failover/const.go deleted file mode 100644 index 15e27d240..000000000 --- a/failover/const.go +++ /dev/null @@ -1,11 +0,0 @@ -package failover - -const ( - IOThreadType = "IO_THREAD" - SQLThreadType = "SQL_THREAD" -) - -const ( - GTIDModeOn = "ON" - GTIDModeOff = "OFF" -) diff --git a/failover/doc.go b/failover/doc.go deleted file mode 100644 index feb037d28..000000000 --- a/failover/doc.go +++ /dev/null @@ -1,8 +0,0 @@ -// Failover supports to promote a new master and let other slaves -// replicate from it automatically. -// -// Failover does not support monitoring whether a master is alive or not, -// and will think the master is down. -// -// This package is still in development and could not be used in production environment. -package failover diff --git a/failover/failover.go b/failover/failover.go deleted file mode 100644 index 6b76122ae..000000000 --- a/failover/failover.go +++ /dev/null @@ -1,67 +0,0 @@ -package failover - -import ( - "github.com/go-mysql-org/go-mysql/mysql" - "github.com/pingcap/errors" -) - -// Failover will do below things after the master down -// 1. Elect a slave which has the most up-to-date data with old master -// 2. Promote the slave to new master -// 3. Change other slaves to the new master -// -// Limitation: -// 1, All slaves must have the same master before, Failover will check using master server id or uuid -// 2, If the failover error, the whole topology may be wrong, we must handle this error manually -// 3, Slaves must have same replication mode, all use GTID or not -func Failover(flavor string, slaves []*Server) ([]*Server, error) { - var h Handler - var err error - - switch flavor { - case mysql.MySQLFlavor: - h = new(MysqlGTIDHandler) - case mysql.MariaDBFlavor: - return nil, errors.Errorf("MariaDB failover is not supported now") - default: - return nil, errors.Errorf("invalid flavor %s", flavor) - } - - // First check slaves use gtid or not - if err := h.CheckGTIDMode(slaves); err != nil { - return nil, errors.Trace(err) - } - - // Stop all slave IO_THREAD and wait the relay log done - for _, slave := range slaves { - if err = h.WaitRelayLogDone(slave); err != nil { - return nil, errors.Trace(err) - } - } - - var bestSlave *Server - // Find best slave which has the most up-to-data data - if bestSlaves, err := h.FindBestSlaves(slaves); err != nil { - return nil, errors.Trace(err) - } else { - bestSlave = bestSlaves[0] - } - - // Promote the best slave to master - if err = h.Promote(bestSlave); err != nil { - return nil, errors.Trace(err) - } - - // Change master - for i := 0; i < len(slaves); i++ { - if bestSlave == slaves[i] { - continue - } - - if err = h.ChangeMasterTo(slaves[i], bestSlave); err != nil { - return nil, errors.Trace(err) - } - } - - return slaves, nil -} diff --git a/failover/failover_test.go b/failover/failover_test.go deleted file mode 100644 index fe72305ff..000000000 --- a/failover/failover_test.go +++ /dev/null @@ -1,176 +0,0 @@ -package failover - -import ( - "flag" - "fmt" - "testing" - - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" - - "github.com/go-mysql-org/go-mysql/test_util" -) - -var enable_failover_test = flag.Bool("test-failover", false, "enable test failover") - -type failoverTestSuite struct { - suite.Suite - s []*Server -} - -func TestFailoverSuite(t *testing.T) { - suite.Run(t, new(failoverTestSuite)) -} - -func (s *failoverTestSuite) SetupSuite() { - if !*enable_failover_test { - s.T().Skip("skip test failover") - } - - ports := []int{3306, 3307, 3308, 3316, 3317, 3318} - - s.s = make([]*Server, len(ports)) - - for i := 0; i < len(ports); i++ { - s.s[i] = NewServer(fmt.Sprintf("%s:%d", *test_util.MysqlHost, ports[i]), User{"root", ""}, User{"root", ""}) - } - - var err error - for i := 0; i < len(ports); i++ { - err = s.s[i].StopSlave() - require.NoError(s.T(), err) - - err = s.s[i].ResetSlaveALL() - require.NoError(s.T(), err) - - _, err = s.s[i].Execute(`SET GLOBAL BINLOG_FORMAT = "ROW"`) - require.NoError(s.T(), err) - - _, err = s.s[i].Execute("DROP TABLE IF EXISTS test.go_mysql_test") - require.NoError(s.T(), err) - - _, err = s.s[i].Execute("CREATE TABLE IF NOT EXISTS test.go_mysql_test (id INT AUTO_INCREMENT, name VARCHAR(256), PRIMARY KEY(id)) engine=innodb") - require.NoError(s.T(), err) - - err = s.s[i].ResetMaster() - require.NoError(s.T(), err) - } -} - -func (s *failoverTestSuite) TearDownSuite() { -} - -func (s *failoverTestSuite) TestMysqlFailover() { - h := new(MysqlGTIDHandler) - - m := s.s[0] - s1 := s.s[1] - s2 := s.s[2] - - s.testFailover(h, m, s1, s2) -} - -func (s *failoverTestSuite) TestMariadbFailover() { - h := new(MariadbGTIDHandler) - - for i := 3; i <= 5; i++ { - _, err := s.s[i].Execute("SET GLOBAL gtid_slave_pos = ''") - require.NoError(s.T(), err) - } - - m := s.s[3] - s1 := s.s[4] - s2 := s.s[5] - - s.testFailover(h, m, s1, s2) -} - -func (s *failoverTestSuite) testFailover(h Handler, m *Server, s1 *Server, s2 *Server) { - var err error - err = h.ChangeMasterTo(s1, m) - require.NoError(s.T(), err) - - err = h.ChangeMasterTo(s2, m) - require.NoError(s.T(), err) - - id := s.checkInsert(m, "a") - - err = h.WaitCatchMaster(s1, m) - require.NoError(s.T(), err) - - err = h.WaitCatchMaster(s2, m) - require.NoError(s.T(), err) - - s.checkSelect(s1, id, "a") - s.checkSelect(s2, id, "a") - - err = s2.StopSlaveIOThread() - require.NoError(s.T(), err) - - _ = s.checkInsert(m, "b") - id = s.checkInsert(m, "c") - - err = h.WaitCatchMaster(s1, m) - require.NoError(s.T(), err) - - s.checkSelect(s1, id, "c") - - best, err := h.FindBestSlaves([]*Server{s1, s2}) - require.NoError(s.T(), err) - require.Equal(s.T(), []*Server{s1}, best) - - // promote s1 to master - err = h.Promote(s1) - require.NoError(s.T(), err) - - // change s2 to master s1 - err = h.ChangeMasterTo(s2, s1) - require.NoError(s.T(), err) - - err = h.WaitCatchMaster(s2, s1) - require.NoError(s.T(), err) - - s.checkSelect(s2, id, "c") - - // change m to master s1 - err = h.ChangeMasterTo(m, s1) - require.NoError(s.T(), err) - - m, s1 = s1, m - _ = s.checkInsert(m, "d") - - err = h.WaitCatchMaster(s1, m) - require.NoError(s.T(), err) - - err = h.WaitCatchMaster(s2, m) - require.NoError(s.T(), err) - - best, err = h.FindBestSlaves([]*Server{s1, s2}) - require.NoError(s.T(), err) - require.Equal(s.T(), []*Server{s1, s2}, best) - - err = s2.StopSlaveIOThread() - require.NoError(s.T(), err) - - _ = s.checkInsert(m, "e") - err = h.WaitCatchMaster(s1, m) - require.NoError(s.T(), err) - - best, err = h.FindBestSlaves([]*Server{s1, s2}) - require.NoError(s.T(), err) - require.Equal(s.T(), []*Server{s1}, best) -} - -func (s *failoverTestSuite) checkSelect(m *Server, id uint64, name string) { - rr, err := m.Execute("SELECT name FROM test.go_mysql_test WHERE id = ?", id) - require.NoError(s.T(), err) - str, _ := rr.GetString(0, 0) - require.Equal(s.T(), name, str) -} - -func (s *failoverTestSuite) checkInsert(m *Server, name string) uint64 { - r, err := m.Execute("INSERT INTO test.go_mysql_test (name) VALUES (?)", name) - require.NoError(s.T(), err) - - return r.InsertId -} diff --git a/failover/handler.go b/failover/handler.go deleted file mode 100644 index f75076734..000000000 --- a/failover/handler.go +++ /dev/null @@ -1,22 +0,0 @@ -package failover - -type Handler interface { - // Promote slave s to master - Promote(s *Server) error - - // Change slave s to master m and replicate from it - ChangeMasterTo(s *Server, m *Server) error - - // Ensure all relay log done, it will stop slave IO_THREAD - // You must start slave again if you want to do replication continuatively - WaitRelayLogDone(s *Server) error - - // Wait until slave s catch all data from master m at current time - WaitCatchMaster(s *Server, m *Server) error - - // Find best slave which has the most up-to-date data from master - FindBestSlaves(slaves []*Server) ([]*Server, error) - - // Check all slaves have gtid enabled - CheckGTIDMode(slaves []*Server) error -} diff --git a/failover/mariadb_gtid_handler.go b/failover/mariadb_gtid_handler.go deleted file mode 100644 index 84166372b..000000000 --- a/failover/mariadb_gtid_handler.go +++ /dev/null @@ -1,142 +0,0 @@ -package failover - -import ( - "fmt" - "net" - - . "github.com/go-mysql-org/go-mysql/mysql" - "github.com/pingcap/errors" -) - -// Limiatation -// + Multi source replication is not supported -// + Slave can not handle write transactions, so maybe readonly or strict_mode = 1 is better -type MariadbGTIDHandler struct { - Handler -} - -func (h *MariadbGTIDHandler) Promote(s *Server) error { - if err := h.WaitRelayLogDone(s); err != nil { - return errors.Trace(err) - } - - if err := s.StopSlave(); err != nil { - return errors.Trace(err) - } - - return nil -} - -func (h *MariadbGTIDHandler) FindBestSlaves(slaves []*Server) ([]*Server, error) { - var bestSlaves []*Server - - ps := make([]uint64, len(slaves)) - - lastIndex := -1 - var seq uint64 - - for i, slave := range slaves { - rr, err := slave.Execute("SELECT @@gtid_current_pos") - - if err != nil { - return nil, errors.Trace(err) - } - - str, _ := rr.GetString(0, 0) - if len(str) == 0 { - seq = 0 - } else { - g, err := ParseMariadbGTID(str) - if err != nil { - return nil, errors.Trace(err) - } - - seq = g.SequenceNumber - } - - ps[i] = seq - - if lastIndex == -1 { - lastIndex = i - bestSlaves = []*Server{slave} - } else { - if ps[lastIndex] < seq { - lastIndex = i - bestSlaves = []*Server{slave} - } else if ps[lastIndex] == seq { - // these two slaves have same data, - bestSlaves = append(bestSlaves, slave) - } - } - } - - return bestSlaves, nil -} - -const changeMasterToWithCurrentPos = `CHANGE MASTER TO - MASTER_HOST = "%s", MASTER_PORT = %s, - MASTER_USER = "%s", MASTER_PASSWORD = "%s", - MASTER_USE_GTID = current_pos` - -func (h *MariadbGTIDHandler) ChangeMasterTo(s *Server, m *Server) error { - if err := h.WaitRelayLogDone(s); err != nil { - return errors.Trace(err) - } - - if err := s.StopSlave(); err != nil { - return errors.Trace(err) - } - - if err := s.ResetSlave(); err != nil { - return errors.Trace(err) - } - - host, port, _ := net.SplitHostPort(m.Addr) - - if _, err := s.Execute(fmt.Sprintf(changeMasterToWithCurrentPos, - host, port, m.ReplUser.Name, m.ReplUser.Password)); err != nil { - return errors.Trace(err) - } - - if err := s.StartSlave(); err != nil { - return errors.Trace(err) - } - - return nil -} - -func (h *MariadbGTIDHandler) WaitRelayLogDone(s *Server) error { - if err := s.StopSlaveIOThread(); err != nil { - return errors.Trace(err) - } - - r, err := s.SlaveStatus() - if err != nil { - return errors.Trace(err) - } - - fname, _ := r.GetStringByName(0, "Master_Log_File") - pos, _ := r.GetIntByName(0, "Read_Master_Log_Pos") - - return s.MasterPosWait(Position{Name: fname, Pos: uint32(pos)}, 0) -} - -func (h *MariadbGTIDHandler) WaitCatchMaster(s *Server, m *Server) error { - r, err := m.Execute("SELECT @@gtid_binlog_pos") - if err != nil { - return errors.Trace(err) - } - - pos, _ := r.GetString(0, 0) - - return h.waitUntilAfterGTID(s, pos) -} - -func (h *MariadbGTIDHandler) CheckGTIDMode(slaves []*Server) error { - return nil -} - -func (h *MariadbGTIDHandler) waitUntilAfterGTID(s *Server, pos string) error { - _, err := s.Execute(fmt.Sprintf("SELECT MASTER_GTID_WAIT('%s')", pos)) - return err -} diff --git a/failover/mysql_gtid_handler.go b/failover/mysql_gtid_handler.go deleted file mode 100644 index 86b155c9c..000000000 --- a/failover/mysql_gtid_handler.go +++ /dev/null @@ -1,141 +0,0 @@ -package failover - -import ( - "fmt" - "net" - - . "github.com/go-mysql-org/go-mysql/mysql" - "github.com/pingcap/errors" -) - -type MysqlGTIDHandler struct { - Handler -} - -func (h *MysqlGTIDHandler) Promote(s *Server) error { - if err := h.WaitRelayLogDone(s); err != nil { - return errors.Trace(err) - } - - if err := s.StopSlave(); err != nil { - return errors.Trace(err) - } - - return nil -} - -func (h *MysqlGTIDHandler) FindBestSlaves(slaves []*Server) ([]*Server, error) { - // MHA use Relay_Master_Log_File and Exec_Master_Log_Pos to determind which is the best slave - - var bestSlaves []*Server - - ps := make([]Position, len(slaves)) - - lastIndex := -1 - - for i, slave := range slaves { - pos, err := slave.FetchSlaveExecutePos() - - if err != nil { - return nil, errors.Trace(err) - } - - ps[i] = pos - - if lastIndex == -1 { - lastIndex = i - bestSlaves = []*Server{slave} - } else { - switch ps[lastIndex].Compare(pos) { - case 1: - //do nothing - case -1: - lastIndex = i - bestSlaves = []*Server{slave} - case 0: - // these two slaves have same data, - bestSlaves = append(bestSlaves, slave) - } - } - } - - return bestSlaves, nil -} - -const changeMasterToWithAuto = `CHANGE MASTER TO - MASTER_HOST = "%s", MASTER_PORT = %s, - MASTER_USER = "%s", MASTER_PASSWORD = "%s", - MASTER_AUTO_POSITION = 1` - -func (h *MysqlGTIDHandler) ChangeMasterTo(s *Server, m *Server) error { - if err := h.WaitRelayLogDone(s); err != nil { - return errors.Trace(err) - } - - if err := s.StopSlave(); err != nil { - return errors.Trace(err) - } - - if err := s.ResetSlave(); err != nil { - return errors.Trace(err) - } - - host, port, _ := net.SplitHostPort(m.Addr) - - if _, err := s.Execute(fmt.Sprintf(changeMasterToWithAuto, - host, port, m.ReplUser.Name, m.ReplUser.Password)); err != nil { - return errors.Trace(err) - } - - if err := s.StartSlave(); err != nil { - return errors.Trace(err) - } - - return nil -} - -func (h *MysqlGTIDHandler) WaitRelayLogDone(s *Server) error { - if err := s.StopSlaveIOThread(); err != nil { - return errors.Trace(err) - } - - r, err := s.SlaveStatus() - if err != nil { - return errors.Trace(err) - } - - retrieved, _ := r.GetStringByName(0, "Retrieved_Gtid_Set") - - // may only support MySQL version >= 5.6.9 - // see http://dev.mysql.com/doc/refman/5.6/en/gtid-functions.html - return h.waitUntilAfterGTIDs(s, retrieved) -} - -func (h *MysqlGTIDHandler) WaitCatchMaster(s *Server, m *Server) error { - r, err := m.MasterStatus() - if err != nil { - return errors.Trace(err) - } - - masterGTIDSet, _ := r.GetStringByName(0, "Executed_Gtid_Set") - - return h.waitUntilAfterGTIDs(s, masterGTIDSet) -} - -func (h *MysqlGTIDHandler) CheckGTIDMode(slaves []*Server) error { - for i := 0; i < len(slaves); i++ { - mode, err := slaves[i].MysqlGTIDMode() - if err != nil { - return errors.Trace(err) - } else if mode != GTIDModeOn { - return errors.Errorf("%s use not GTID mode", slaves[i].Addr) - } - } - - return nil -} - -func (h *MysqlGTIDHandler) waitUntilAfterGTIDs(s *Server, gtids string) error { - _, err := s.Execute(fmt.Sprintf("SELECT WAIT_UNTIL_SQL_THREAD_AFTER_GTIDS('%s')", gtids)) - return err -} diff --git a/failover/server.go b/failover/server.go deleted file mode 100644 index 77d8b1b18..000000000 --- a/failover/server.go +++ /dev/null @@ -1,174 +0,0 @@ -package failover - -import ( - "fmt" - - "github.com/go-mysql-org/go-mysql/client" - . "github.com/go-mysql-org/go-mysql/mysql" -) - -type User struct { - Name string - Password string -} - -type Server struct { - Addr string - - User User - ReplUser User - - conn *client.Conn -} - -func NewServer(addr string, user User, replUser User) *Server { - s := new(Server) - - s.Addr = addr - - s.User = user - s.ReplUser = replUser - - return s -} - -func (s *Server) Close() { - if s.conn != nil { - s.conn.Close() - } -} - -func (s *Server) Execute(cmd string, args ...interface{}) (r *Result, err error) { - retryNum := 3 - for i := 0; i < retryNum; i++ { - if s.conn == nil { - s.conn, err = client.Connect(s.Addr, s.User.Name, s.User.Password, "") - if err != nil { - return nil, err - } - } - - r, err = s.conn.Execute(cmd, args...) - if err != nil && ErrorEqual(err, ErrBadConn) { - return - } else if ErrorEqual(err, ErrBadConn) { - s.conn = nil - continue - } else { - return - } - } - return -} - -func (s *Server) StartSlave() error { - _, err := s.Execute("START SLAVE") - return err -} - -func (s *Server) StopSlave() error { - _, err := s.Execute("STOP SLAVE") - return err -} - -func (s *Server) StopSlaveIOThread() error { - _, err := s.Execute("STOP SLAVE IO_THREAD") - return err -} - -func (s *Server) SlaveStatus() (*Resultset, error) { - r, err := s.Execute("SHOW SLAVE STATUS") - if err != nil { - return nil, err - } else { - return r.Resultset, nil - } -} - -func (s *Server) MasterStatus() (*Resultset, error) { - r, err := s.Execute("SHOW MASTER STATUS") - if err != nil { - return nil, err - } else { - return r.Resultset, nil - } -} - -func (s *Server) ResetSlave() error { - _, err := s.Execute("RESET SLAVE") - return err -} - -func (s *Server) ResetSlaveALL() error { - _, err := s.Execute("RESET SLAVE ALL") - return err -} - -func (s *Server) ResetMaster() error { - _, err := s.Execute("RESET MASTER") - return err -} - -func (s *Server) MysqlGTIDMode() (string, error) { - r, err := s.Execute("SELECT @@gtid_mode") - if err != nil { - return GTIDModeOff, err - } - on, _ := r.GetString(0, 0) - if on != GTIDModeOn { - return GTIDModeOff, nil - } else { - return GTIDModeOn, nil - } -} - -func (s *Server) SetReadonly(b bool) error { - var err error - if b { - _, err = s.Execute("SET GLOBAL read_only = ON") - } else { - _, err = s.Execute("SET GLOBAL read_only = OFF") - } - return err -} - -func (s *Server) LockTables() error { - _, err := s.Execute("FLUSH TABLES WITH READ LOCK") - return err -} - -func (s *Server) UnlockTables() error { - _, err := s.Execute("UNLOCK TABLES") - return err -} - -// FetchSlaveReadPos gets current binlog filename and position read from master -func (s *Server) FetchSlaveReadPos() (Position, error) { - r, err := s.SlaveStatus() - if err != nil { - return Position{}, err - } - - fname, _ := r.GetStringByName(0, "Master_Log_File") - pos, _ := r.GetIntByName(0, "Read_Master_Log_Pos") - - return Position{Name: fname, Pos: uint32(pos)}, nil -} - -// FetchSlaveExecutePos gets current executed binlog filename and position from master -func (s *Server) FetchSlaveExecutePos() (Position, error) { - r, err := s.SlaveStatus() - if err != nil { - return Position{}, err - } - - fname, _ := r.GetStringByName(0, "Relay_Master_Log_File") - pos, _ := r.GetIntByName(0, "Exec_Master_Log_Pos") - - return Position{Name: fname, Pos: uint32(pos)}, nil -} - -func (s *Server) MasterPosWait(pos Position, timeout int) error { - _, err := s.Execute(fmt.Sprintf("SELECT MASTER_POS_WAIT('%s', %d, %d)", pos.Name, pos.Pos, timeout)) - return err -}