From aa5bca5fb84334d7d29c26b5ccd7c819f344beac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cganesh=E2=80=9D?= Date: Tue, 28 Mar 2017 13:16:54 -0700 Subject: [PATCH] Non-quorum member support. 1. Currently all nodes participate in quorum decision making process 2. This patch allows nodes to be marked as non-quorum members. 3. Such members do not count as the nodes that need to be alive for quorum to exist. 4. Quorum = Majority of quorum nodes are online. 5. Add tests to ensure: - even if majority of non-quorum members die cluster stays in quorum - add/remove of quorum members does not change cluster quorum --- api.go | 6 +- proto/gossip.go | 8 +- proto/gossip_delegates.go | 17 +- proto/gossip_quorom_test.go | 340 ++++++++++++++++++--- proto/gossip_store.go | 119 ++++---- proto/gossip_store_test.go | 2 +- proto/gossip_test.go | 96 +++--- proto/state/state.go | 10 +- proto/state/state_down.go | 32 +- proto/state/state_not_in_quorum.go | 58 ++-- proto/state/state_suspect_not_in_quorum.go | 62 ++-- proto/state/state_up.go | 59 ++-- types/types.go | 24 +- 13 files changed, 566 insertions(+), 267 deletions(-) diff --git a/api.go b/api.go index 93252b5..0e85a5d 100644 --- a/api.go +++ b/api.go @@ -48,7 +48,7 @@ type GossipStore interface { GetLocalNodeInfo(types.NodeId) (types.NodeInfo, error) // Add a new node in the database - AddNode(types.NodeId, types.NodeStatus) + AddNode(types.NodeId, types.NodeStatus, bool) // Remove a node from the database RemoveNode(types.NodeId) error @@ -72,8 +72,8 @@ type Gossiper interface { // GetNodes returns a list of the connection addresses GetNodes() []string - // UpdateCluster updates gossip with latest peer nodes Id-Ip mapping - UpdateCluster(map[types.NodeId]string) + // UpdateCluster updates gossip with latest peer nodes info + UpdateCluster(map[types.NodeId]types.NodeUpdate) // ExternalNodeLeave is used to indicate gossip that one of the nodes might be down. // It checks quorum and appropriately marks either self down or the other node down. diff --git a/proto/gossip.go b/proto/gossip.go index 73fdd35..011dc1f 100644 --- a/proto/gossip.go +++ b/proto/gossip.go @@ -118,7 +118,7 @@ func (g *GossiperImpl) Init( } func (g *GossiperImpl) Start(knownIps []string) error { - g.InitCurrentState(len(knownIps) + 1) + g.InitCurrentState(uint(len(knownIps) + 1)) list, err := ml.Create(g.mlConf) if err != nil { log.Warnf("gossip: Unable to create memberlist: " + err.Error()) @@ -166,10 +166,9 @@ func (g *GossiperImpl) GetNodes() []string { nodeList[i] = node.Addr.String() } return nodeList - } -func (g *GossiperImpl) UpdateCluster(peers map[types.NodeId]string) { +func (g *GossiperImpl) UpdateCluster(peers map[types.NodeId]types.NodeUpdate) { g.updateCluster(peers) g.triggerStateEvent(types.UPDATE_CLUSTER_SIZE) } @@ -181,7 +180,8 @@ func (g *GossiperImpl) ExternalNodeLeave(nodeId types.NodeId) types.NodeId { return nodeId } else { // We are the culprit as we are not in quorum - log.Infof("gossip: Our Status: %v. We should go down.", g.GetSelfStatus()) + log.Infof("gossip: Our Status: %v. We should go down.", + g.GetSelfStatus()) return g.NodeId() } } diff --git a/proto/gossip_delegates.go b/proto/gossip_delegates.go index 9870ffd..7f66421 100644 --- a/proto/gossip_delegates.go +++ b/proto/gossip_delegates.go @@ -51,9 +51,10 @@ func (gd *GossipDelegate) InitGossipDelegate( gd.quorumTimeout = quorumTimeout } -func (gd *GossipDelegate) InitCurrentState(clusterSize int) { +func (gd *GossipDelegate) InitCurrentState(clusterSize uint) { // Our initial state is NOT_IN_QUORUM - gd.currentState = state.GetNotInQuorum(clusterSize, types.NodeId(gd.nodeId), gd.stateEvent) + gd.currentState = state.GetNotInQuorum( + uint(clusterSize), types.NodeId(gd.nodeId), gd.stateEvent) // Start the go routine which handles all the events // and changes state of the node go gd.handleStateEvents() @@ -302,16 +303,20 @@ func (gd *GossipDelegate) handleStateEvents() { case types.NODE_LEAVE: gd.currentState, _ = gd.currentState.NodeLeave(gd.GetLocalState()) case types.UPDATE_CLUSTER_SIZE: - gd.currentState, _ = gd.currentState.UpdateClusterSize(gd.getClusterSize(), gd.GetLocalState()) + gd.currentState, _ = gd.currentState.UpdateClusterSize( + gd.getNumQuorumMembers(), gd.GetLocalState()) case types.TIMEOUT: - newState, _ := gd.currentState.Timeout(gd.getClusterSize(), gd.GetLocalState()) + newState, _ := gd.currentState.Timeout( + gd.getNumQuorumMembers(), gd.GetLocalState()) if newState.NodeStatus() != gd.currentState.NodeStatus() { - logrus.Infof("gossip: Quorum Timeout. Waited for (%v)", gd.quorumTimeout) + logrus.Infof("gossip: Quorum Timeout. Waited for (%v)", + gd.quorumTimeout) } gd.currentState = newState } newStatus := gd.currentState.NodeStatus() - if previousStatus == types.NODE_STATUS_UP && newStatus == types.NODE_STATUS_SUSPECT_NOT_IN_QUORUM { + if previousStatus == types.NODE_STATUS_UP && + newStatus == types.NODE_STATUS_SUSPECT_NOT_IN_QUORUM { // Start a timer go gd.startQuorumTimer() } diff --git a/proto/gossip_quorom_test.go b/proto/gossip_quorom_test.go index 4aaac25..bf5917d 100644 --- a/proto/gossip_quorom_test.go +++ b/proto/gossip_quorom_test.go @@ -16,8 +16,15 @@ func addKey(g *GossiperImpl) types.StoreKey { return key } -func startNode(t *testing.T, selfIp string, nodeId types.NodeId, peerIps []string, peers map[types.NodeId]string) (*GossiperImpl, types.StoreKey) { - g, _ := NewGossiperImpl(selfIp, nodeId, peerIps, types.DEFAULT_GOSSIP_VERSION) +func startNode( + t *testing.T, + selfIp string, + nodeId types.NodeId, + peerIps []string, + peers map[types.NodeId]types.NodeUpdate, +) (*GossiperImpl, types.StoreKey) { + g, _ := NewGossiperImpl(selfIp, nodeId, peerIps, + types.DEFAULT_GOSSIP_VERSION) g.UpdateCluster(peers) key := addKey(g) return g, key @@ -33,15 +40,22 @@ func TestQuorumAllNodesUpOneByOne(t *testing.T) { // Start Node0 with cluster size 1 node0 := types.NodeId("0") - g0, _ := startNode(t, nodes[0], node0, []string{}, map[types.NodeId]string{node0: nodes[0]}) + g0, _ := startNode(t, nodes[0], node0, []string{}, + map[types.NodeId]types.NodeUpdate{ + node0: types.NodeUpdate{nodes[0], true}}) - if g0.GetSelfStatus() != types.NODE_STATUS_UP { - t.Error("Expected Node 0 to have status: ", types.NODE_STATUS_UP) + time.Sleep(g0.GossipInterval()) + status := g0.GetSelfStatus() + if status != types.NODE_STATUS_UP { + t.Error("Expected Node 0 to have status: ", types.NODE_STATUS_UP, + status) } // Start Node1 with cluster size 2 node1 := types.NodeId("1") - peers := map[types.NodeId]string{node0: nodes[0], node1: nodes[1]} + peers := map[types.NodeId]types.NodeUpdate{ + node0: types.NodeUpdate{nodes[0], true}, + node1: types.NodeUpdate{nodes[1], true}} g1, _ := startNode(t, nodes[1], node1, []string{nodes[0]}, peers) g0.UpdateCluster(peers) @@ -71,8 +85,11 @@ func TestQuorumNodeLoosesQuorumAndGainsBack(t *testing.T) { node0 := types.NodeId("0") node1 := types.NodeId("1") // Start Node 0 - g0, _ := startNode(t, nodes[0], node0, []string{}, map[types.NodeId]string{node0: nodes[0]}) + g0, _ := startNode(t, nodes[0], node0, []string{}, + map[types.NodeId]types.NodeUpdate{ + node0: types.NodeUpdate{nodes[0], true}}) + time.Sleep(g0.GossipInterval()) selfStatus := g0.GetSelfStatus() if selfStatus != types.NODE_STATUS_UP { t.Error("Expected Node 0 to have status: ", types.NODE_STATUS_UP, @@ -81,7 +98,10 @@ func TestQuorumNodeLoosesQuorumAndGainsBack(t *testing.T) { // Simulate new node was added by updating the cluster size, but the new node is not talking to node0 // Node 0 should loose quorom 1/2 - g0.UpdateCluster(map[types.NodeId]string{node0: nodes[0], node1: nodes[1]}) + g0.UpdateCluster(map[types.NodeId]types.NodeUpdate{ + node0: types.NodeUpdate{nodes[0], true}, + node1: types.NodeUpdate{nodes[1], true}}) + time.Sleep(g0.GossipInterval() * time.Duration(len(nodes)+1)) selfStatus = g0.GetSelfStatus() if selfStatus != types.NODE_STATUS_SUSPECT_NOT_IN_QUORUM { t.Error("Expected Node 0 to have status: ", types.NODE_STATUS_SUSPECT_NOT_IN_QUORUM, @@ -98,7 +118,10 @@ func TestQuorumNodeLoosesQuorumAndGainsBack(t *testing.T) { } // Lets start the actual Node 1 - g1, _ := startNode(t, nodes[1], node1, []string{nodes[0]}, map[types.NodeId]string{node0: nodes[0], node1: nodes[1]}) + g1, _ := startNode(t, nodes[1], node1, []string{nodes[0]}, + map[types.NodeId]types.NodeUpdate{ + node0: types.NodeUpdate{nodes[0], true}, + node1: types.NodeUpdate{nodes[1], true}}) // Sleep so that nodes gossip time.Sleep(g1.GossipInterval() * time.Duration(len(nodes)+1)) @@ -125,22 +148,31 @@ func TestQuorumTwoNodesLooseConnectivity(t *testing.T) { node0 := types.NodeId("0") node1 := types.NodeId("1") - g0, _ := startNode(t, nodes[0], node0, []string{}, map[types.NodeId]string{node0: nodes[0]}) + g0, _ := startNode(t, nodes[0], node0, []string{}, + map[types.NodeId]types.NodeUpdate{ + node0: types.NodeUpdate{nodes[0], true}}) + time.Sleep(g0.GossipInterval()) if g0.GetSelfStatus() != types.NODE_STATUS_UP { t.Error("Expected Node 0 to have status: ", types.NODE_STATUS_UP) } // Simulate new node was added by updating the cluster size, but the new node is not talking to node0 // Node 0 should loose quorom 1/2 - g0.UpdateCluster(map[types.NodeId]string{node0: nodes[0], node1: nodes[1]}) + g0.UpdateCluster(map[types.NodeId]types.NodeUpdate{ + node0: types.NodeUpdate{nodes[0], true}, + node1: types.NodeUpdate{nodes[1], true}}) + time.Sleep(g0.GossipInterval() * time.Duration(len(nodes)+1)) if g0.GetSelfStatus() != types.NODE_STATUS_SUSPECT_NOT_IN_QUORUM { t.Error("Expected Node 0 to have status: ", types.NODE_STATUS_SUSPECT_NOT_IN_QUORUM) } // Lets start the actual node 1. We do not supply node 0 Ip address here so that node 1 does not talk to node 0 // to simulate NO connectivity between node 0 and node 1 - g1, _ := startNode(t, nodes[1], node1, []string{}, map[types.NodeId]string{node0: nodes[0], node1: nodes[1]}) + g1, _ := startNode(t, nodes[1], node1, []string{}, + map[types.NodeId]types.NodeUpdate{ + node0: types.NodeUpdate{nodes[0], true}, + node1: types.NodeUpdate{nodes[1], true}}) // For node 0 the status will change from UP_WAITING_QUORUM to WAITING_QUORUM after // the quorum timeout @@ -163,12 +195,7 @@ func TestQuorumOneNodeIsolated(t *testing.T) { "127.0.0.3:9908", } - peers := make(map[types.NodeId]string) - for i, ip := range nodes { - nodeId := types.NodeId(strconv.FormatInt(int64(i), 10)) - peers[nodeId] = ip - } - + peers := getNodeUpdateMap(nodes) var gossipers []*GossiperImpl for i, ip := range nodes { nodeId := types.NodeId(strconv.FormatInt(int64(i), 10)) @@ -230,14 +257,19 @@ func TestQuorumNetworkPartition(t *testing.T) { for i := 0; i < 3; i++ { nodeId := types.NodeId(strconv.FormatInt(int64(i), 10)) var g *GossiperImpl - g, _ = startNode(t, nodes[i], nodeId, []string{nodes[0], nodes[1], nodes[2]}, map[types.NodeId]string{nodeId: nodes[i]}) + g, _ = startNode(t, nodes[i], nodeId, + []string{nodes[0], nodes[1], nodes[2]}, + map[types.NodeId]types.NodeUpdate{ + nodeId: types.NodeUpdate{nodes[i], true}}) gossipers = append(gossipers, g) } // Parition 2 for i := 3; i < 5; i++ { nodeId := types.NodeId(strconv.FormatInt(int64(i), 10)) var g *GossiperImpl - g, _ = startNode(t, nodes[i], nodeId, []string{nodes[3], nodes[4]}, map[types.NodeId]string{nodeId: nodes[i]}) + g, _ = startNode(t, nodes[i], nodeId, []string{nodes[3], nodes[4]}, + map[types.NodeId]types.NodeUpdate{ + nodeId: types.NodeUpdate{nodes[i], true}}) gossipers = append(gossipers, g) } // Let the nodes gossip @@ -248,11 +280,7 @@ func TestQuorumNetworkPartition(t *testing.T) { } } - peers := make(map[types.NodeId]string) - for i, ip := range nodes { - nodeId := types.NodeId(strconv.FormatInt(int64(i), 10)) - peers[nodeId] = ip - } + peers := getNodeUpdateMap(nodes) // Setup the partition by updating the cluster size for _, g := range gossipers { g.UpdateCluster(peers) @@ -299,18 +327,16 @@ func TestQuorumEventHandling(t *testing.T) { for i := 0; i < len(nodes); i++ { nodeId := types.NodeId(strconv.FormatInt(int64(i), 10)) var g *GossiperImpl - g, _ = startNode(t, nodes[i], nodeId, []string{nodes[0]}, map[types.NodeId]string{nodeId: nodes[0]}) + g, _ = startNode(t, nodes[i], nodeId, []string{nodes[0]}, + map[types.NodeId]types.NodeUpdate{ + nodeId: types.NodeUpdate{nodes[0], true}}) gossipers = append(gossipers, g) } // Let the nodes gossip time.Sleep(types.DEFAULT_GOSSIP_INTERVAL * time.Duration(len(nodes))) - peers := make(map[types.NodeId]string) - for i, ip := range nodes { - nodeId := types.NodeId(strconv.FormatInt(int64(i), 10)) - peers[nodeId] = ip - } + peers := getNodeUpdateMap(nodes) // Update the cluster size to 5 for i := 0; i < len(nodes); i++ { gossipers[i].UpdateCluster(peers) @@ -350,14 +376,17 @@ func TestQuorumEventHandling(t *testing.T) { // Node 0 still not in quorum. But should be up as quorum timeout not occured yet if gossipers[0].GetSelfStatus() != types.NODE_STATUS_SUSPECT_NOT_IN_QUORUM { - t.Error("Expected Node 0 status to be ", types.NODE_STATUS_SUSPECT_NOT_IN_QUORUM, " Got: ", gossipers[0].GetSelfStatus()) + t.Error("Expected Node 0 status to be ", + types.NODE_STATUS_SUSPECT_NOT_IN_QUORUM, " Got: ", + gossipers[0].GetSelfStatus()) } // Sleep for quorum timeout to occur time.Sleep(gossipers[0].quorumTimeout + 2*time.Second) if gossipers[0].GetSelfStatus() != types.NODE_STATUS_NOT_IN_QUORUM { - t.Error("Expected Node 0 status to be ", types.NODE_STATUS_NOT_IN_QUORUM, " Got: ", gossipers[0].GetSelfStatus()) + t.Error("Expected Node 0 status to be ", types.NODE_STATUS_NOT_IN_QUORUM, + " Got: ", gossipers[0].GetSelfStatus()) } // Start Node 1 @@ -383,12 +412,7 @@ func TestQuorumRemoveNodes(t *testing.T) { "127.0.0.4:9922", } - peers := make(map[types.NodeId]string) - for i, ip := range nodes { - nodeId := types.NodeId(strconv.FormatInt(int64(i), 10)) - peers[nodeId] = ip - } - + peers := getNodeUpdateMap(nodes) var gossipers []*GossiperImpl for i, ip := range nodes { nodeId := types.NodeId(strconv.FormatInt(int64(i), 10)) @@ -442,21 +466,22 @@ func TestQuorumAddNodes(t *testing.T) { printTestInfo() node0Ip := "127.0.0.1:9923" node0 := types.NodeId("0") - peers := make(map[types.NodeId]string) - peers[node0] = node0Ip + peers := make(map[types.NodeId]types.NodeUpdate) + peers[node0] = types.NodeUpdate{node0Ip, true} g0, _ := startNode(t, node0Ip, node0, []string{}, peers) // Lets sleep so that the nodes gossip and update their quorum time.Sleep(types.DEFAULT_GOSSIP_INTERVAL * time.Duration(1)) if g0.GetSelfStatus() != types.NODE_STATUS_UP { - t.Error("Expected Node 0 status to be ", types.NODE_STATUS_UP, " Got: ", g0.GetSelfStatus()) + t.Error("Expected Node 0 status to be ", types.NODE_STATUS_UP, + " Got: ", g0.GetSelfStatus()) } // Add a new node node1 := types.NodeId("1") node1Ip := "127.0.0.2:9924" - peers[node1] = node1Ip + peers[node1] = types.NodeUpdate{node1Ip, true} g0.UpdateCluster(peers) time.Sleep(types.DEFAULT_GOSSIP_INTERVAL) @@ -474,3 +499,232 @@ func TestQuorumAddNodes(t *testing.T) { t.Error("Expected Node 0 status to be ", types.NODE_STATUS_UP, " Got: ", g0.GetSelfStatus()) } } + +func TestNonQuorumMembersAddRemove(t *testing.T) { + printTestInfo() + + nodes := []string{ + "127.0.0.1:9925", + "127.0.0.2:9926", + } + + peers := getNodeUpdateMap(nodes) + for id, peer := range peers { + peer.QuorumMember = false + peers[id] = peer + } + var gossipers []*GossiperImpl + for i, ip := range nodes { + nodeId := types.NodeId(strconv.FormatInt(int64(i), 10)) + var g *GossiperImpl + if i == 0 { + g, _ = startNode(t, ip, nodeId, []string{}, peers) + } else { + g, _ = startNode(t, ip, nodeId, []string{nodes[0]}, peers) + } + gossipers = append(gossipers, g) + } + + // Lets sleep so that the nodes gossip and update their quorum + time.Sleep(types.DEFAULT_GOSSIP_INTERVAL * time.Duration(len(nodes)+1)) + + for i, g := range gossipers { + if g.GetSelfStatus() != types.NODE_STATUS_NOT_IN_QUORUM { + t.Error("Expected Node ", i, " status to be ", + types.NODE_STATUS_NOT_IN_QUORUM, " Got: ", + g.GetSelfStatus()) + } + } + + // 1. Add another non-quorum member and check cluster still waits for quorum. + // 2. Add quorum member and check cluster gets into quuorum state. + newNodes := []string{ + "127.0.0.1:9927", + "127.0.0.2:9928", + "127.0.0.3:9929", + } + for i, node := range newNodes { + quorumMember := i != 0 + nodeId := types.NodeId(strconv.Itoa(len(peers))) + peers[nodeId] = types.NodeUpdate{node, quorumMember} + for _, g := range gossipers { + g.UpdateCluster(peers) + } + + // Start the new node + newGossiper, _ := startNode(t, node, nodeId, []string{nodes[0]}, peers) + gossipers = append(gossipers, newGossiper) + // Sleep so that the nodes gossip and update their quorum + time.Sleep(types.DEFAULT_GOSSIP_INTERVAL * time.Duration(3)) + + expectedStatus := types.NODE_STATUS_NOT_IN_QUORUM + if quorumMember { + expectedStatus = types.NODE_STATUS_UP + } + for j, g := range gossipers { + if g.GetSelfStatus() != expectedStatus { + t.Error("Expected Node ", j, " status to be ", expectedStatus, + " Got: ", g.GetSelfStatus()) + } + } + } + + // nodes are all non-quorum nodes + nodes = append(nodes, newNodes[0]) + // newNodes are quorum deciding members + newNodes = newNodes[1:] + totalNumNodes := len(nodes) + len(newNodes) + + // Shutdown non-quorum members and check cluster is still in quorum + for i, _ := range nodes { + gossipers[i].Stop(types.DEFAULT_GOSSIP_INTERVAL * time.Duration( + totalNumNodes)) + time.Sleep( + types.DEFAULT_GOSSIP_INTERVAL * time.Duration(totalNumNodes+1)) + for j := i + 1; j < totalNumNodes; j++ { + if gossipers[j].GetSelfStatus() != types.NODE_STATUS_UP { + t.Error("Expected Node ", i, " status to be ", + types.NODE_STATUS_UP, + " Got: ", gossipers[j].GetSelfStatus()) + } + } + + // Remove then node + delete(peers, types.NodeId(strconv.Itoa(i))) + for j := i + 1; j < totalNumNodes; j++ { + gossipers[j].UpdateCluster(peers) + } + time.Sleep( + types.DEFAULT_GOSSIP_INTERVAL * time.Duration(totalNumNodes+1)) + + for j := i + 1; j < totalNumNodes; j++ { + if gossipers[j].GetSelfStatus() != types.NODE_STATUS_UP { + t.Error("Expected Node ", j, " status to be ", + types.NODE_STATUS_UP, " Got: ", + gossipers[j].GetSelfStatus()) + } + } + } +} + +func TestMajorityNonQuorumMembersGoDown(t *testing.T) { + printTestInfo() + + nodes := []string{ + "127.0.0.1:9930", + "127.0.0.2:9931", + "127.0.0.3:9932", + } + + peers := getNodeUpdateMap(nodes) + for id, peer := range peers { + if peer.Addr != nodes[2] { + peer.QuorumMember = false + peers[id] = peer + } + } + var gossipers []*GossiperImpl + for i, ip := range nodes { + nodeId := types.NodeId(strconv.FormatInt(int64(i), 10)) + var g *GossiperImpl + if i == 0 { + g, _ = startNode(t, ip, nodeId, []string{}, peers) + } else { + g, _ = startNode(t, ip, nodeId, []string{nodes[0]}, peers) + } + gossipers = append(gossipers, g) + } + + // Sleep so that the nodes gossip and update their quorum + time.Sleep(types.DEFAULT_GOSSIP_INTERVAL * time.Duration(len(nodes)+1)) + + for i, g := range gossipers { + if g.GetSelfStatus() != types.NODE_STATUS_UP { + t.Error("Expected Node ", i, " status to be ", + types.NODE_STATUS_UP, " Got: ", + g.GetSelfStatus()) + } + } + + // Shutdown non-quorum members at once, which are in majority + // and check cluster remains in quorum + gossipers[0].Stop(types.DEFAULT_GOSSIP_INTERVAL * time.Duration( + len(nodes))) + gossipers[1].Stop(types.DEFAULT_GOSSIP_INTERVAL * time.Duration( + len(nodes))) + + time.Sleep( + types.DEFAULT_GOSSIP_INTERVAL * time.Duration(len(nodes)+1)) + if gossipers[2].GetSelfStatus() != types.NODE_STATUS_UP { + t.Error("Expected Node 2 status to be ", + types.NODE_STATUS_UP, " Got: ", gossipers[2].GetSelfStatus()) + } +} + +func TestNonMajorityQuorumMembersGoDown(t *testing.T) { + printTestInfo() + + nodes := []string{ + "127.0.0.1:9934", + "127.0.0.2:9935", + "127.0.0.3:9936", + } + + // nodes[2] is only quorum member + peers := getNodeUpdateMap(nodes) + for id, peer := range peers { + if peer.Addr != nodes[2] { + peer.QuorumMember = false + peers[id] = peer + } + } + var gossipers []*GossiperImpl + for i, ip := range nodes { + nodeId := types.NodeId(strconv.FormatInt(int64(i), 10)) + var g *GossiperImpl + if i == 0 { + g, _ = startNode(t, ip, nodeId, []string{}, peers) + } else { + g, _ = startNode(t, ip, nodeId, []string{nodes[0]}, peers) + } + gossipers = append(gossipers, g) + } + + // Sleep so that the nodes gossip and update their quorum + time.Sleep(types.DEFAULT_GOSSIP_INTERVAL * time.Duration(len(nodes)+1)) + + for i, g := range gossipers { + if g.GetSelfStatus() != types.NODE_STATUS_UP { + t.Error("Expected Node ", i, " status to be ", + types.NODE_STATUS_UP, " Got: ", + g.GetSelfStatus()) + } + } + + // Shutdown quorum members at once, which are in majority + // and check cluster remains in quorum + gossipers[2].Stop(types.DEFAULT_GOSSIP_INTERVAL * time.Duration( + len(nodes))) + + time.Sleep( + types.DEFAULT_GOSSIP_INTERVAL * time.Duration(len(nodes)+1)) + for i := 0; i < 2; i++ { + if gossipers[i].GetSelfStatus() != + types.NODE_STATUS_SUSPECT_NOT_IN_QUORUM { + t.Error("Expected Node", i, " status to be ", + types.NODE_STATUS_SUSPECT_NOT_IN_QUORUM, " Got: ", + gossipers[i].GetSelfStatus()) + } + } + + // Sleep for quorum timeout + time.Sleep(gossipers[0].quorumTimeout + 2*time.Second) + for i := 0; i < 2; i++ { + if gossipers[i].GetSelfStatus() != + types.NODE_STATUS_NOT_IN_QUORUM { + t.Error("Expected Node", i, " status to be ", + types.NODE_STATUS_NOT_IN_QUORUM, " Got: ", + gossipers[i].GetSelfStatus()) + } + } +} diff --git a/proto/gossip_store.go b/proto/gossip_store.go index 74848df..534d984 100644 --- a/proto/gossip_store.go +++ b/proto/gossip_store.go @@ -1,11 +1,11 @@ package proto import ( + "bytes" + "encoding/gob" "fmt" "sync" "time" - "bytes" - "encoding/gob" "github.com/Sirupsen/logrus" "github.com/libopenstorage/gossip/types" @@ -28,7 +28,9 @@ type GossipStoreImpl struct { // number of nodes in the cluster other than just relying on // memberlist and the length of nodeMap. It is used in // determining the cluster quorum - clusterSize int + clusterSize uint + // numQuorumMembers is the number of members which participate in quorum + numQuorumMembers uint // Ts at which we lost quorum lostQuorumTs time.Time } @@ -96,13 +98,7 @@ func (s *GossipStoreImpl) UpdateSelf(key types.StoreKey, val interface{}) { } func (s *GossipStoreImpl) UpdateSelfStatus(status types.NodeStatus) { - s.Lock() - defer s.Unlock() - - nodeInfo, _ := s.nodeMap[s.id] - nodeInfo.Status = status - nodeInfo.LastUpdateTs = time.Now() - s.nodeMap[s.id] = nodeInfo + s.UpdateNodeStatus(s.id, status) } func (s *GossipStoreImpl) GetSelfStatus() types.NodeStatus { @@ -183,38 +179,53 @@ func statusValid(s types.NodeStatus) bool { s != types.NODE_STATUS_NEVER_GOSSIPED) } -func (s *GossipStoreImpl) AddNode(id types.NodeId, status types.NodeStatus) { +func (s *GossipStoreImpl) AddNode( + id types.NodeId, + status types.NodeStatus, + quorumMember bool, +) { s.Lock() + defer s.Unlock() + s.addNodeUnlocked(id, status, quorumMember) +} + +func (s *GossipStoreImpl) addNodeUnlocked( + id types.NodeId, + status types.NodeStatus, + quorumMember bool, +) { if nodeInfo, ok := s.nodeMap[id]; ok { nodeInfo.Status = status nodeInfo.LastUpdateTs = time.Now() + nodeInfo.QuorumMember = quorumMember s.nodeMap[id] = nodeInfo - s.Unlock() return } - newNodeInfo := types.NodeInfo{ + s.nodeMap[id] = types.NodeInfo{ Id: id, GenNumber: 0, LastUpdateTs: time.Now(), WaitForGenUpdateTs: time.Now(), Status: status, Value: make(types.StoreMap), + QuorumMember: quorumMember, } - s.nodeMap[id] = newNodeInfo logrus.Infof("gossip: Adding Node to gossip map: %v", id) - s.Unlock() } func (s *GossipStoreImpl) RemoveNode(id types.NodeId) error { s.Lock() + defer s.Unlock() + return s.removeNodeUnlocked(id) +} + +func (s *GossipStoreImpl) removeNodeUnlocked(id types.NodeId) error { if _, ok := s.nodeMap[id]; !ok { - s.Unlock() return fmt.Errorf("Node %v does not exist in map", id) } logrus.Infof("gossip: Removing node from gossip map: %v", id) delete(s.nodeMap, id) - s.Unlock() return nil } @@ -265,66 +276,64 @@ func (s *GossipStoreImpl) Update(diff types.NodeInfoMap) { } selfValue, ok := s.nodeMap[id] if !ok { - // We got an update for a node which we do not have in our map - // Lets add it with an offline state + // Ignore updates for a node which we do not know about. continue } if !statusValid(selfValue.Status) || selfValue.LastUpdateTs.Before(newNodeInfo.LastUpdateTs) { - // Our view of Status of a Node, should only be determined by memberlist. - // We should not update the Status field in our nodeInfo based on what other node's - // value is. + // Our view of Status of a Node, should only be determined by + // memberlist. We should not update the Status field in our + // nodeInfo based on what other node's value is. newNodeInfo.Status = selfValue.Status s.nodeMap[id] = newNodeInfo } } } -func (s *GossipStoreImpl) updateCluster(peers map[types.NodeId]string) { +func (s *GossipStoreImpl) updateCluster( + peers map[types.NodeId]types.NodeUpdate, +) { removeNodeIds := []types.NodeId{} addNodeIds := []types.NodeId{} s.Lock() - s.clusterSize = len(peers) - // Lets check if a node was added or removed. - if len(s.nodeMap) > len(peers) { - // Node removed - for id := range s.nodeMap { - if _, ok := peers[id]; !ok { - removeNodeIds = append(removeNodeIds, id) - } - } - } else if len(s.nodeMap) < len(peers) { - // Node added - for id := range peers { - if _, ok := s.nodeMap[id]; !ok { - addNodeIds = append(addNodeIds, id) - } - } - } else { - // Nodes removed - for id := range s.nodeMap { - if _, ok := peers[id]; !ok { - removeNodeIds = append(removeNodeIds, id) - } + defer s.Unlock() + s.clusterSize = uint(len(peers)) + // Nodes removed + for id := range s.nodeMap { + if _, ok := peers[id]; !ok { + removeNodeIds = append(removeNodeIds, id) } - // Nodes added - for id := range peers { - if _, ok := s.nodeMap[id]; !ok { - addNodeIds = append(addNodeIds, id) - } + } + // Nodes added + for id := range peers { + if _, ok := s.nodeMap[id]; !ok { + addNodeIds = append(addNodeIds, id) } } - s.Unlock() + for _, nodeId := range removeNodeIds { - s.RemoveNode(nodeId) + s.removeNodeUnlocked(nodeId) } for _, nodeId := range addNodeIds { - s.AddNode(nodeId, types.NODE_STATUS_DOWN) + update, _ := peers[nodeId] + s.addNodeUnlocked(nodeId, types.NODE_STATUS_DOWN, update.QuorumMember) + } + + // Update quorum members + s.numQuorumMembers = 0 + for id, nodeInfo := range s.nodeMap { + if update, ok := peers[id]; ok { + nodeInfo.QuorumMember = update.QuorumMember + s.nodeMap[id] = nodeInfo + } + if nodeInfo.QuorumMember { + s.numQuorumMembers++ + } } } -func (s *GossipStoreImpl) getClusterSize() int { - return s.clusterSize +func (s *GossipStoreImpl) getNumQuorumMembers() uint { + return s.numQuorumMembers } func (s *GossipStoreImpl) convertToBytes(obj interface{}) ([]byte, error) { diff --git a/proto/gossip_store_test.go b/proto/gossip_store_test.go index f544952..842878b 100644 --- a/proto/gossip_store_test.go +++ b/proto/gossip_store_test.go @@ -235,7 +235,7 @@ func TestGossipStoreUpdateData(t *testing.T) { diff = make(types.NodeInfoMap) nodeLen := 5 for i := 0; i < nodeLen; i++ { - g.AddNode(types.NodeId(strconv.Itoa(i)), types.NODE_STATUS_UP) + g.AddNode(types.NodeId(strconv.Itoa(i)), types.NODE_STATUS_UP, true) } keyList := []types.StoreKey{"key1", "key2", "key3", "key4", "key5"} for _, key := range keyList { diff --git a/proto/gossip_test.go b/proto/gossip_test.go index 6afb1e3..9f83808 100644 --- a/proto/gossip_test.go +++ b/proto/gossip_test.go @@ -30,14 +30,34 @@ func newGossiperImpl(ip string, selfNodeId types.NodeId, knownIps []string, vers return g, err } -func NewGossiperImpl(ip string, selfNodeId types.NodeId, knownIps []string, version string) (*GossiperImpl, error) { +func NewGossiperImpl( + ip string, + selfNodeId types.NodeId, + knownIps []string, + version string, +) (*GossiperImpl, error) { return newGossiperImpl(ip, selfNodeId, knownIps, version, DEFAULT_CLUSTER_ID) } -func NewGossiperImplWithClusterId(ip string, selfNodeId types.NodeId, knownIps []string, version, clusterId string) (*GossiperImpl, error) { +func NewGossiperImplWithClusterId( + ip string, + selfNodeId types.NodeId, + knownIps []string, + version, + clusterId string, +) (*GossiperImpl, error) { return newGossiperImpl(ip, selfNodeId, knownIps, version, clusterId) } +func getNodeUpdateMap(nodesIp []string) map[types.NodeId]types.NodeUpdate { + peers := make(map[types.NodeId]types.NodeUpdate) + for i, ip := range nodesIp { + nodeId := types.NodeId(strconv.FormatInt(int64(i), 10)) + peers[nodeId] = types.NodeUpdate{ip, true} + } + return peers +} + func TestGossiperStartStopGetNode(t *testing.T) { printTestInfo() @@ -49,11 +69,7 @@ func TestGossiperStartStopGetNode(t *testing.T) { "127.0.0.5:8127", } - peers := make(map[types.NodeId]string) - for i, ip := range nodesIp { - nodeId := types.NodeId(strconv.FormatInt(int64(i), 10)) - peers[nodeId] = ip - } + peers := getNodeUpdateMap(nodesIp) clusterSize := len(nodesIp) gossipers := make([]*GossiperImpl, clusterSize) @@ -113,11 +129,7 @@ func TestGossiperOnlyOneNodeGossips(t *testing.T) { "127.0.0.3:9224", } - peers := make(map[types.NodeId]string) - for i, ip := range nodesIp { - nodeId := types.NodeId(strconv.FormatInt(int64(i), 10)) - peers[nodeId] = ip - } + peers := getNodeUpdateMap(nodesIp) rand.Seed(time.Now().UnixNano()) id := types.NodeId(strconv.Itoa(0)) @@ -196,11 +208,7 @@ func TestGossiperOneNodeNeverGossips(t *testing.T) { "127.0.0.2:9623", "127.0.0.3:9624", } - peers := make(map[types.NodeId]string) - for i, ip := range nodes { - nodeId := types.NodeId(strconv.FormatInt(int64(i), 10)) - peers[nodeId] = ip - } + peers := getNodeUpdateMap(nodes) rand.Seed(time.Now().UnixNano()) gossipers := make(map[int]*GossiperImpl) @@ -367,14 +375,14 @@ func TestGossiperGroupingOfNodesWithSameVersion(t *testing.T) { "127.0.0.5:9825", } - peers1 := make(map[types.NodeId]string) - peers2 := make(map[types.NodeId]string) + peers1 := make(map[types.NodeId]types.NodeUpdate) + peers2 := make(map[types.NodeId]types.NodeUpdate) for i, ip := range nodes { nodeId := types.NodeId(strconv.FormatInt(int64(i), 10)) if i != 0 && i%2 == 0 { - peers2[nodeId] = ip + peers2[nodeId] = types.NodeUpdate{ip, true} } else { - peers1[nodeId] = ip + peers1[nodeId] = types.NodeUpdate{ip, true} } } @@ -466,11 +474,7 @@ func TestGossiperUpdateNodeIp(t *testing.T) { "127.0.0.3:9327", } - peers := make(map[types.NodeId]string) - for i, ip := range nodes { - nodeId := types.NodeId(strconv.FormatInt(int64(i), 10)) - peers[nodeId] = ip - } + peers := getNodeUpdateMap(nodes) rand.Seed(time.Now().UnixNano()) gossipers := make(map[int]*GossiperImpl) @@ -704,11 +708,7 @@ func TestGossiperAddNodeExternally(t *testing.T) { "127.0.0.2:9159", } - peers := make(map[types.NodeId]string) - for i, ip := range nodes { - nodeId := types.NodeId(strconv.FormatInt(int64(i), 10)) - peers[nodeId] = ip - } + peers := getNodeUpdateMap(nodes) rand.Seed(time.Now().UnixNano()) gossipers := make(map[int]*GossiperImpl) @@ -721,7 +721,7 @@ func TestGossiperAddNodeExternally(t *testing.T) { } nodes = append(nodes, "127.0.0.3:9160") - peers[types.NodeId("2")] = nodes[2] + peers[types.NodeId("2")] = types.NodeUpdate{nodes[2], true} for _, g := range gossipers { g.UpdateCluster(peers) @@ -786,11 +786,7 @@ func TestGossiperRemoveNodeExternally(t *testing.T) { "127.0.0.3:9163", } - peers := make(map[types.NodeId]string) - for i, ip := range nodes { - nodeId := types.NodeId(strconv.FormatInt(int64(i), 10)) - peers[nodeId] = ip - } + peers := getNodeUpdateMap(nodes) rand.Seed(time.Now().UnixNano()) gossipers := make(map[int]*GossiperImpl) @@ -857,11 +853,7 @@ func TestGossiperExternalNodeLeaveSelfKill(t *testing.T) { "127.0.0.3:9166", } - peers := make(map[types.NodeId]string) - for i, ip := range nodes { - nodeId := types.NodeId(strconv.FormatInt(int64(i), 10)) - peers[nodeId] = ip - } + peers := getNodeUpdateMap(nodes) rand.Seed(time.Now().UnixNano()) gossipers := make(map[int]*GossiperImpl) @@ -901,11 +893,7 @@ func TestGossiperExternalNodeLeavePeerKill(t *testing.T) { "127.0.0.3:9169", } - peers := make(map[types.NodeId]string) - for i, ip := range nodes { - nodeId := types.NodeId(strconv.FormatInt(int64(i), 10)) - peers[nodeId] = ip - } + peers := getNodeUpdateMap(nodes) rand.Seed(time.Now().UnixNano()) gossipers := make(map[int]*GossiperImpl) @@ -945,14 +933,14 @@ func TestGossiperNodesWithDifferentClusterId(t *testing.T) { rand.Seed(time.Now().UnixNano()) gossipers := make(map[int]*GossiperImpl) - peers1 := make(map[types.NodeId]string) - peers2 := make(map[types.NodeId]string) + peers1 := make(map[types.NodeId]types.NodeUpdate) + peers2 := make(map[types.NodeId]types.NodeUpdate) for i, ip := range nodes { nodeId := types.NodeId(strconv.FormatInt(int64(i), 10)) if i == 2 || i == 4 { - peers2[nodeId] = ip + peers2[nodeId] = types.NodeUpdate{ip, true} } else { - peers1[nodeId] = ip + peers1[nodeId] = types.NodeUpdate{ip, true} } } @@ -962,10 +950,12 @@ func TestGossiperNodesWithDifferentClusterId(t *testing.T) { var g *GossiperImpl if i == 2 || i == 4 { // Set a different clusterId - g, _ = NewGossiperImplWithClusterId(nodeId, id, nodes, types.DEFAULT_GOSSIP_VERSION, "test-cluster-1") + g, _ = NewGossiperImplWithClusterId(nodeId, id, nodes, + types.DEFAULT_GOSSIP_VERSION, "test-cluster-1") g.UpdateCluster(peers2) } else { - g, _ = NewGossiperImplWithClusterId(nodeId, id, nodes, types.DEFAULT_GOSSIP_VERSION, "test-cluster-2") + g, _ = NewGossiperImplWithClusterId(nodeId, id, nodes, + types.DEFAULT_GOSSIP_VERSION, "test-cluster-2") g.UpdateCluster(peers1) } diff --git a/proto/state/state.go b/proto/state/state.go index 17623b2..523ad53 100644 --- a/proto/state/state.go +++ b/proto/state/state.go @@ -26,10 +26,16 @@ type State interface { NodeLeave(nodeInfoMap types.NodeInfoMap) (State, error) // UpdateClusterSize is an event indicating the change in cluster size - UpdateClusterSize(clusterSize int, nodeInfoMap types.NodeInfoMap) (State, error) + UpdateClusterSize( + numQuorumMembers uint, + nodeInfoMap types.NodeInfoMap, + ) (State, error) // Timeout is an event triggered when quorum timeout has reached - Timeout(clusterSize int, nodeInfoMap types.NodeInfoMap) (State, error) + Timeout( + numQuorumMembers uint, + nodeInfoMap types.NodeInfoMap, + ) (State, error) // String String() string diff --git a/proto/state/state_down.go b/proto/state/state_down.go index a12d319..e93e570 100644 --- a/proto/state/state_down.go +++ b/proto/state/state_down.go @@ -5,18 +5,22 @@ import ( ) type down struct { - nodeStatus types.NodeStatus - id types.NodeId - clusterSize int - stateEvent chan types.StateEvent + nodeStatus types.NodeStatus + id types.NodeId + numQuorumMembers uint + stateEvent chan types.StateEvent } -func GetDown(clusterSize int, selfId types.NodeId, stateEvent chan types.StateEvent) State { +func GetDown( + numQuorumMembers uint, + selfId types.NodeId, + stateEvent chan types.StateEvent, +) State { return &down{ - nodeStatus: types.NODE_STATUS_DOWN, - clusterSize: clusterSize, - id: selfId, - stateEvent: stateEvent, + nodeStatus: types.NODE_STATUS_DOWN, + numQuorumMembers: numQuorumMembers, + id: selfId, + stateEvent: stateEvent, } } @@ -44,10 +48,16 @@ func (d *down) NodeLeave(localNodeInfoMap types.NodeInfoMap) (State, error) { return d, nil } -func (d *down) UpdateClusterSize(clusterSize int, localNodeInfoMap types.NodeInfoMap) (State, error) { +func (d *down) UpdateClusterSize( + numQuorumMembers uint, + localNodeInfoMap types.NodeInfoMap, +) (State, error) { return d, nil } -func (d *down) Timeout(clusterSize int, localNodeInfoMap types.NodeInfoMap) (State, error) { +func (d *down) Timeout( + numQuorumMembers uint, + localNodeInfoMap types.NodeInfoMap, +) (State, error) { return d, nil } diff --git a/proto/state/state_not_in_quorum.go b/proto/state/state_not_in_quorum.go index 7691b19..da694ff 100644 --- a/proto/state/state_not_in_quorum.go +++ b/proto/state/state_not_in_quorum.go @@ -5,20 +5,24 @@ import ( ) type notInQuorum struct { - nodeStatus types.NodeStatus - id types.NodeId - clusterSize int - stateEvent chan types.StateEvent + nodeStatus types.NodeStatus + id types.NodeId + numQuorumMembers uint + stateEvent chan types.StateEvent } var instanceNotInQuorum *notInQuorum -func GetNotInQuorum(clusterSize int, selfId types.NodeId, stateEvent chan types.StateEvent) State { +func GetNotInQuorum( + numQuorumMembers uint, + selfId types.NodeId, + stateEvent chan types.StateEvent, +) State { return ¬InQuorum{ - nodeStatus: types.NODE_STATUS_NOT_IN_QUORUM, - clusterSize: clusterSize, - id: selfId, - stateEvent: stateEvent, + nodeStatus: types.NODE_STATUS_NOT_IN_QUORUM, + numQuorumMembers: numQuorumMembers, + id: selfId, + stateEvent: stateEvent, } } @@ -31,48 +35,56 @@ func (niq *notInQuorum) NodeStatus() types.NodeStatus { } func (niq *notInQuorum) SelfAlive(localNodeInfoMap types.NodeInfoMap) (State, error) { - quorum := (niq.clusterSize / 2) + 1 - upNodes := calculateUpNodes(localNodeInfoMap) + quorum := (niq.numQuorumMembers / 2) + 1 + upNodes := numQuorumMembersUp(localNodeInfoMap) if upNodes < quorum { return niq, nil } else { - up := GetUp(niq.clusterSize, niq.id, niq.stateEvent) + up := GetUp(niq.numQuorumMembers, niq.id, niq.stateEvent) return up, nil } } func (niq *notInQuorum) NodeAlive(localNodeInfoMap types.NodeInfoMap) (State, error) { - quorum := (niq.clusterSize / 2) + 1 - upNodes := calculateUpNodes(localNodeInfoMap) + quorum := (niq.numQuorumMembers / 2) + 1 + upNodes := numQuorumMembersUp(localNodeInfoMap) if upNodes < quorum { return niq, nil } else { - up := GetUp(niq.clusterSize, niq.id, niq.stateEvent) + up := GetUp(niq.numQuorumMembers, niq.id, niq.stateEvent) return up, nil } } func (niq *notInQuorum) SelfLeave() (State, error) { - down := GetDown(niq.clusterSize, niq.id, niq.stateEvent) + down := GetDown(niq.numQuorumMembers, niq.id, niq.stateEvent) return down, nil } -func (niq *notInQuorum) NodeLeave(localNodeInfoMap types.NodeInfoMap) (State, error) { +func (niq *notInQuorum) NodeLeave( + localNodeInfoMap types.NodeInfoMap, +) (State, error) { return niq, nil } -func (niq *notInQuorum) UpdateClusterSize(clusterSize int, localNodeInfoMap types.NodeInfoMap) (State, error) { - niq.clusterSize = clusterSize - quorum := (niq.clusterSize / 2) + 1 - upNodes := calculateUpNodes(localNodeInfoMap) +func (niq *notInQuorum) UpdateClusterSize( + numQuorumMembers uint, + localNodeInfoMap types.NodeInfoMap, +) (State, error) { + niq.numQuorumMembers = numQuorumMembers + quorum := (niq.numQuorumMembers / 2) + 1 + upNodes := numQuorumMembersUp(localNodeInfoMap) if upNodes < quorum { return niq, nil } else { - up := GetUp(niq.clusterSize, niq.id, niq.stateEvent) + up := GetUp(niq.numQuorumMembers, niq.id, niq.stateEvent) return up, nil } } -func (niq *notInQuorum) Timeout(clusterSize int, localNodeInfoMap types.NodeInfoMap) (State, error) { +func (niq *notInQuorum) Timeout( + numQuorumMembers uint, + localNodeInfoMap types.NodeInfoMap, +) (State, error) { return niq, nil } diff --git a/proto/state/state_suspect_not_in_quorum.go b/proto/state/state_suspect_not_in_quorum.go index b478040..9363144 100644 --- a/proto/state/state_suspect_not_in_quorum.go +++ b/proto/state/state_suspect_not_in_quorum.go @@ -5,20 +5,24 @@ import ( ) type suspectNotInQuorum struct { - nodeStatus types.NodeStatus - id types.NodeId - clusterSize int - stateEvent chan types.StateEvent + nodeStatus types.NodeStatus + id types.NodeId + numQuorumMembers uint + stateEvent chan types.StateEvent } var instanceSuspectNotInQuorum *suspectNotInQuorum -func GetSuspectNotInQuorum(clusterSize int, selfId types.NodeId, stateEvent chan types.StateEvent) State { +func GetSuspectNotInQuorum( + numQuorumMembers uint, + selfId types.NodeId, + stateEvent chan types.StateEvent, +) State { return &suspectNotInQuorum{ - nodeStatus: types.NODE_STATUS_SUSPECT_NOT_IN_QUORUM, - clusterSize: clusterSize, - id: selfId, - stateEvent: stateEvent, + nodeStatus: types.NODE_STATUS_SUSPECT_NOT_IN_QUORUM, + numQuorumMembers: numQuorumMembers, + id: selfId, + stateEvent: stateEvent, } } @@ -35,46 +39,54 @@ func (siq *suspectNotInQuorum) SelfAlive(localNodeInfoMap types.NodeInfoMap) (St } func (siq *suspectNotInQuorum) NodeAlive(localNodeInfoMap types.NodeInfoMap) (State, error) { - quorum := (siq.clusterSize / 2) + 1 - upNodes := calculateUpNodes(localNodeInfoMap) + quorum := (siq.numQuorumMembers / 2) + 1 + upNodes := numQuorumMembersUp(localNodeInfoMap) if upNodes < quorum { return siq, nil } else { - up := GetUp(siq.clusterSize, siq.id, siq.stateEvent) + up := GetUp(siq.numQuorumMembers, siq.id, siq.stateEvent) return up, nil } } func (siq *suspectNotInQuorum) SelfLeave() (State, error) { - down := GetDown(siq.clusterSize, siq.id, siq.stateEvent) + down := GetDown(siq.numQuorumMembers, siq.id, siq.stateEvent) return down, nil } -func (siq *suspectNotInQuorum) NodeLeave(localNodeInfoMap types.NodeInfoMap) (State, error) { +func (siq *suspectNotInQuorum) NodeLeave( + localNodeInfoMap types.NodeInfoMap, +) (State, error) { return siq, nil } -func (siq *suspectNotInQuorum) UpdateClusterSize(clusterSize int, localNodeInfoMap types.NodeInfoMap) (State, error) { - siq.clusterSize = clusterSize - quorum := (siq.clusterSize / 2) + 1 - upNodes := calculateUpNodes(localNodeInfoMap) +func (siq *suspectNotInQuorum) UpdateClusterSize( + numQuorumMembers uint, + localNodeInfoMap types.NodeInfoMap, +) (State, error) { + siq.numQuorumMembers = numQuorumMembers + quorum := (siq.numQuorumMembers / 2) + 1 + upNodes := numQuorumMembersUp(localNodeInfoMap) if upNodes < quorum { return siq, nil } else { - up := GetUp(siq.clusterSize, siq.id, siq.stateEvent) + up := GetUp(siq.numQuorumMembers, siq.id, siq.stateEvent) return up, nil } } -func (siq *suspectNotInQuorum) Timeout(clusterSize int, localNodeInfoMap types.NodeInfoMap) (State, error) { - siq.clusterSize = clusterSize - quorum := (siq.clusterSize / 2) + 1 - upNodes := calculateUpNodes(localNodeInfoMap) +func (siq *suspectNotInQuorum) Timeout( + numQuorumMembers uint, + localNodeInfoMap types.NodeInfoMap, +) (State, error) { + siq.numQuorumMembers = numQuorumMembers + quorum := (siq.numQuorumMembers / 2) + 1 + upNodes := numQuorumMembersUp(localNodeInfoMap) if upNodes < quorum { - notInQuorum := GetNotInQuorum(siq.clusterSize, siq.id, siq.stateEvent) + notInQuorum := GetNotInQuorum(siq.numQuorumMembers, siq.id, siq.stateEvent) return notInQuorum, nil } else { - up := GetUp(siq.clusterSize, siq.id, siq.stateEvent) + up := GetUp(siq.numQuorumMembers, siq.id, siq.stateEvent) return up, nil } } diff --git a/proto/state/state_up.go b/proto/state/state_up.go index 2361ea1..7843b27 100644 --- a/proto/state/state_up.go +++ b/proto/state/state_up.go @@ -5,18 +5,22 @@ import ( ) type up struct { - nodeStatus types.NodeStatus - id types.NodeId - clusterSize int - stateEvent chan types.StateEvent + nodeStatus types.NodeStatus + id types.NodeId + numQuorumMembers uint + stateEvent chan types.StateEvent } -func GetUp(clusterSize int, selfId types.NodeId, stateEvent chan types.StateEvent) State { +func GetUp( + numQuorumMembers uint, + selfId types.NodeId, + stateEvent chan types.StateEvent, +) State { return &up{ - nodeStatus: types.NODE_STATUS_UP, - clusterSize: clusterSize, - id: selfId, - stateEvent: stateEvent, + nodeStatus: types.NODE_STATUS_UP, + numQuorumMembers: numQuorumMembers, + id: selfId, + stateEvent: stateEvent, } } @@ -37,16 +41,17 @@ func (u *up) NodeAlive(localNodeInfoMap types.NodeInfoMap) (State, error) { } func (u *up) SelfLeave() (State, error) { - down := GetDown(u.clusterSize, u.id, u.stateEvent) + down := GetDown(u.numQuorumMembers, u.id, u.stateEvent) return down, nil } -func calculateUpNodes(localNodeInfoMap types.NodeInfoMap) int { - upNodes := 0 +func numQuorumMembersUp(localNodeInfoMap types.NodeInfoMap) uint { + upNodes := uint(0) for _, nodeInfo := range localNodeInfoMap { - if nodeInfo.Status == types.NODE_STATUS_UP || - nodeInfo.Status == types.NODE_STATUS_NOT_IN_QUORUM || - nodeInfo.Status == types.NODE_STATUS_SUSPECT_NOT_IN_QUORUM { + if nodeInfo.QuorumMember && + (nodeInfo.Status == types.NODE_STATUS_UP || + nodeInfo.Status == types.NODE_STATUS_NOT_IN_QUORUM || + nodeInfo.Status == types.NODE_STATUS_SUSPECT_NOT_IN_QUORUM) { upNodes++ } } @@ -54,28 +59,34 @@ func calculateUpNodes(localNodeInfoMap types.NodeInfoMap) int { } func (u *up) NodeLeave(localNodeInfoMap types.NodeInfoMap) (State, error) { - quorum := (u.clusterSize / 2) + 1 - upNodes := calculateUpNodes(localNodeInfoMap) + quorum := (u.numQuorumMembers / 2) + 1 + upNodes := numQuorumMembersUp(localNodeInfoMap) if upNodes < quorum { // Caller of this function should start a timer - return GetSuspectNotInQuorum(u.clusterSize, u.id, u.stateEvent), nil + return GetSuspectNotInQuorum(u.numQuorumMembers, u.id, u.stateEvent), nil } else { return u, nil } } -func (u *up) UpdateClusterSize(clusterSize int, localNodeInfoMap types.NodeInfoMap) (State, error) { - u.clusterSize = clusterSize - quorum := (u.clusterSize / 2) + 1 - upNodes := calculateUpNodes(localNodeInfoMap) +func (u *up) UpdateClusterSize( + numQuorumMembers uint, + localNodeInfoMap types.NodeInfoMap, +) (State, error) { + u.numQuorumMembers = numQuorumMembers + quorum := (u.numQuorumMembers / 2) + 1 + upNodes := numQuorumMembersUp(localNodeInfoMap) if upNodes < quorum { // Caller of this function should start a timer - return GetSuspectNotInQuorum(u.clusterSize, u.id, u.stateEvent), nil + return GetSuspectNotInQuorum(u.numQuorumMembers, u.id, u.stateEvent), nil } else { return u, nil } } -func (u *up) Timeout(clusterSize int, localNodeInfoMap types.NodeInfoMap) (State, error) { +func (u *up) Timeout( + numQuorumMembers uint, + localNodeInfoMap types.NodeInfoMap, +) (State, error) { return u, nil } diff --git a/types/types.go b/types/types.go index f57a8dc..669730f 100644 --- a/types/types.go +++ b/types/types.go @@ -43,23 +43,12 @@ const ( TIMEOUT ) -type GossipDirection uint8 - -const ( - // Direction of gossip - GD_ME_TO_PEER GossipDirection = iota - GD_PEER_TO_ME -) - -type GossipOp string - -const ( - LocalPush GossipOp = "Local Push" - MergeRemote GossipOp = "Merge Remote" - NotifyAlive GossipOp = "Notify Alive" - NotifyJoin GossipOp = "Notify Join" - NotifyLeave GossipOp = "Notify Leave" -) +type NodeUpdate struct { + // Addr is the contact address for the node + Addr string + // QuorumMember is true if node participates in quorum decisions + QuorumMember bool +} type NodeMetaInfo struct { ClusterId string @@ -76,6 +65,7 @@ type NodeInfo struct { WaitForGenUpdateTs time.Time Status NodeStatus Value StoreMap + QuorumMember bool } type NodeValue struct {