Skip to content

Commit

Permalink
Add hashicorp/raft Peer test
Browse files Browse the repository at this point in the history
  • Loading branch information
ohkinozomu committed Aug 5, 2024
1 parent fcc1415 commit e8eb21f
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 2 deletions.
6 changes: 6 additions & 0 deletions cluster/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,11 @@ func TestCluster(t *testing.T) {
BindPort: bindPort1,
RaftPort: raftPort1,
RaftBootstrap: true,
RaftDir: t.TempDir(),
GrpcEnable: false,
Members: members,
DiscoveryWay: config.DiscoveryWaySerf,
NodesFileDir: t.TempDir(),
}
agent1 := NewAgent(conf1)
err = agent1.Start()
Expand All @@ -66,9 +68,11 @@ func TestCluster(t *testing.T) {
BindPort: bindPort2,
RaftPort: raftPort2,
RaftBootstrap: false,
RaftDir: t.TempDir(),
GrpcEnable: false,
Members: members,
DiscoveryWay: config.DiscoveryWaySerf,
NodesFileDir: t.TempDir(),
}
agent2 := NewAgent(conf2)
err = agent2.Start()
Expand All @@ -82,9 +86,11 @@ func TestCluster(t *testing.T) {
BindPort: bindPort3,
RaftPort: raftPort3,
RaftBootstrap: false,
RaftDir: t.TempDir(),
GrpcEnable: false,
Members: members,
DiscoveryWay: config.DiscoveryWaySerf,
NodesFileDir: t.TempDir(),
}
agent3 := NewAgent(conf3)
err = agent3.Start()
Expand Down
3 changes: 1 addition & 2 deletions cluster/raft/hashicorp/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func (p *Peer) Stop() {
}

// snapshot
if err := p.snapshot().Error(); err != "" {
if err := p.snapshot(); err != nil {
log.Warn("failed to create snapshot!")
}

Expand Down Expand Up @@ -265,7 +265,6 @@ func (p *Peer) Join(nodeId, addr string) error {
log.Warn("it is already a cluster member, ignoring join request", "node", nodeId, "addr", addr)
return nil
}

future := p.raft.RemoveServer(srv.ID, 0, 0)
if err := future.Error(); err != nil {
return fmt.Errorf("error removing existing node %s at %s: %s", nodeId, addr, err)
Expand Down
134 changes: 134 additions & 0 deletions cluster/raft/hashicorp/peer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package hashicorp

import (
"os"
"path/filepath"
"testing"
"time"

"github.com/hashicorp/raft"
"github.com/stretchr/testify/assert"
"github.com/wind-c/comqtt/v2/cluster/message"
"github.com/wind-c/comqtt/v2/config"
"github.com/wind-c/comqtt/v2/mqtt/packets"
)

func createTestPeer(t *testing.T) *Peer {
conf := &config.Cluster{
NodeName: "node1",
BindAddr: "127.0.0.1",
RaftPort: 8946,
RaftDir: t.TempDir(),
RaftBootstrap: true,
}
notifyCh := make(chan *message.Message, 1)
peer, err := Setup(conf, notifyCh)
if err != nil {
t.Fatalf("failed to set up peer: %v", err)
}
return peer
}

func TestJoinAndLeave(t *testing.T) {
peer := createTestPeer(t)
defer peer.Stop()

nodeID := "node2"
nodeAddr := "127.0.0.1:8947"

// Test Join
err := peer.Join(nodeID, nodeAddr)
assert.NoError(t, err)

configFuture := peer.raft.GetConfiguration()
assert.NoError(t, configFuture.Error())

var found bool
for _, server := range configFuture.Configuration().Servers {
if server.ID == raft.ServerID(nodeID) && server.Address == raft.ServerAddress(nodeAddr) {
found = true
break
}
}
assert.True(t, found)

// Test Leave
err = peer.Leave(nodeID)
assert.NoError(t, err)

configFuture = peer.raft.GetConfiguration()
assert.NoError(t, configFuture.Error())

found = false
for _, server := range configFuture.Configuration().Servers {
if server.ID == raft.ServerID(nodeID) && server.Address == raft.ServerAddress(nodeAddr) {
found = true
break
}
}
assert.False(t, found)
}

func TestProposeAndLookup(t *testing.T) {
peer := createTestPeer(t)
defer peer.Stop()

msg := &message.Message{
Type: packets.Subscribe,
NodeID: "node1",
Payload: []byte("filter"),
}

err := peer.Propose(msg)
assert.NoError(t, err)

key := "filter"
expectedValue := "node1"

time.Sleep(2 * time.Second)

result := peer.Lookup(key)
assert.Equal(t, []string{expectedValue}, result)
}

func TestIsApplyRight(t *testing.T) {
peer := createTestPeer(t)
defer peer.Stop()

assert.True(t, peer.IsApplyRight())
}

func TestGetLeader(t *testing.T) {
peer := createTestPeer(t)
defer peer.Stop()

addr, id := peer.GetLeader()
assert.Equal(t, "127.0.0.1:8946", addr)
assert.Equal(t, "node1", id)
}

func TestGenPeersFile(t *testing.T) {
peer := createTestPeer(t)
defer peer.Stop()

file := filepath.Join(t.TempDir(), "peers.json")

err := peer.GenPeersFile(file)
assert.NoError(t, err)

_, err = os.Stat(file)
assert.False(t, os.IsNotExist(err))

content, err := os.ReadFile(file)
assert.NoError(t, err)

expectedContent := `[
{
"id": "node1",
"address": "127.0.0.1:8946",
"non_voter":false
}
]`

assert.JSONEq(t, expectedContent, string(content))
}

0 comments on commit e8eb21f

Please sign in to comment.