From d5b7730d5985d6e0faf1a61cb3dd26188699e9c8 Mon Sep 17 00:00:00 2001 From: Guvenc Gulce Date: Mon, 6 May 2024 11:30:17 +0200 Subject: [PATCH 1/3] Introduce HA support Change the client behaviour in such a way that in case a second metalbond server distributing the exact some routes as the "main" server is added to the server list then the client is aware of the second server when adding removing the routes from its routing table hence withdraws already received routes only if both servers withdraw the routes in question. Single server operation is same as before. (Backward compatible) Signed-off-by: Guvenc Gulce --- cmd/cmd.go | 5 ++ metalbond.go | 26 ++++----- routetable.go | 145 ++++++++++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 160 insertions(+), 16 deletions(-) diff --git a/cmd/cmd.go b/cmd/cmd.go index 22b2901..348ae0f 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -86,6 +86,7 @@ func main() { log.Infof("Client") log.Infof(" servers: %v", CLI.Client.Server) var err error + var serverCount = 0 if CLI.Client.Verbose { log.SetLevel(log.DebugLevel) @@ -139,6 +140,10 @@ func main() { } for _, server := range CLI.Client.Server { + serverCount++ + if serverCount == 3 { + panic(fmt.Errorf("can not connect to more than 2 servers")) + } if err := m.AddPeer(server, ""); err != nil { panic(fmt.Errorf("failed to add server: %v", err)) } diff --git a/metalbond.go b/metalbond.go index 8342ec2..6e3c0c9 100644 --- a/metalbond.go +++ b/metalbond.go @@ -310,8 +310,20 @@ func (m *MetalBond) GetSubscribedVnis() []VNI { return vnis } +func (m *MetalBond) getHAPeerIfExists(target *metalBondPeer) *metalBondPeer { + m.mtxPeers.RLock() + defer m.mtxPeers.RUnlock() + + for _, peer := range m.peers { + if peer.localAddr != target.localAddr { + return peer + } + } + return nil +} + func (m *MetalBond) addReceivedRoute(fromPeer *metalBondPeer, vni VNI, dest Destination, hop NextHop) error { - err := m.routeTable.AddNextHop(vni, dest, hop, fromPeer) + err := m.routeTable.AddNextHopHAAware(vni, dest, hop, fromPeer) if err != nil { return fmt.Errorf("Cannot add route to route table: %v", err) } @@ -326,16 +338,11 @@ func (m *MetalBond) addReceivedRoute(fromPeer *metalBondPeer, vni VNI, dest Dest m.log().Errorf("Could not distribute route to peers: %v", err) } - err = m.client.AddRoute(vni, dest, hop) - if err != nil { - m.log().Errorf("Client.AddRoute call failed: %v", err) - } - return nil } func (m *MetalBond) removeReceivedRoute(fromPeer *metalBondPeer, vni VNI, dest Destination, hop NextHop) error { - err, remaining := m.routeTable.RemoveNextHop(vni, dest, hop, fromPeer) + err, remaining := m.routeTable.RemoveNextHopHAAware(vni, dest, hop, fromPeer) if err != nil { return fmt.Errorf("Cannot remove route from route table: %v", err) } @@ -352,11 +359,6 @@ func (m *MetalBond) removeReceivedRoute(fromPeer *metalBondPeer, vni VNI, dest D } } - err = m.client.RemoveRoute(vni, dest, hop) - if err != nil { - m.log().Errorf("Client.RemoveRoute call failed: %v", err) - } - return nil } diff --git a/routetable.go b/routetable.go index 5560f3f..bbc0adb 100644 --- a/routetable.go +++ b/routetable.go @@ -145,11 +145,66 @@ func (rt *routeTable) RemoveNextHop(vni VNI, dest Destination, nh NextHop, recei return nil, left } -func (rt *routeTable) AddNextHop(vni VNI, dest Destination, nh NextHop, receivedFrom *metalBondPeer) error { +func (rt *routeTable) RemoveNextHopHAAware(vni VNI, dest Destination, nh NextHop, receivedFrom *metalBondPeer) (error, int) { + var haPeer *metalBondPeer = nil rt.rwmtx.Lock() defer rt.rwmtx.Unlock() + if rt.routes == nil { + rt.routes = make(map[VNI]map[Destination]map[NextHop]map[*metalBondPeer]bool) + } + // TODO Performance: reused found map pointers + if _, exists := rt.routes[vni]; !exists { + return fmt.Errorf("VNI does not exist"), 0 + } + + if _, exists := rt.routes[vni][dest]; !exists { + return fmt.Errorf("Destination does not exist"), 0 + } + + if _, exists := rt.routes[vni][dest][nh]; !exists { + return fmt.Errorf("nexthop does not exist"), 0 + } + + if _, exists := rt.routes[vni][dest][nh][receivedFrom]; !exists { + return fmt.Errorf("ReceivedFrom does not exist"), 0 + } + + delete(rt.routes[vni][dest][nh], receivedFrom) + left := len(rt.routes[vni][dest][nh]) + + if len(rt.routes[vni][dest][nh]) == 0 { + delete(rt.routes[vni][dest], nh) + } + + if len(rt.routes[vni][dest]) == 0 { + delete(rt.routes[vni], dest) + } + + if len(rt.routes[vni]) == 0 { + delete(rt.routes, vni) + } + + if !receivedFrom.isServer { + haPeer = receivedFrom.metalbond.getHAPeerIfExists(receivedFrom) + } + + if rt.IsRouteOkToRemoveFromTheClient(vni, dest, nh, receivedFrom, haPeer) { + err := receivedFrom.metalbond.client.RemoveRoute(vni, dest, nh) + if err != nil { + receivedFrom.metalbond.log().Errorf("Client.RemoveRoute call failed: %v", err) + return err, 0 + } + } + + return nil, left +} + +func (rt *routeTable) AddNextHop(vni VNI, dest Destination, nh NextHop, receivedFrom *metalBondPeer) error { + rt.rwmtx.Lock() + defer rt.rwmtx.Unlock() + if _, exists := rt.routes[vni]; !exists { rt.routes[vni] = make(map[Destination]map[NextHop]map[*metalBondPeer]bool) } @@ -163,7 +218,7 @@ func (rt *routeTable) AddNextHop(vni VNI, dest Destination, nh NextHop, received } if _, exists := rt.routes[vni][dest][nh][receivedFrom]; exists { - return fmt.Errorf("Nexthop already exists") + return fmt.Errorf("nexthop already exists") } rt.routes[vni][dest][nh][receivedFrom] = true @@ -171,11 +226,11 @@ func (rt *routeTable) AddNextHop(vni VNI, dest Destination, nh NextHop, received return nil } -func (rt *routeTable) NextHopExists(vni VNI, dest Destination, nh NextHop, receivedFrom *metalBondPeer) bool { +func (rt *routeTable) AddNextHopHAAware(vni VNI, dest Destination, nh NextHop, receivedFrom *metalBondPeer) error { + var haPeer *metalBondPeer = nil rt.rwmtx.Lock() defer rt.rwmtx.Unlock() - // TODO Performance: reused found map pointers if _, exists := rt.routes[vni]; !exists { rt.routes[vni] = make(map[Destination]map[NextHop]map[*metalBondPeer]bool) } @@ -189,6 +244,88 @@ func (rt *routeTable) NextHopExists(vni VNI, dest Destination, nh NextHop, recei } if _, exists := rt.routes[vni][dest][nh][receivedFrom]; exists { + return fmt.Errorf("nexthop already exists") + } + + rt.routes[vni][dest][nh][receivedFrom] = true + + if !receivedFrom.isServer { + haPeer = receivedFrom.metalbond.getHAPeerIfExists(receivedFrom) + } + + if rt.IsRouteOkToAddToTheClient(vni, dest, nh, receivedFrom, haPeer) { + err := receivedFrom.metalbond.client.AddRoute(vni, dest, nh) + if err != nil { + receivedFrom.metalbond.log().Errorf("Client.AddRoute call failed: %v", err) + return err + } + } + + return nil +} + +func (rt *routeTable) NextHopExists(vni VNI, dest Destination, nh NextHop, receivedFrom *metalBondPeer) bool { + rt.rwmtx.RLock() + defer rt.rwmtx.RUnlock() + return rt.NextHopExistsUnlocked(vni, dest, nh, receivedFrom) +} + +func (rt *routeTable) NextHopExistsUnlocked(vni VNI, dest Destination, nh NextHop, receivedFrom *metalBondPeer) bool { + if _, exists := rt.routes[vni]; !exists { + return false + } + + if _, exists := rt.routes[vni][dest]; !exists { + return false + } + + if _, exists := rt.routes[vni][dest][nh]; !exists { + return false + } + + if _, exists := rt.routes[vni][dest][nh][receivedFrom]; exists { + return true + } + + return false +} + +// Call this function only with r/w lock of the table +func (rt *routeTable) IsRouteOkToAddToTheClient(vni VNI, dest Destination, nh NextHop, receivedFrom *metalBondPeer, haPeer *metalBondPeer) bool { + count := 0 + + if rt.NextHopExistsUnlocked(vni, dest, nh, receivedFrom) { + count++ + } + + if haPeer != nil { + if rt.NextHopExistsUnlocked(vni, dest, nh, haPeer) { + count++ + } + } + + if count == 1 { + return true + } + + return false +} + +// Call this function only with r/w lock of the table +func (rt *routeTable) IsRouteOkToRemoveFromTheClient(vni VNI, dest Destination, nh NextHop, receivedFrom *metalBondPeer, haPeer *metalBondPeer) bool { + count := 0 + + if rt.NextHopExistsUnlocked(vni, dest, nh, receivedFrom) { + count++ + } + + if haPeer != nil { + if rt.NextHopExistsUnlocked(vni, dest, nh, haPeer) { + count++ + } + } + + if count == 0 { return true } From df66edb99cb1d69b37077b4511b7e2b0ba0a89e4 Mon Sep 17 00:00:00 2001 From: Viliam Lorinc Date: Thu, 27 Jun 2024 12:14:02 +0200 Subject: [PATCH 2/3] Add possibility to run specific tests with label --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 3e02411..92a1486 100644 --- a/Makefile +++ b/Makefile @@ -108,7 +108,7 @@ deb: .PHONY: unit-test unit-test: - go test -v + go test -v -ginkgo.label-filter=$(labels) .PHONY: fmt fmt: goimports ## Run goimports against code. From f81c6888d70aaf4e1b3fbeb0dfb88b92e03ae589 Mon Sep 17 00:00:00 2001 From: Viliam Lorinc Date: Thu, 27 Jun 2024 12:14:52 +0200 Subject: [PATCH 3/3] Add test cases for HA mode with 2 servers --- peer_test.go | 339 +++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 316 insertions(+), 23 deletions(-) diff --git a/peer_test.go b/peer_test.go index 1165c29..73e0b4c 100644 --- a/peer_test.go +++ b/peer_test.go @@ -19,9 +19,9 @@ import ( var _ = Describe("Peer", func() { var ( - mbServer *MetalBond - serverAddress string - client *DummyClient + mbServer, mbServer2 *MetalBond + serverAddress, serverAddress2 string + client *DummyClient ) BeforeEach(func() { @@ -29,18 +29,24 @@ var _ = Describe("Peer", func() { config := Config{} client = NewDummyClient() mbServer = NewMetalBond(config, client) - serverAddress = fmt.Sprintf("127.0.0.1:%d", getRandomTCPPort()) + mbServer2 = NewMetalBond(config, client) + randomPort := getRandomTCPPort() + serverAddress = fmt.Sprintf("127.0.0.1:%d", randomPort) + serverAddress2 = fmt.Sprintf("127.0.0.2:%d", randomPort) err := mbServer.StartServer(serverAddress) Expect(err).ToNot(HaveOccurred()) + err = mbServer2.StartServer(serverAddress2) + Expect(err).ToNot(HaveOccurred()) }) AfterEach(func() { mbServer.Shutdown() + mbServer2.Shutdown() }) - It("should subscribe", func() { + It("should subscribe", Label("subscribe"), func() { mbClient := NewMetalBond(Config{}, client) - localIP := net.ParseIP("127.0.0.2") + localIP := net.ParseIP("127.0.0.3") err := mbClient.AddPeer(serverAddress, localIP.String()) Expect(err).NotTo(HaveOccurred()) @@ -68,9 +74,9 @@ var _ = Describe("Peer", func() { mbClient.Shutdown() }) - It("should reset", func() { + It("should reset", Label("reset"), func() { mbClient := NewMetalBond(Config{}, client) - err := mbClient.AddPeer(serverAddress, "127.0.0.2") + err := mbClient.AddPeer(serverAddress, "127.0.0.3") Expect(err).NotTo(HaveOccurred()) clientAddr := getLocalAddr(mbClient, "") @@ -99,9 +105,9 @@ var _ = Describe("Peer", func() { Expect(waitForPeerState(mbServer, clientAddr, ESTABLISHED)).NotTo(BeFalse()) }) - It("should reconnect", func() { + It("should reconnect", Label("reconnect"), func() { mbClient := NewMetalBond(Config{}, client) - err := mbClient.AddPeer(serverAddress, "127.0.0.2") + err := mbClient.AddPeer(serverAddress, "127.0.0.3") Expect(err).NotTo(HaveOccurred()) clientAddr := getLocalAddr(mbClient, "") @@ -128,9 +134,9 @@ var _ = Describe("Peer", func() { Expect(waitForPeerState(mbServer, clientAddr, ESTABLISHED)).NotTo(BeFalse()) }) - It("client timeout", func() { + It("client timeout", Label("timeout"), func() { mbClient := NewMetalBond(Config{}, client) - err := mbClient.AddPeer(serverAddress, "127.0.0.2") + err := mbClient.AddPeer(serverAddress, "127.0.0.3") Expect(err).NotTo(HaveOccurred()) clientAddr := getLocalAddr(mbClient, "") @@ -163,7 +169,7 @@ var _ = Describe("Peer", func() { Expect(err).NotTo(HaveOccurred()) }) - It("should announce", func() { + It("should announce", Label("announce"), func() { totalClients := 600 // TODO: was 1000 (local test works for this large value), but it is reduced to this value to make CI/CD happy var wg sync.WaitGroup @@ -173,7 +179,7 @@ var _ = Describe("Peer", func() { go func(index int) { defer wg.Done() mbClient := NewMetalBond(Config{}, client) - localIP := net.ParseIP("127.0.0.2") + localIP := net.ParseIP("127.0.0.3") localIP = incrementIPv4(localIP, index) err := mbClient.AddPeer(serverAddress, localIP.String()) Expect(err).NotTo(HaveOccurred()) @@ -185,9 +191,7 @@ var _ = Describe("Peer", func() { Expect(waitForPeerState(mbServer, clientAddr, ESTABLISHED)).NotTo(BeFalse()) - mbServer.mtxPeers.RLock() - p := mbServer.peers[clientAddr] - mbServer.mtxPeers.RUnlock() + p := mbServer.getPeer(clientAddr) Expect(waitForPeerState(mbClient, serverAddress, ESTABLISHED)).NotTo(BeFalse()) vni := VNI(index % 10) @@ -245,9 +249,7 @@ var _ = Describe("Peer", func() { // check if the peer is established again Expect(waitForPeerState(mbServer, clientAddr, ESTABLISHED)).NotTo(BeFalse()) - mbServer.mtxPeers.RLock() - p = mbServer.peers[clientAddr] - mbServer.mtxPeers.RUnlock() + p = mbServer.getPeer(clientAddr) // wait for the route to be received time.Sleep(3 * time.Second) @@ -273,9 +275,290 @@ var _ = Describe("Peer", func() { Expect(exists).To(BeTrue()) }(i) } + wg.Wait() + }) + + It("should announce with 2 servers and recover from removing 1 peer", Label("announceHAremove"), func() { + totalClients := 600 // TODO: was 1000 (local test works for this large value), but it is reduced to this value to make CI/CD happy + var wg sync.WaitGroup + + for i := 1; i < totalClients+1; i++ { + wg.Add(1) + + go func(index int) { + defer GinkgoRecover() + defer wg.Done() + mbClient := NewMetalBond(Config{}, client) + localIP := net.ParseIP("127.0.0.3") + localIP = incrementIPv4(localIP, index) + err := mbClient.AddPeer(serverAddress, localIP.String()) + Expect(err).NotTo(HaveOccurred()) + err = mbClient.AddPeer(serverAddress2, localIP.String()) + Expect(err).NotTo(HaveOccurred()) + + // mbClient should have both servers as peers + Expect(len(mbClient.peers)).To(Equal(2)) + + // wait for the peer loop to start + time.Sleep(1 * time.Second) + clientAddr := getLocalAddr(mbClient, "") + Expect(clientAddr).NotTo(Equal("")) + clientAddrS2 := getLocalAddr(mbClient, clientAddr) + Expect(clientAddr).NotTo(Equal("")) + + // wait for all peerings to be ESTABLISHED + Expect(waitForPeerState(mbServer, clientAddr, ESTABLISHED)).NotTo(BeFalse()) + Expect(waitForPeerState(mbServer2, clientAddrS2, ESTABLISHED)).NotTo(BeFalse()) + Expect(waitForPeerState(mbClient, serverAddress, ESTABLISHED)).NotTo(BeFalse()) + + vni := VNI(index % 10) + err = mbClient.Subscribe(vni) + Expect(err).NotTo(HaveOccurred()) + + // prepare the route + startIP := net.ParseIP("100.64.0.0") + ip := incrementIPv4(startIP, index) + addr, err := netip.ParseAddr(ip.String()) + Expect(err).NotTo(HaveOccurred()) + underlayRoute, err := netip.ParseAddr(fmt.Sprintf("b198:5b10:3880:fd32:fb80:80dd:46f7:%d", index)) + Expect(err).NotTo(HaveOccurred()) + dest := Destination{ + Prefix: netip.PrefixFrom(addr, 32), + IPVersion: IPV4, + } + nextHop := NextHop{ + TargetVNI: uint32(vni), + TargetAddress: underlayRoute, + } + + // announce the route + err = mbClient.AnnounceRoute(vni, dest, nextHop) + Expect(err).NotTo(HaveOccurred()) + + // wait for the route to be received + time.Sleep(3 * time.Second) + + // check if the route was received by Server1 + p := mbServer.getPeer(clientAddr) + Expect(p).ToNot(BeNil()) + _, exists := p.receivedRoutes.routes[vni][dest][nextHop][p] + Expect(exists).To(BeTrue()) + + // check if the route was received by Server2 + p = mbServer2.getPeer(clientAddrS2) + Expect(p).ToNot(BeNil()) + _, exists = p.receivedRoutes.routes[vni][dest][nextHop][p] + Expect(exists).To(BeTrue()) + + Expect(len(mbServer.routeTable.routes[vni][dest][nextHop])).To(Equal(1)) + Expect(len(mbServer2.routeTable.routes[vni][dest][nextHop])).To(Equal(1)) + Expect(len(mbClient.routeTable.routes)).To(Equal(1)) + + By("Removing the Servers from peers") + // remove Server1 from peers + err = mbClient.RemovePeer(serverAddress) + Expect(err).NotTo(HaveOccurred()) + time.Sleep(2 * time.Second) + + p = mbServer.getPeer(clientAddr) + Expect(p).To(BeNil()) + p = mbServer2.getPeer(clientAddrS2) + Expect(p.GetState()).To(Equal(ESTABLISHED)) + + // check if the route was received by Server2 + _, exists = p.receivedRoutes.routes[vni][dest][nextHop][p] + Expect(exists).To(BeTrue()) + Expect(len(mbServer.routeTable.routes[vni][dest][nextHop])).To(Equal(0)) + Expect(len(mbServer2.routeTable.routes[vni][dest][nextHop])).To(Equal(1)) + Expect(len(mbClient.routeTable.routes)).To(Equal(1)) + + // restore peering with Server1 + err = mbClient.AddPeer(serverAddress, localIP.String()) + Expect(err).NotTo(HaveOccurred()) + clientAddr = getLocalAddr(mbClient, clientAddrS2) + Expect(waitForPeerState(mbServer, clientAddr, ESTABLISHED)).NotTo(BeFalse()) + + Expect(len(mbServer.routeTable.routes[vni][dest][nextHop])).To(Equal(1)) + Expect(len(mbServer2.routeTable.routes[vni][dest][nextHop])).To(Equal(1)) + Expect(len(mbClient.routeTable.routes)).To(Equal(1)) + + // check if the route was received by Server1 + p = mbServer.getPeer(clientAddr) + Expect(p.GetState()).To(Equal(ESTABLISHED)) + _, exists = p.receivedRoutes.routes[vni][dest][nextHop][p] + Expect(exists).To(BeTrue()) + + // remove Server2 from peers + err = mbClient.RemovePeer(serverAddress2) + Expect(err).NotTo(HaveOccurred()) + time.Sleep(2 * time.Second) + + p = mbServer2.getPeer(clientAddrS2) + Expect(p).To(BeNil()) + + Expect(len(mbServer.routeTable.routes[vni][dest][nextHop])).To(Equal(1)) + Expect(len(mbServer2.routeTable.routes[vni][dest][nextHop])).To(Equal(0)) + Expect(len(mbClient.routeTable.routes)).To(Equal(1)) + + // restore peer with Server2 + err = mbClient.AddPeer(serverAddress2, localIP.String()) + Expect(err).NotTo(HaveOccurred()) + clientAddrS2 = getLocalAddr(mbClient, clientAddr) + Expect(waitForPeerState(mbServer2, clientAddrS2, ESTABLISHED)).NotTo(BeFalse()) + + Expect(len(mbServer.routeTable.routes[vni][dest][nextHop])).To(Equal(1)) + Expect(len(mbServer2.routeTable.routes[vni][dest][nextHop])).To(Equal(1)) + Expect(len(mbClient.routeTable.routes)).To(Equal(1)) + + // check if the route was received by Server2 + p = mbServer2.getPeer(clientAddrS2) + Expect(p.GetState()).To(Equal(ESTABLISHED)) + _, exists = p.receivedRoutes.routes[vni][dest][nextHop][p] + Expect(exists).To(BeTrue()) + }(i) + } wg.Wait() }) + + It("should announce with 2 servers and recover from shutdown of 1 server", Label("announceHAshutdown"), func() { + mbClient := NewMetalBond(Config{}, client) + localIP := net.ParseIP("127.0.0.3") + err := mbClient.AddPeer(serverAddress, localIP.String()) + Expect(err).NotTo(HaveOccurred()) + err = mbClient.AddPeer(serverAddress2, localIP.String()) + Expect(err).NotTo(HaveOccurred()) + + // mbClient should have both servers as peers + Expect(len(mbClient.peers)).To(Equal(2)) + + // wait for the peer loop to start + time.Sleep(1 * time.Second) + clientAddr := getLocalAddr(mbClient, "") + Expect(clientAddr).NotTo(Equal("")) + clientAddrS2 := getLocalAddr(mbClient, clientAddr) + Expect(clientAddr).NotTo(Equal("")) + + // wait for all peerings to be ESTABLISHED + Expect(waitForPeerState(mbServer, clientAddr, ESTABLISHED)).NotTo(BeFalse()) + Expect(waitForPeerState(mbServer2, clientAddrS2, ESTABLISHED)).NotTo(BeFalse()) + Expect(waitForPeerState(mbClient, serverAddress, ESTABLISHED)).NotTo(BeFalse()) + + vni := VNI(10) + err = mbClient.Subscribe(vni) + Expect(err).NotTo(HaveOccurred()) + + // prepare the route + startIP := net.ParseIP("100.64.0.0") + ip := incrementIPv4(startIP, 1) + addr, err := netip.ParseAddr(ip.String()) + Expect(err).NotTo(HaveOccurred()) + underlayRoute, err := netip.ParseAddr(fmt.Sprintf("b198:5b10:3880:fd32:fb80:80dd:46f7:%d", 1)) + Expect(err).NotTo(HaveOccurred()) + dest := Destination{ + Prefix: netip.PrefixFrom(addr, 32), + IPVersion: IPV4, + } + nextHop := NextHop{ + TargetVNI: uint32(vni), + TargetAddress: underlayRoute, + } + + // announce the route + err = mbClient.AnnounceRoute(vni, dest, nextHop) + Expect(err).NotTo(HaveOccurred()) + + // wait for the route to be received + time.Sleep(3 * time.Second) + + // check if the route was received by Server1 + p := mbServer.getPeer(clientAddr) + Expect(p).ToNot(BeNil()) + _, exists := p.receivedRoutes.routes[vni][dest][nextHop][p] + Expect(exists).To(BeTrue()) + + // check if the route was received by Server2 + p = mbServer2.getPeer(clientAddrS2) + Expect(p).ToNot(BeNil()) + _, exists = p.receivedRoutes.routes[vni][dest][nextHop][p] + Expect(exists).To(BeTrue()) + + Expect(len(mbServer.routeTable.routes[vni][dest][nextHop])).To(Equal(1)) + Expect(len(mbServer2.routeTable.routes[vni][dest][nextHop])).To(Equal(1)) + Expect(len(mbClient.routeTable.routes)).To(Equal(1)) + + By("Shuting down the Servers") + // shut down Server1 + mbServer.Shutdown() + // wait for keepalive to run out + time.Sleep(time.Duration(mbClient.keepaliveInterval)*time.Second + 1*time.Second) + + p = mbServer.getPeer(clientAddr) + Expect(p).To(BeNil()) + p = mbServer2.getPeer(clientAddrS2) + Expect(p.GetState()).To(Equal(ESTABLISHED)) + + // check if the route was received by Server2 + _, exists = p.receivedRoutes.routes[vni][dest][nextHop][p] + + Expect(exists).To(BeTrue()) + Expect(len(mbServer.routeTable.routes[vni][dest][nextHop])).To(Equal(0)) + Expect(len(mbServer2.routeTable.routes[vni][dest][nextHop])).To(Equal(1)) + Expect(len(mbClient.routeTable.routes)).To(Equal(1)) + + // start Server1 again + // to reuse same mbServer struct, reset shuttingDown to false + mbServer.shuttingDown = false + err = mbServer.StartServer(serverAddress) + Expect(err).ToNot(HaveOccurred()) + time.Sleep(2 * time.Second) + + clientAddr = getLocalAddr(mbClient, clientAddrS2) + Expect(waitForPeerState(mbServer, clientAddr, ESTABLISHED)).NotTo(BeFalse()) + + Expect(len(mbServer.routeTable.routes[vni][dest][nextHop])).To(Equal(1)) + Expect(len(mbServer2.routeTable.routes[vni][dest][nextHop])).To(Equal(1)) + Expect(len(mbClient.routeTable.routes)).To(Equal(1)) + + // check if the route was received by Server1 + p = mbServer.getPeer(clientAddr) + Expect(p.GetState()).To(Equal(ESTABLISHED)) + _, exists = p.receivedRoutes.routes[vni][dest][nextHop][p] + Expect(exists).To(BeTrue()) + + // shut down Server2 + mbServer2.Shutdown() + // wait for keepalive to run out + time.Sleep(time.Duration(mbClient.keepaliveInterval)*time.Second + 1*time.Second) + + p = mbServer2.getPeer(clientAddrS2) + Expect(p).To(BeNil()) + + Expect(len(mbServer.routeTable.routes[vni][dest][nextHop])).To(Equal(1)) + Expect(len(mbServer2.routeTable.routes[vni][dest][nextHop])).To(Equal(0)) + Expect(len(mbClient.routeTable.routes)).To(Equal(1)) + + // start Server2 again + // to reuse same mbServer2 struct, reset shuttingDown to false + mbServer2.shuttingDown = false + err = mbServer2.StartServer(serverAddress2) + Expect(err).ToNot(HaveOccurred()) + time.Sleep(2 * time.Second) + + clientAddrS2 = getLocalAddr(mbClient, clientAddr) + Expect(waitForPeerState(mbServer2, clientAddrS2, ESTABLISHED)).NotTo(BeFalse()) + + Expect(len(mbServer.routeTable.routes[vni][dest][nextHop])).To(Equal(1)) + Expect(len(mbServer2.routeTable.routes[vni][dest][nextHop])).To(Equal(1)) + Expect(len(mbClient.routeTable.routes)).To(Equal(1)) + + // check if the route was received by Server2 + p = mbServer2.getPeer(clientAddrS2) + Expect(p.GetState()).To(Equal(ESTABLISHED)) + _, exists = p.receivedRoutes.routes[vni][dest][nextHop][p] + Expect(exists).To(BeTrue()) + + }) }) func waitForPeerState(mbServer *MetalBond, clientAddr string, expectedState ConnectionState) bool { @@ -284,9 +567,7 @@ func waitForPeerState(mbServer *MetalBond, clientAddr string, expectedState Conn timeout := 30 * time.Second start := time.Now() for { - mbServer.mtxPeers.RLock() - peer := mbServer.peers[clientAddr] - mbServer.mtxPeers.RUnlock() + peer := mbServer.getPeer(clientAddr) if peer != nil && peer.GetState() == expectedState { return true @@ -340,3 +621,15 @@ func incrementIPv4(ip net.IP, count int) net.IP { } return ip } + +func (m *MetalBond) getPeer(peerAddress string) *metalBondPeer { + m.mtxPeers.RLock() + peer := m.peers[peerAddress] + m.mtxPeers.RUnlock() + + if peer != nil { + return peer + } else { + return nil + } +}