diff --git a/api.go b/api.go index 6565926..93252b5 100644 --- a/api.go +++ b/api.go @@ -72,9 +72,6 @@ type Gossiper interface { // GetNodes returns a list of the connection addresses GetNodes() []string - // GetGossipHistory returns the gossip records for last 20 sessions. - GetGossipHistory() []*types.GossipSessionInfo - // UpdateCluster updates gossip with latest peer nodes Id-Ip mapping UpdateCluster(map[types.NodeId]string) diff --git a/proto/gossip.go b/proto/gossip.go index c9afd2e..73fdd35 100644 --- a/proto/gossip.go +++ b/proto/gossip.go @@ -1,7 +1,6 @@ package proto import ( - "container/list" "errors" "fmt" "math/rand" @@ -17,75 +16,6 @@ import ( "github.com/libopenstorage/gossip/types" ) -type GossipHistory struct { - // front is the latest, back is the last - nodes *list.List - lock sync.Mutex - maxLen uint8 -} - -func NewGossipSessionInfo(node string, - dir types.GossipDirection) *types.GossipSessionInfo { - gs := new(types.GossipSessionInfo) - gs.Node = node - gs.Dir = dir - gs.Ts = time.Now() - gs.Err = "" - return gs -} - -func NewGossipHistory(maxLen uint8) *GossipHistory { - s := new(GossipHistory) - s.nodes = list.New() - s.nodes.Init() - s.maxLen = maxLen - return s -} - -func (s *GossipHistory) AddLatest(gs *types.GossipSessionInfo) { - s.lock.Lock() - defer s.lock.Unlock() - if uint8(s.nodes.Len()) == s.maxLen { - s.nodes.Remove(s.nodes.Back()) - } - s.nodes.PushFront(gs) -} - -func (s *GossipHistory) GetAllRecords() []*types.GossipSessionInfo { - s.lock.Lock() - defer s.lock.Unlock() - records := make([]*types.GossipSessionInfo, s.nodes.Len(), s.nodes.Len()) - i := 0 - for element := s.nodes.Front(); element != nil; element = element.Next() { - r, ok := element.Value.(*types.GossipSessionInfo) - if !ok || r == nil { - log.Error("gossip: Failed to convert element") - continue - } - records[i] = &types.GossipSessionInfo{Node: r.Node, - Ts: r.Ts, Dir: r.Dir, Err: r.Err} - i++ - } - return records -} - -func (s *GossipHistory) LogRecords() { - s.lock.Lock() - defer s.lock.Unlock() - status := make([]string, 2) - status[types.GD_ME_TO_PEER] = "ME_TO_PEER" - status[types.GD_PEER_TO_ME] = "PEER_TO_ME" - - for element := s.nodes.Front(); element != nil; element = element.Next() { - r, ok := element.Value.(*types.GossipSessionInfo) - if !ok || r == nil { - continue - } - log.Infof("Node: %v LastTs: %v Dir: %v Error: %v", - r.Node, r.Ts, status[r.Dir], r.Err) - } -} - type GossipNode struct { Id types.NodeId Ip string @@ -229,10 +159,6 @@ func (g *GossiperImpl) GossipInterval() time.Duration { return g.gossipInterval } -func (g *GossiperImpl) GetGossipHistory() []*types.GossipSessionInfo { - return g.history.GetAllRecords() -} - func (g *GossiperImpl) GetNodes() []string { nodes := g.mlist.Members() nodeList := make([]string, len(nodes)) diff --git a/proto/gossip_delegates.go b/proto/gossip_delegates.go index 0846f40..9870ffd 100644 --- a/proto/gossip_delegates.go +++ b/proto/gossip_delegates.go @@ -21,7 +21,6 @@ type GossipDelegate struct { // last gossip time lastGossipTsLock sync.Mutex lastGossipTs time.Time - history *GossipHistory // channel to receive state change events stateEvent chan types.StateEvent // current State object @@ -50,7 +49,6 @@ func (gd *GossipDelegate) InitGossipDelegate( clusterId, ) gd.quorumTimeout = quorumTimeout - gd.history = NewGossipHistory(20) } func (gd *GossipDelegate) InitCurrentState(clusterSize int) { @@ -143,22 +141,13 @@ func (gd *GossipDelegate) GetBroadcasts(overhead, limit int) [][]byte { func (gd *GossipDelegate) LocalState(join bool) []byte { gd.updateSelfTs() - // We don't know which node we are talking to. - gs := NewGossipSessionInfo("", types.GD_ME_TO_PEER) - gs.Op = types.LocalPush - // We send our local state of nodeMap // The receiver will decide which nodes to merge and which to ignore - byteLocalState, err := gd.GetLocalStateInBytes() if err != nil { - gs.Err = fmt.Sprintf("gossip: Error in LocalState. Unable to unmarshal: %v", err.Error()) - logrus.Infof(gs.Err) byteLocalState = []byte{} } - gs.Err = "" gd.updateGossipTs() - gd.history.AddLatest(gs) return byteLocalState } @@ -174,18 +163,14 @@ func (gd *GossipDelegate) MergeRemoteState(buf []byte, join bool) { } gd.updateSelfTs() - gs := NewGossipSessionInfo("", types.GD_PEER_TO_ME) err := gd.convertFromBytes(buf, &remoteState) if err != nil { - gs.Err = fmt.Sprintf("gossip: Error in unmarshalling peer's local data. Error : %v", err.Error()) - logrus.Infof(gs.Err) + logrus.Infof("gossip: Error in unmarshalling peer's local data. "+ + "Error : %v", err.Error()) } gd.Update(remoteState) - gs.Op = types.MergeRemote - gs.Err = "" gd.updateGossipTs() - gd.history.AddLatest(gs) return } @@ -198,20 +183,14 @@ func (gd *GossipDelegate) NotifyJoin(node *memberlist.Node) { return } - gs := NewGossipSessionInfo(nodeName, types.GD_PEER_TO_ME) - gs.Op = types.NotifyJoin gd.updateGossipTs() // NotifyAlive should remove a node from memberlist if the // gossip version mismatches. // Nevertheless we are doing an extra check here. - err := gd.gossipChecks(node) - if err != nil { - gs.Err = err.Error() + if err := gd.gossipChecks(node); err != nil { gd.RemoveNode(types.NodeId(nodeName)) } - - gd.history.AddLatest(gs) } // NotifyLeave is invoked when a node is detected to have left. @@ -229,11 +208,7 @@ func (gd *GossipDelegate) NotifyLeave(node *memberlist.Node) { gd.triggerStateEvent(types.NODE_LEAVE) } - gs := NewGossipSessionInfo(nodeName, types.GD_PEER_TO_ME) - gs.Err = "" - gs.Op = types.NotifyLeave gd.updateGossipTs() - gd.history.AddLatest(gs) return } @@ -266,15 +241,10 @@ func (gd *GossipDelegate) NotifyAlive(node *memberlist.Node) error { return nil } - gs := NewGossipSessionInfo(nodeName, types.GD_PEER_TO_ME) - gs.Op = types.NotifyAlive - gs.Err = "" gd.updateGossipTs() err := gd.gossipChecks(node) if err != nil { - gs.Err = err.Error() - gd.history.AddLatest(gs) gd.RemoveNode(types.NodeId(nodeName)) // Do not add this node to the memberlist. // Returning a non-nil err value @@ -287,7 +257,6 @@ func (gd *GossipDelegate) NotifyAlive(node *memberlist.Node) error { gd.triggerStateEvent(types.NODE_ALIVE) } // else if err != nil -> A new node sending us data. We do not add node unless it is added // in our local map externally - gd.history.AddLatest(gs) return nil } diff --git a/proto/gossip_test.go b/proto/gossip_test.go index 0257f85..6afb1e3 100644 --- a/proto/gossip_test.go +++ b/proto/gossip_test.go @@ -38,56 +38,6 @@ func NewGossiperImplWithClusterId(ip string, selfNodeId types.NodeId, knownIps [ return newGossiperImpl(ip, selfNodeId, knownIps, version, clusterId) } -func TestGossiperHistory(t *testing.T) { - var maxLen uint8 = 5 - h := NewGossipHistory(maxLen) - - for i := 0; i < 2*int(maxLen); i++ { - h.AddLatest(NewGossipSessionInfo(strconv.Itoa(i), - types.GD_ME_TO_PEER)) - if i < 5 { - records := h.GetAllRecords() - if len(records) != i+1 { - t.Error("Length of returned records don't match, r:", len(records), - " expected: ", h.nodes.Len()) - } - } - } - - if h.nodes.Len() != int(maxLen) { - t.Error("Len mismatch h: ", h.nodes.Len(), " expected: ", maxLen) - } - - records := h.GetAllRecords() - if len(records) != h.nodes.Len() { - t.Error("Length of returned records don't match, r:", len(records), - " expected: ", h.nodes.Len()) - } - - var p *types.GossipSessionInfo = nil - for _, c := range records { - if p != nil { - pId, ok3 := strconv.Atoi(p.Node) - cId, ok4 := strconv.Atoi(c.Node) - - if ok3 != nil || ok4 != nil { - t.Error("Failed to get elements: p: ", p, " c: ", c) - continue - } - - if pId < cId { - t.Error("Data maintained in wrong order ", p, " c: ", c) - } - - if p.Ts.Before(c.Ts) { - t.Error("Data maintained in wrong order ", p, " c: ", c) - } - } - p = c - } - -} - func TestGossiperStartStopGetNode(t *testing.T) { printTestInfo() diff --git a/types/types.go b/types/types.go index f52cd39..f57a8dc 100644 --- a/types/types.go +++ b/types/types.go @@ -61,14 +61,6 @@ const ( NotifyLeave GossipOp = "Notify Leave" ) -type GossipSessionInfo struct { - Node string - Ts time.Time - Dir GossipDirection - Err string - Op GossipOp -} - type NodeMetaInfo struct { ClusterId string GossipVersion string