From 964dd5f0f52d07d3870d12fbd93ceabce8cfcbdf Mon Sep 17 00:00:00 2001 From: Aditya Dani Date: Fri, 24 Feb 2017 05:38:12 +0000 Subject: [PATCH] Guard the encode of NodeInfoMap with store lock. --- proto/gossip.go | 6 ++--- proto/gossip_delegates.go | 33 ++++------------------- proto/gossip_store.go | 56 ++++++++++++++++++++++++++++++--------- proto/gossip_test.go | 4 +-- 4 files changed, 54 insertions(+), 45 deletions(-) diff --git a/proto/gossip.go b/proto/gossip.go index 347d2a4..c9afd2e 100644 --- a/proto/gossip.go +++ b/proto/gossip.go @@ -115,9 +115,9 @@ type GossiperImpl struct { mlist *ml.Memberlist // node list, maintained separately - nodes GossipNodeList - name string - nodesLock sync.Mutex + nodes GossipNodeList + name string + nodesLock sync.Mutex gossipInterval time.Duration //nodeDeathInterval time.Duration shutDown bool diff --git a/proto/gossip_delegates.go b/proto/gossip_delegates.go index 3326932..0846f40 100644 --- a/proto/gossip_delegates.go +++ b/proto/gossip_delegates.go @@ -1,12 +1,10 @@ package proto import ( - "bytes" - "encoding/gob" "encoding/json" "fmt" - "sync" "strings" + "sync" "time" "github.com/Sirupsen/logrus" @@ -69,26 +67,6 @@ func (gd *GossipDelegate) updateGossipTs() { gd.lastGossipTs = time.Now() } -func (gd *GossipDelegate) convertToBytes(obj interface{}) ([]byte, error) { - var buf bytes.Buffer - enc := gob.NewEncoder(&buf) - err := enc.Encode(obj) - if err != nil { - return []byte{}, err - } - return buf.Bytes(), nil -} - -func (gd *GossipDelegate) convertFromBytes(buf []byte, msg interface{}) error { - msgBuffer := bytes.NewBuffer(buf) - dec := gob.NewDecoder(msgBuffer) - err := dec.Decode(msg) - if err != nil { - return err - } - return nil -} - func (gd *GossipDelegate) gossipChecks(node *memberlist.Node) error { // Check the gossip version of other node var nodeMeta types.NodeMetaInfo @@ -122,7 +100,6 @@ func (gd *GossipDelegate) gossipChecks(node *memberlist.Node) error { return err } - // NodeMeta is used to retrieve meta-data about the current node // when broadcasting an alive message. It's length is limited to // the given byte size. This metadata is available in the Node structure. @@ -172,8 +149,8 @@ func (gd *GossipDelegate) LocalState(join bool) []byte { // We send our local state of nodeMap // The receiver will decide which nodes to merge and which to ignore - localState := gd.GetLocalState() - byteLocalState, err := gd.convertToBytes(&localState) + + byteLocalState, err := gd.GetLocalStateInBytes() if err != nil { gs.Err = fmt.Sprintf("gossip: Error in LocalState. Unable to unmarshal: %v", err.Error()) logrus.Infof(gs.Err) @@ -308,8 +285,8 @@ func (gd *GossipDelegate) NotifyAlive(node *memberlist.Node) error { if err == nil && diffNode.Status != types.NODE_STATUS_UP { gd.UpdateNodeStatus(types.NodeId(nodeName), types.NODE_STATUS_UP) gd.triggerStateEvent(types.NODE_ALIVE) - } // else if err != nil -> A new node sending us data. We do not add node unless it is added - // in our local map externally + } // else if err != nil -> A new node sending us data. We do not add node unless it is added + // in our local map externally gd.history.AddLatest(gs) return nil } diff --git a/proto/gossip_store.go b/proto/gossip_store.go index a5f3cd3..74848df 100644 --- a/proto/gossip_store.go +++ b/proto/gossip_store.go @@ -4,6 +4,8 @@ import ( "fmt" "sync" "time" + "bytes" + "encoding/gob" "github.com/Sirupsen/logrus" "github.com/libopenstorage/gossip/types" @@ -154,14 +156,14 @@ func (s *GossipStoreImpl) GetStoreKeys() []types.StoreKey { keyMap := make(map[types.StoreKey]bool) for _, nodeInfo := range s.nodeMap { if nodeInfo.Value != nil { - for key, _ := range nodeInfo.Value { + for key := range nodeInfo.Value { keyMap[key] = true } } } storeKeys := make([]types.StoreKey, len(keyMap)) i := 0 - for key, _ := range keyMap { + for key := range keyMap { storeKeys[i] = key i++ } @@ -225,7 +227,7 @@ func (s *GossipStoreImpl) MetaInfo() types.NodeMetaInfo { Id: selfNodeInfo.Id, LastUpdateTs: selfNodeInfo.LastUpdateTs, GossipVersion: s.GossipVersion, - ClusterId: s.ClusterId, + ClusterId: s.ClusterId, } return nodeMetaInfo } @@ -233,11 +235,13 @@ func (s *GossipStoreImpl) MetaInfo() types.NodeMetaInfo { func (s *GossipStoreImpl) GetLocalState() types.NodeInfoMap { s.Lock() defer s.Unlock() - localCopy := make(types.NodeInfoMap) - for key, value := range s.nodeMap { - localCopy[key] = value - } - return localCopy + return s.getLocalState() +} + +func (s *GossipStoreImpl) GetLocalStateInBytes() ([]byte, error) { + s.Lock() + defer s.Unlock() + return s.convertToBytes(s.getLocalState()) } func (s *GossipStoreImpl) GetLocalNodeInfo(id types.NodeId) (types.NodeInfo, error) { @@ -284,27 +288,27 @@ func (s *GossipStoreImpl) updateCluster(peers map[types.NodeId]string) { // Lets check if a node was added or removed. if len(s.nodeMap) > len(peers) { // Node removed - for id, _ := range s.nodeMap { + for id := range s.nodeMap { if _, ok := peers[id]; !ok { removeNodeIds = append(removeNodeIds, id) } } } else if len(s.nodeMap) < len(peers) { // Node added - for id, _ := range peers { + for id := range peers { if _, ok := s.nodeMap[id]; !ok { addNodeIds = append(addNodeIds, id) } } } else { // Nodes removed - for id, _ := range s.nodeMap { + for id := range s.nodeMap { if _, ok := peers[id]; !ok { removeNodeIds = append(removeNodeIds, id) } } // Nodes added - for id, _ := range peers { + for id := range peers { if _, ok := s.nodeMap[id]; !ok { addNodeIds = append(addNodeIds, id) } @@ -322,3 +326,31 @@ func (s *GossipStoreImpl) updateCluster(peers map[types.NodeId]string) { func (s *GossipStoreImpl) getClusterSize() int { return s.clusterSize } + +func (s *GossipStoreImpl) convertToBytes(obj interface{}) ([]byte, error) { + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + err := enc.Encode(obj) + if err != nil { + return []byte{}, err + } + return buf.Bytes(), nil +} + +func (s *GossipStoreImpl) convertFromBytes(buf []byte, msg interface{}) error { + msgBuffer := bytes.NewBuffer(buf) + dec := gob.NewDecoder(msgBuffer) + err := dec.Decode(msg) + if err != nil { + return err + } + return nil +} + +func (s *GossipStoreImpl) getLocalState() types.NodeInfoMap { + localCopy := make(types.NodeInfoMap) + for key, value := range s.nodeMap { + localCopy[key] = value + } + return localCopy +} diff --git a/proto/gossip_test.go b/proto/gossip_test.go index b55843b..0257f85 100644 --- a/proto/gossip_test.go +++ b/proto/gossip_test.go @@ -125,7 +125,7 @@ func TestGossiperStartStopGetNode(t *testing.T) { time.Sleep(types.DEFAULT_GOSSIP_INTERVAL * time.Duration(len(nodesIp))) var peerNodes []string // check the nodelist is same - for i, _ := range nodesIp { + for i := range nodesIp { peerNodes = gossipers[i].GetNodes() if len(peerNodes) != len(nodesIp) { t.Error("Peer nodes len does not match added nodes, got: ", @@ -145,7 +145,7 @@ outer: } // test stop gossiper - for i, _ := range nodesIp { + for i := range nodesIp { // It takes time to propagate the leave message err := gossipers[i].Stop(types.DEFAULT_GOSSIP_INTERVAL * time.Duration(len(nodesIp)+1)) if err != nil {