From 07387d4c91c1305113c66dff7b98bf53b5f4df70 Mon Sep 17 00:00:00 2001 From: ohkinozomu Date: Tue, 30 Jul 2024 18:09:12 +0900 Subject: [PATCH] Add TestCluster --- cluster/agent.go | 14 ----- cluster/agent_test.go | 143 ++++++++++++++++++++++++++++++++++++++++++ cluster/service.go | 9 --- cmd/config/node1.yml | 2 +- cmd/config/node2.yml | 2 +- cmd/config/node3.yml | 2 +- config/config.go | 2 - 7 files changed, 146 insertions(+), 28 deletions(-) create mode 100644 cluster/agent_test.go diff --git a/cluster/agent.go b/cluster/agent.go index 9247c1e..fedb7f0 100644 --- a/cluster/agent.go +++ b/cluster/agent.go @@ -225,20 +225,6 @@ func (a *Agent) Stat() map[string]int64 { return a.membership.Stat() } -func (a *Agent) notifyNewRaftPeer() { - addr := net.JoinHostPort(a.Config.BindAddr, strconv.Itoa(a.Config.RaftPort)) - joinMsg := message.Message{ - Type: message.RaftJoin, - NodeID: a.Config.NodeName, - Payload: []byte(addr)} - - if a.Config.GrpcEnable { - a.grpcClientManager.RaftJoinToOthers() - } else { - a.membership.SendToOthers(joinMsg.MsgpackBytes()) - } -} - func (a *Agent) raftApplyListener() { for { select { diff --git a/cluster/agent_test.go b/cluster/agent_test.go new file mode 100644 index 0000000..0cf30c7 --- /dev/null +++ b/cluster/agent_test.go @@ -0,0 +1,143 @@ +package cluster + +import ( + "net" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/wind-c/comqtt/v2/cluster/log" + "github.com/wind-c/comqtt/v2/config" +) + +func getFreePort() (int, error) { + listener, err := net.Listen("tcp", ":0") + if err != nil { + return 0, err + } + defer listener.Close() + return listener.Addr().(*net.TCPAddr).Port, nil +} + +func TestCluster(t *testing.T) { + log.Init(log.DefaultOptions()) + + bindPort1, err := getFreePort() + require.NoError(t, err, "Failed to get free port for node1") + raftPort1, err := getFreePort() + require.NoError(t, err, "Failed to get free port for node1 Raft") + + bindPort2, err := getFreePort() + require.NoError(t, err, "Failed to get free port for node2") + raftPort2, err := getFreePort() + require.NoError(t, err, "Failed to get free port for node2 Raft") + + bindPort3, err := getFreePort() + require.NoError(t, err, "Failed to get free port for node3") + raftPort3, err := getFreePort() + require.NoError(t, err, "Failed to get free port for node3 Raft") + + members := []string{ + "127.0.0.1:" + strconv.Itoa(bindPort1), + "127.0.0.1:" + strconv.Itoa(bindPort2), + "127.0.0.1:" + strconv.Itoa(bindPort3), + } + + conf1 := &config.Cluster{ + NodeName: "node1", + RaftImpl: config.RaftImplHashicorp, + BindAddr: "127.0.0.1", + BindPort: bindPort1, + RaftPort: raftPort1, + RaftBootstrap: true, + GrpcEnable: false, + Members: members, + DiscoveryWay: config.DiscoveryWaySerf, + } + agent1 := NewAgent(conf1) + err = agent1.Start() + require.NoError(t, err, "Agent start failed for node: %s", conf1.NodeName) + + conf2 := &config.Cluster{ + NodeName: "node2", + RaftImpl: config.RaftImplHashicorp, + BindAddr: "127.0.0.1", + BindPort: bindPort2, + RaftPort: raftPort2, + RaftBootstrap: false, + GrpcEnable: false, + Members: members, + DiscoveryWay: config.DiscoveryWaySerf, + } + agent2 := NewAgent(conf2) + err = agent2.Start() + defer agent2.Stop() + require.NoError(t, err, "Agent start failed for node: %s", conf2.NodeName) + + conf3 := &config.Cluster{ + NodeName: "node3", + RaftImpl: config.RaftImplHashicorp, + BindAddr: "127.0.0.1", + BindPort: bindPort3, + RaftPort: raftPort3, + RaftBootstrap: false, + GrpcEnable: false, + Members: members, + DiscoveryWay: config.DiscoveryWaySerf, + } + agent3 := NewAgent(conf3) + err = agent3.Start() + defer agent3.Stop() + require.NoError(t, err, "Agent start failed for node: %s", conf3.NodeName) + + time.Sleep(5 * time.Second) + + _, leader1 := agent1.raftPeer.GetLeader() + _, leader2 := agent2.raftPeer.GetLeader() + _, leader3 := agent3.raftPeer.GetLeader() + + require.Equal(t, leader1, "node1") + require.Equal(t, leader2, "node1") + require.Equal(t, leader3, "node1") + + members1 := agent1.GetMemberList() + members2 := agent2.GetMemberList() + members3 := agent3.GetMemberList() + + require.Equal(t, len(members1), 3) + require.Equal(t, len(members2), 3) + require.Equal(t, len(members3), 3) + + // Stop agent1 and check new leader + agent1.Stop() + time.Sleep(5 * time.Second) + + _, newLeader2 := agent2.raftPeer.GetLeader() + _, newLeader3 := agent3.raftPeer.GetLeader() + + // Check that either agent2 or agent3 becomes the new leader + if newLeader2 == "node2" || newLeader2 == "node3" { + require.Equal(t, newLeader2, newLeader3, "Leaders should be the same for agent2 and agent3") + } else { + require.Fail(t, "New leader should be either node2 or node3") + } + + // Restart agent1 and verify it is a follower + err = agent1.Start() + require.NoError(t, err, "Agent restart failed for node: %s", conf1.NodeName) + defer agent1.Stop() + + time.Sleep(5 * time.Second) + + _, leaderAfterRestart1 := agent1.raftPeer.GetLeader() + _, leaderAfterRestart2 := agent2.raftPeer.GetLeader() + _, leaderAfterRestart3 := agent3.raftPeer.GetLeader() + + require.Equal(t, leaderAfterRestart2, leaderAfterRestart1) + require.Equal(t, leaderAfterRestart3, leaderAfterRestart1) + + require.NotEqual(t, leaderAfterRestart1, "node1", "After restart, node1 should not be the leader") + + t.Log("Test completed successfully") +} diff --git a/cluster/service.go b/cluster/service.go index 8a79249..0ce33a4 100644 --- a/cluster/service.go +++ b/cluster/service.go @@ -134,15 +134,6 @@ func (s *RpcService) RaftJoin(ctx context.Context, req *crpc.JoinRequest) (*crpc return &crpc.Response{Ok: true}, nil } -func genApplyCmd(req *crpc.ApplyRequest) []byte { - msg := message.Message{ - Type: byte(req.Action), - NodeID: req.NodeId, - Payload: req.Filter, - } - return msg.MsgpackBytes() -} - type ClientManager struct { agent *Agent cs map[string]*client diff --git a/cmd/config/node1.yml b/cmd/config/node1.yml index 2ed034f..ae351bb 100644 --- a/cmd/config/node1.yml +++ b/cmd/config/node1.yml @@ -10,7 +10,7 @@ auth: blacklist-path: ./config/blacklist.yml #Special rules outside the usual rules (black and white list),this configuration is invalid for anonymous authentication cluster: - discovery-way: 0 #The node discovery way in the cluster: 0 serf、1 memberlist、2 mDNS + discovery-way: 0 #The node discovery way in the cluster: 0 serf、1 memberlist node-name: c01 #For versatility, use pure numbers. The name of node must be unique in the cluster. bind-addr: 127.0.0.1 #The ip addr used for discovery and communication between nodes. It is usually set to the intranet ip addr. bind-port: 7946 #The port is used for both UDP and TCP gossip. Used for member discovery and join. diff --git a/cmd/config/node2.yml b/cmd/config/node2.yml index 1888224..3af9fe9 100644 --- a/cmd/config/node2.yml +++ b/cmd/config/node2.yml @@ -10,7 +10,7 @@ auth: blacklist-path: ./config/blacklist.yml #Special rules outside the usual rules (black and white list),this configuration is invalid for anonymous authentication cluster: - discovery-way: 0 #The node discovery way in the cluster: 0 serf、1 memberlist、2 mDNS + discovery-way: 0 #The node discovery way in the cluster: 0 serf、1 memberlist node-name: c02 #For versatility, use pure numbers. The name of node must be unique in the cluster. bind-addr: 127.0.0.1 #Configuration related to what address to bind to and ports to listen on. bind-port: 7947 #The port is used for both UDP and TCP gossip. Used for member discovery and join. diff --git a/cmd/config/node3.yml b/cmd/config/node3.yml index f44abd0..4e28b25 100644 --- a/cmd/config/node3.yml +++ b/cmd/config/node3.yml @@ -10,7 +10,7 @@ auth: blacklist-path: ./config/blacklist.yml #Special rules outside the usual rules (black and white list),this configuration is invalid for anonymous authentication cluster: - discovery-way: 0 #The node discovery way in the cluster: 0 serf、1 memberlist、2 mDNS + discovery-way: 0 #The node discovery way in the cluster: 0 serf、1 memberlist node-name: c03 #For versatility, use pure numbers. The name of node must be unique in the cluster. bind-addr: 127.0.0.1 #Configuration related to what address to bind to and ports to listen on. bind-port: 7948 #The port is used for both UDP and TCP gossip. Used for member discovery and join. diff --git a/config/config.go b/config/config.go index 784f384..5fe93f4 100644 --- a/config/config.go +++ b/config/config.go @@ -18,8 +18,6 @@ import ( const ( DiscoveryWaySerf uint = iota DiscoveryWayMemberlist - DiscoveryWayMDNS - DiscoveryWayStatic ) const (