Skip to content

Commit

Permalink
Backport reestablish relays from cert-v2 to release-1.9
Browse files Browse the repository at this point in the history
  • Loading branch information
brad-defined committed Nov 19, 2024
1 parent ab81b62 commit 802a4f4
Show file tree
Hide file tree
Showing 5 changed files with 288 additions and 126 deletions.
134 changes: 134 additions & 0 deletions e2e/handshakes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ package e2e
import (
"fmt"
"net/netip"
"slices"
"testing"
"time"

"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/sirupsen/logrus"
"github.com/slackhq/nebula"
"github.com/slackhq/nebula/e2e/router"
Expand Down Expand Up @@ -369,6 +372,137 @@ func TestRelays(t *testing.T) {
//TODO: assert we actually used the relay even though it should be impossible for a tunnel to have occurred without it
}

// TestReestablishRelays checks that a relayed tunnel between "me" and "them"
// comes back after the relay closes its tunnel to "them".
//
// The test proceeds in four phases:
//  1. Bring up a tunnel me -> relay -> them and pass one packet each way.
//  2. Close the relay's tunnel to "them", then keep sending from "me" until the
//     number of indexes in my hostmap drops below its starting value.
//  3. Re-inject the lighthouse/relay hints and send from "me" until the expected
//     payload is seen leaving "them"'s tun device.
//  4. Do the same in the opposite direction.
//
// While waiting in phases 3 and 4, other packets may surface first, so every
// packet is inspected defensively and anything unexpected is skipped.
func TestReestablishRelays(t *testing.T) {
	ca, _, caKey, _ := NewTestCaCert(time.Now(), time.Now().Add(10*time.Minute), nil, nil, []string{})
	myControl, myVpnIpNet, _, _ := newSimpleServer(ca, caKey, "me ", "10.128.0.1/24", m{"relay": m{"use_relays": true}})
	relayControl, relayVpnIpNet, relayUdpAddr, _ := newSimpleServer(ca, caKey, "relay ", "10.128.0.128/24", m{"relay": m{"am_relay": true}})
	theirControl, theirVpnIpNet, theirUdpAddr, _ := newSimpleServer(ca, caKey, "them ", "10.128.0.2/24", m{"relay": m{"use_relays": true}})

	// Teach me how to get to the relay and that them can be reached via the relay
	myControl.InjectLightHouseAddr(relayVpnIpNet.Addr(), relayUdpAddr)
	myControl.InjectRelays(theirVpnIpNet.Addr(), []netip.Addr{relayVpnIpNet.Addr()})
	relayControl.InjectLightHouseAddr(theirVpnIpNet.Addr(), theirUdpAddr)

	// Build a router so we don't have to reason who gets which packet
	r := router.NewR(t, myControl, relayControl, theirControl)
	defer r.RenderFlow()

	// Start the servers
	myControl.Start()
	relayControl.Start()
	theirControl.Start()

	t.Log("Trigger a handshake from me to them via the relay")
	myControl.InjectTunUDPPacket(theirVpnIpNet.Addr(), 80, 80, []byte("Hi from me"))

	p := r.RouteForAllUntilTxTun(theirControl)
	r.Log("Assert the tunnel works")
	assertUdpPacket(t, []byte("Hi from me"), p, myVpnIpNet.Addr(), theirVpnIpNet.Addr(), 80, 80)

	t.Log("Ensure packet traversal from them to me via the relay")
	theirControl.InjectTunUDPPacket(myVpnIpNet.Addr(), 80, 80, []byte("Hi from them"))

	p = r.RouteForAllUntilTxTun(myControl)
	r.Log("Assert the tunnel works")
	assertUdpPacket(t, []byte("Hi from them"), p, theirVpnIpNet.Addr(), myVpnIpNet.Addr(), 80, 80)

	// If we break the relay's connection to 'them', 'me' needs to detect and recover the connection
	r.Log("Close the tunnel")
	relayControl.CloseTunnel(theirVpnIpNet.Addr(), true)

	// Keep poking the broken tunnel until my hostmap holds fewer indexes than it
	// did right after the relay closed its side.
	start := len(myControl.GetHostmap().Indexes)
	curIndexes := len(myControl.GetHostmap().Indexes)
	for curIndexes >= start {
		curIndexes = len(myControl.GetHostmap().Indexes)
		r.Logf("Wait for the dead index to go away:start=%v indexes, currnet=%v indexes", start, curIndexes)
		myControl.InjectTunUDPPacket(theirVpnIpNet.Addr(), 80, 80, []byte("Hi from me should fail"))

		r.RouteForAllExitFunc(func(p *udp.Packet, c *nebula.Control) router.ExitType {
			return router.RouteAndExit
		})
		time.Sleep(2 * time.Second)
	}
	r.Log("Dead index went away. Woot!")
	r.RenderHostmaps("Me removed hostinfo", myControl, relayControl, theirControl)
	// Next packet should re-establish a relayed connection and work just great.

	t.Logf("Assert the tunnel...")
	for {
		t.Log("RouteForAllUntilTxTun")
		myControl.InjectLightHouseAddr(relayVpnIpNet.Addr(), relayUdpAddr)
		myControl.InjectRelays(theirVpnIpNet.Addr(), []netip.Addr{relayVpnIpNet.Addr()})
		relayControl.InjectLightHouseAddr(theirVpnIpNet.Addr(), theirUdpAddr)
		myControl.InjectTunUDPPacket(theirVpnIpNet.Addr(), 80, 80, []byte("Hi from me"))

		p = r.RouteForAllUntilTxTun(theirControl)
		r.Log("Assert the tunnel works")
		packet := gopacket.NewPacket(p, layers.LayerTypeIPv4, gopacket.Lazy)
		// Use the comma-ok form: Layer returns a nil interface when the layer is
		// absent, and a single-value type assertion on nil panics instead of
		// letting us skip the packet.
		v4, ok := packet.Layer(layers.LayerTypeIPv4).(*layers.IPv4)
		if !ok {
			t.Log("Not an IPv4 packet. This is not the packet I'm looking for. Keep looking")
			continue
		}
		if slices.Compare(v4.SrcIP, myVpnIpNet.Addr().AsSlice()) != 0 {
			t.Logf("SrcIP is unexpected...this is not the packet I'm looking for. Keep looking")
			continue
		}
		if slices.Compare(v4.DstIP, theirVpnIpNet.Addr().AsSlice()) != 0 {
			t.Logf("DstIP is unexpected...this is not the packet I'm looking for. Keep looking")
			continue
		}

		// Only the presence of a UDP layer matters here; not binding it to a local
		// also avoids shadowing the imported udp package.
		if _, ok := packet.Layer(layers.LayerTypeUDP).(*layers.UDP); !ok {
			t.Log("Not a UDP packet. This is not the packet I'm looking for. Keep looking")
			continue
		}
		data := packet.ApplicationLayer()
		if data == nil {
			t.Log("No data found in packet. This is not the packet I'm looking for. Keep looking.")
			continue
		}
		if string(data.Payload()) != "Hi from me" {
			t.Logf("Unexpected payload: '%v', keep looking", string(data.Payload()))
			continue
		}
		t.Log("I found my lost packet. I am so happy.")
		break
	}
	t.Log("Assert the tunnel works the other way, too")
	for {
		t.Log("RouteForAllUntilTxTun")
		theirControl.InjectTunUDPPacket(myVpnIpNet.Addr(), 80, 80, []byte("Hi from them"))

		p = r.RouteForAllUntilTxTun(myControl)
		r.Log("Assert the tunnel works")
		packet := gopacket.NewPacket(p, layers.LayerTypeIPv4, gopacket.Lazy)
		// Comma-ok for the same reason as above: a missing layer must be skipped,
		// not turned into a panic.
		v4, ok := packet.Layer(layers.LayerTypeIPv4).(*layers.IPv4)
		if !ok {
			t.Log("Not an IPv4 packet. This is not the packet I'm looking for. Keep looking")
			continue
		}
		if slices.Compare(v4.DstIP, myVpnIpNet.Addr().AsSlice()) != 0 {
			t.Logf("Dst is unexpected...this is not the packet I'm looking for. Keep looking")
			continue
		}
		if slices.Compare(v4.SrcIP, theirVpnIpNet.Addr().AsSlice()) != 0 {
			t.Logf("SrcIP is unexpected...this is not the packet I'm looking for. Keep looking")
			continue
		}

		if _, ok := packet.Layer(layers.LayerTypeUDP).(*layers.UDP); !ok {
			t.Log("Not a UDP packet. This is not the packet I'm looking for. Keep looking")
			continue
		}
		data := packet.ApplicationLayer()
		if data == nil {
			t.Log("No data found in packet. This is not the packet I'm looking for. Keep looking.")
			continue
		}
		if string(data.Payload()) != "Hi from them" {
			t.Logf("Unexpected payload: '%v', keep looking", string(data.Payload()))
			continue
		}
		t.Log("I found my lost packet. I am so happy.")
		break
	}
	r.RenderHostmaps("Final hostmaps", myControl, relayControl, theirControl)

}

func TestStage1RaceRelays(t *testing.T) {
//NOTE: this is a race between me and relay resulting in a full tunnel from me to them via relay
ca, _, caKey, _ := NewTestCaCert(time.Now(), time.Now().Add(10*time.Minute), nil, nil, []string{})
Expand Down
3 changes: 3 additions & 0 deletions handshake_ix.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,9 @@ func ixHandshakeStage1(f *Interface, addr netip.AddrPort, via *ViaSender, packet
return
}
hostinfo.relayState.InsertRelayTo(via.relayHI.vpnIp)
// I successfully received a handshake. Just in case I marked this tunnel as 'Disestablished', ensure
// it's correctly marked as working.
via.relayHI.relayState.UpdateRelayForByIdxState(via.remoteIdx, Established)
f.SendVia(via.relayHI, via.relay, msg, make([]byte, 12), make([]byte, mtu), false)
f.l.WithField("vpnIp", vpnIp).WithField("relay", via.relayHI.vpnIp).
WithField("certName", certName).
Expand Down
90 changes: 48 additions & 42 deletions handshake_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,48 +278,8 @@ func (hm *HandshakeManager) handleOutbound(vpnIp netip.Addr, lighthouseTriggered
continue
}
// Check the relay HostInfo to see if we already established a relay through it
if existingRelay, ok := relayHostInfo.relayState.QueryRelayForByIp(vpnIp); ok {
switch existingRelay.State {
case Established:
hostinfo.logger(hm.l).WithField("relay", relay.String()).Info("Send handshake via relay")
hm.f.SendVia(relayHostInfo, existingRelay, hostinfo.HandshakePacket[0], make([]byte, 12), make([]byte, mtu), false)
case Requested:
hostinfo.logger(hm.l).WithField("relay", relay.String()).Info("Re-send CreateRelay request")

//TODO: IPV6-WORK
myVpnIpB := hm.f.myVpnNet.Addr().As4()
theirVpnIpB := vpnIp.As4()

// Re-send the CreateRelay request, in case the previous one was lost.
m := NebulaControl{
Type: NebulaControl_CreateRelayRequest,
InitiatorRelayIndex: existingRelay.LocalIndex,
RelayFromIp: binary.BigEndian.Uint32(myVpnIpB[:]),
RelayToIp: binary.BigEndian.Uint32(theirVpnIpB[:]),
}
msg, err := m.Marshal()
if err != nil {
hostinfo.logger(hm.l).
WithError(err).
Error("Failed to marshal Control message to create relay")
} else {
// This must send over the hostinfo, not over hm.Hosts[ip]
hm.f.SendMessageToHostInfo(header.Control, 0, relayHostInfo, msg, make([]byte, 12), make([]byte, mtu))
hm.l.WithFields(logrus.Fields{
"relayFrom": hm.f.myVpnNet.Addr(),
"relayTo": vpnIp,
"initiatorRelayIndex": existingRelay.LocalIndex,
"relay": relay}).
Info("send CreateRelayRequest")
}
default:
hostinfo.logger(hm.l).
WithField("vpnIp", vpnIp).
WithField("state", existingRelay.State).
WithField("relay", relayHostInfo.vpnIp).
Errorf("Relay unexpected state")
}
} else {
existingRelay, ok := relayHostInfo.relayState.QueryRelayForByIp(vpnIp)
if !ok {
// No relays exist or requested yet.
if relayHostInfo.remote.IsValid() {
idx, err := AddRelay(hm.l, relayHostInfo, hm.mainHostMap, vpnIp, nil, TerminalType, Requested)
Expand Down Expand Up @@ -352,6 +312,52 @@ func (hm *HandshakeManager) handleOutbound(vpnIp netip.Addr, lighthouseTriggered
Info("send CreateRelayRequest")
}
}
continue
}
switch existingRelay.State {
case Established:
hostinfo.logger(hm.l).WithField("relay", relay.String()).Info("Send handshake via relay")
hm.f.SendVia(relayHostInfo, existingRelay, hostinfo.HandshakePacket[0], make([]byte, 12), make([]byte, mtu), false)
case Disestablished:
// Mark this relay as 'requested'
relayHostInfo.relayState.UpdateRelayForByIpState(vpnIp, Requested)
fallthrough
case Requested:
hostinfo.logger(hm.l).WithField("relay", relay.String()).Info("Re-send CreateRelay request")
// Re-send the CreateRelay request, in case the previous one was lost.
relayFrom := hm.f.myVpnNet.Addr().As4()
relayTo := vpnIp.As4()
m := NebulaControl{
Type: NebulaControl_CreateRelayRequest,
InitiatorRelayIndex: existingRelay.LocalIndex,
RelayFromIp: binary.BigEndian.Uint32(relayFrom[:]),
RelayToIp: binary.BigEndian.Uint32(relayTo[:]),
}

msg, err := m.Marshal()
if err != nil {
hostinfo.logger(hm.l).
WithError(err).
Error("Failed to marshal Control message to create relay")
} else {
// This must send over the hostinfo, not over hm.Hosts[ip]
hm.f.SendMessageToHostInfo(header.Control, 0, relayHostInfo, msg, make([]byte, 12), make([]byte, mtu))
hm.l.WithFields(logrus.Fields{
"relayFrom": hm.f.myVpnNet,
"relayTo": vpnIp,
"initiatorRelayIndex": existingRelay.LocalIndex,
"relay": relay}).
Info("send CreateRelayRequest")
}
case PeerRequested:
// PeerRequested only occurs in Forwarding relays, not Terminal relays, and this is a Terminal relay case.
fallthrough
default:
hostinfo.logger(hm.l).
WithField("vpnIp", vpnIp).
WithField("state", existingRelay.State).
WithField("relay", relay).
Errorf("Relay unexpected state")
}
}
}
Expand Down
51 changes: 51 additions & 0 deletions hostmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const (
Requested = iota
PeerRequested
Established
Disestablished
)

const (
Expand Down Expand Up @@ -79,6 +80,28 @@ func (rs *RelayState) DeleteRelay(ip netip.Addr) {
delete(rs.relays, ip)
}

// UpdateRelayForByIpState sets the State of the relay registered for vpnIp.
//
// The stored Relay is not modified in place. A copy carrying the new state is
// made, and both lookup maps (by the copy's PeerIp and by its LocalIndex) are
// repointed at that copy, so the previously stored Relay value keeps its old
// State. The call is a no-op when no relay is registered for vpnIp.
func (rs *RelayState) UpdateRelayForByIpState(vpnIp netip.Addr, state int) {
	rs.Lock()
	defer rs.Unlock()

	current, found := rs.relayForByIp[vpnIp]
	if !found {
		return
	}

	updated := *current
	updated.State = state

	// Both indexes must reference the same replacement entry.
	replacement := &updated
	rs.relayForByIdx[replacement.LocalIndex] = replacement
	rs.relayForByIp[replacement.PeerIp] = replacement
}

// UpdateRelayForByIdxState sets the State of the relay registered under the
// local index idx.
//
// The stored Relay is not modified in place. A copy carrying the new state is
// made, and both lookup maps (by the copy's PeerIp and by its LocalIndex) are
// repointed at that copy, so the previously stored Relay value keeps its old
// State. The call is a no-op when no relay is registered under idx.
func (rs *RelayState) UpdateRelayForByIdxState(idx uint32, state int) {
	rs.Lock()
	defer rs.Unlock()

	current, found := rs.relayForByIdx[idx]
	if !found {
		return
	}

	updated := *current
	updated.State = state

	// Both indexes must reference the same replacement entry.
	replacement := &updated
	rs.relayForByIdx[replacement.LocalIndex] = replacement
	rs.relayForByIp[replacement.PeerIp] = replacement
}

func (rs *RelayState) CopyAllRelayFor() []*Relay {
rs.RLock()
defer rs.RUnlock()
Expand Down Expand Up @@ -361,6 +384,7 @@ func (hm *HostMap) unlockedMakePrimary(hostinfo *HostInfo) {

func (hm *HostMap) unlockedDeleteHostInfo(hostinfo *HostInfo) {
primary, ok := hm.Hosts[hostinfo.vpnIp]
isLastHostinfo := hostinfo.next == nil && hostinfo.prev == nil
if ok && primary == hostinfo {
// The vpnIp pointer points to the same hostinfo as the local index id, we can remove it
delete(hm.Hosts, hostinfo.vpnIp)
Expand Down Expand Up @@ -410,6 +434,12 @@ func (hm *HostMap) unlockedDeleteHostInfo(hostinfo *HostInfo) {
Debug("Hostmap hostInfo deleted")
}

if isLastHostinfo {
// I have lost connectivity to my peers. My relay tunnel is likely broken. Mark the next
// hops as 'Disestablished' so that new relay tunnels are created in the future.
hm.unlockedDisestablishVpnAddrRelayFor(hostinfo)
}
// Clean up any local relay indexes for which I am acting as a relay hop
for _, localRelayIdx := range hostinfo.relayState.CopyRelayForIdxs() {
delete(hm.Relays, localRelayIdx)
}
Expand Down Expand Up @@ -470,6 +500,27 @@ func (hm *HostMap) QueryVpnIpRelayFor(targetIp, relayHostIp netip.Addr) (*HostIn
return nil, nil, errors.New("unable to find host with relay")
}

// unlockedDisestablishVpnAddrRelayFor flags, as Disestablished, the relay
// entries that other hosts hold for hi.vpnIp.
//
// Two groups of hosts are visited through hm.Hosts:
//   - every address returned by hi.relayState.CopyRelayIps(), and
//   - the PeerIp of every ForwardingType entry returned by
//     hi.relayState.CopyAllRelayFor().
//
// For each address, the whole chain of HostInfos reachable via .next is
// updated. Addresses with no entry in hm.Hosts are skipped.
//
// NOTE(review): the "unlocked" prefix suggests the caller is expected to hold
// the hostmap lock already; this function takes no lock on hm itself — confirm
// against callers.
func (hm *HostMap) unlockedDisestablishVpnAddrRelayFor(hi *HostInfo) {
	// markChain walks a HostInfo chain starting at head and marks each member's
	// relay entry for hi.vpnIp. A nil head (missing map key) is a no-op.
	markChain := func(head *HostInfo) {
		for cur := head; cur != nil; cur = cur.next {
			cur.relayState.UpdateRelayForByIpState(hi.vpnIp, Disestablished)
		}
	}

	for _, relayHostIp := range hi.relayState.CopyRelayIps() {
		markChain(hm.Hosts[relayHostIp])
	}

	for _, rs := range hi.relayState.CopyAllRelayFor() {
		if rs.Type != ForwardingType {
			continue
		}
		markChain(hm.Hosts[rs.PeerIp])
	}
}

func (hm *HostMap) queryVpnIp(vpnIp netip.Addr, promoteIfce *Interface) *HostInfo {
hm.RLock()
if h, ok := hm.Hosts[vpnIp]; ok {
Expand Down
Loading

0 comments on commit 802a4f4

Please sign in to comment.