Skip to content

Commit

Permalink
Guard the encode of NodeInfoMap with store lock.
Browse files Browse the repository at this point in the history
  • Loading branch information
adityadani committed Feb 24, 2017
1 parent a32dce2 commit 964dd5f
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 45 deletions.
6 changes: 3 additions & 3 deletions proto/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ type GossiperImpl struct {
mlist *ml.Memberlist

// node list, maintained separately
nodes GossipNodeList
name string
nodesLock sync.Mutex
nodes GossipNodeList
name string
nodesLock sync.Mutex
gossipInterval time.Duration
//nodeDeathInterval time.Duration
shutDown bool
Expand Down
33 changes: 5 additions & 28 deletions proto/gossip_delegates.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package proto

import (
"bytes"
"encoding/gob"
"encoding/json"
"fmt"
"sync"
"strings"
"sync"
"time"

"github.com/Sirupsen/logrus"
Expand Down Expand Up @@ -69,26 +67,6 @@ func (gd *GossipDelegate) updateGossipTs() {
gd.lastGossipTs = time.Now()
}

func (gd *GossipDelegate) convertToBytes(obj interface{}) ([]byte, error) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
err := enc.Encode(obj)
if err != nil {
return []byte{}, err
}
return buf.Bytes(), nil
}

func (gd *GossipDelegate) convertFromBytes(buf []byte, msg interface{}) error {
msgBuffer := bytes.NewBuffer(buf)
dec := gob.NewDecoder(msgBuffer)
err := dec.Decode(msg)
if err != nil {
return err
}
return nil
}

func (gd *GossipDelegate) gossipChecks(node *memberlist.Node) error {
// Check the gossip version of other node
var nodeMeta types.NodeMetaInfo
Expand Down Expand Up @@ -122,7 +100,6 @@ func (gd *GossipDelegate) gossipChecks(node *memberlist.Node) error {
return err
}


// NodeMeta is used to retrieve meta-data about the current node
// when broadcasting an alive message. It's length is limited to
// the given byte size. This metadata is available in the Node structure.
Expand Down Expand Up @@ -172,8 +149,8 @@ func (gd *GossipDelegate) LocalState(join bool) []byte {

// We send our local state of nodeMap
// The receiver will decide which nodes to merge and which to ignore
localState := gd.GetLocalState()
byteLocalState, err := gd.convertToBytes(&localState)

byteLocalState, err := gd.GetLocalStateInBytes()
if err != nil {
gs.Err = fmt.Sprintf("gossip: Error in LocalState. Unable to unmarshal: %v", err.Error())
logrus.Infof(gs.Err)
Expand Down Expand Up @@ -308,8 +285,8 @@ func (gd *GossipDelegate) NotifyAlive(node *memberlist.Node) error {
if err == nil && diffNode.Status != types.NODE_STATUS_UP {
gd.UpdateNodeStatus(types.NodeId(nodeName), types.NODE_STATUS_UP)
gd.triggerStateEvent(types.NODE_ALIVE)
} // else if err != nil -> A new node sending us data. We do not add node unless it is added
// in our local map externally
} // else if err != nil -> A new node sending us data. We do not add node unless it is added
// in our local map externally
gd.history.AddLatest(gs)
return nil
}
Expand Down
56 changes: 44 additions & 12 deletions proto/gossip_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"
"sync"
"time"
"bytes"
"encoding/gob"

"github.com/Sirupsen/logrus"
"github.com/libopenstorage/gossip/types"
Expand Down Expand Up @@ -154,14 +156,14 @@ func (s *GossipStoreImpl) GetStoreKeys() []types.StoreKey {
keyMap := make(map[types.StoreKey]bool)
for _, nodeInfo := range s.nodeMap {
if nodeInfo.Value != nil {
for key, _ := range nodeInfo.Value {
for key := range nodeInfo.Value {
keyMap[key] = true
}
}
}
storeKeys := make([]types.StoreKey, len(keyMap))
i := 0
for key, _ := range keyMap {
for key := range keyMap {
storeKeys[i] = key
i++
}
Expand Down Expand Up @@ -225,19 +227,21 @@ func (s *GossipStoreImpl) MetaInfo() types.NodeMetaInfo {
Id: selfNodeInfo.Id,
LastUpdateTs: selfNodeInfo.LastUpdateTs,
GossipVersion: s.GossipVersion,
ClusterId: s.ClusterId,
ClusterId: s.ClusterId,
}
return nodeMetaInfo
}

func (s *GossipStoreImpl) GetLocalState() types.NodeInfoMap {
s.Lock()
defer s.Unlock()
localCopy := make(types.NodeInfoMap)
for key, value := range s.nodeMap {
localCopy[key] = value
}
return localCopy
return s.getLocalState()
}

func (s *GossipStoreImpl) GetLocalStateInBytes() ([]byte, error) {
s.Lock()
defer s.Unlock()
return s.convertToBytes(s.getLocalState())
}

func (s *GossipStoreImpl) GetLocalNodeInfo(id types.NodeId) (types.NodeInfo, error) {
Expand Down Expand Up @@ -284,27 +288,27 @@ func (s *GossipStoreImpl) updateCluster(peers map[types.NodeId]string) {
// Lets check if a node was added or removed.
if len(s.nodeMap) > len(peers) {
// Node removed
for id, _ := range s.nodeMap {
for id := range s.nodeMap {
if _, ok := peers[id]; !ok {
removeNodeIds = append(removeNodeIds, id)
}
}
} else if len(s.nodeMap) < len(peers) {
// Node added
for id, _ := range peers {
for id := range peers {
if _, ok := s.nodeMap[id]; !ok {
addNodeIds = append(addNodeIds, id)
}
}
} else {
// Nodes removed
for id, _ := range s.nodeMap {
for id := range s.nodeMap {
if _, ok := peers[id]; !ok {
removeNodeIds = append(removeNodeIds, id)
}
}
// Nodes added
for id, _ := range peers {
for id := range peers {
if _, ok := s.nodeMap[id]; !ok {
addNodeIds = append(addNodeIds, id)
}
Expand All @@ -322,3 +326,31 @@ func (s *GossipStoreImpl) updateCluster(peers map[types.NodeId]string) {
func (s *GossipStoreImpl) getClusterSize() int {
return s.clusterSize
}

func (s *GossipStoreImpl) convertToBytes(obj interface{}) ([]byte, error) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
err := enc.Encode(obj)
if err != nil {
return []byte{}, err
}
return buf.Bytes(), nil
}

func (s *GossipStoreImpl) convertFromBytes(buf []byte, msg interface{}) error {
msgBuffer := bytes.NewBuffer(buf)
dec := gob.NewDecoder(msgBuffer)
err := dec.Decode(msg)
if err != nil {
return err
}
return nil
}

func (s *GossipStoreImpl) getLocalState() types.NodeInfoMap {
localCopy := make(types.NodeInfoMap)
for key, value := range s.nodeMap {
localCopy[key] = value
}
return localCopy
}
4 changes: 2 additions & 2 deletions proto/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func TestGossiperStartStopGetNode(t *testing.T) {
time.Sleep(types.DEFAULT_GOSSIP_INTERVAL * time.Duration(len(nodesIp)))
var peerNodes []string
// check the nodelist is same
for i, _ := range nodesIp {
for i := range nodesIp {
peerNodes = gossipers[i].GetNodes()
if len(peerNodes) != len(nodesIp) {
t.Error("Peer nodes len does not match added nodes, got: ",
Expand All @@ -145,7 +145,7 @@ outer:
}

// test stop gossiper
for i, _ := range nodesIp {
for i := range nodesIp {
// It takes time to propagate the leave message
err := gossipers[i].Stop(types.DEFAULT_GOSSIP_INTERVAL * time.Duration(len(nodesIp)+1))
if err != nil {
Expand Down

0 comments on commit 964dd5f

Please sign in to comment.