Skip to content

Commit

Permalink
Add unit test coverage for custom UDP connection identifiers
Browse files Browse the repository at this point in the history
Adds unit testing to ensure that a custom UDP connection identifier
results in expected behavior. Testing adheres to the following steps:

1. Initiate 5 connections using remote address as identifier.
2. Send message on each connection that results in migration from remote
   address identifier to custom identifier.
3. Spawn 20 clients, each assigned to send one message on one of the
   connections (total of 4 messages on each connection).
4. Spawn 5 servers and ensure that each receives the first initiation
   and set messages, then 4 messages, each originating from a different
   remote address.

Signed-off-by: Daniel Mangum <[email protected]>
  • Loading branch information
hasheddan committed Jul 3, 2023
1 parent d4eb546 commit 0205010
Showing 1 changed file with 182 additions and 9 deletions.
191 changes: 182 additions & 9 deletions udp/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package udp

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -79,8 +80,7 @@ func stressDuplex(t *testing.T) {
MsgCount: 1, // Can't rely on UDP message order in CI
}

err = test.StressDuplex(ca, cb, opt)
if err != nil {
if err := test.StressDuplex(ca, cb, opt); err != nil {
t.Fatal(err)
}
}
Expand All @@ -99,14 +99,12 @@ func TestListenerCloseTimeout(t *testing.T) {
t.Fatal(err)
}

err = listener.Close()
if err != nil {
if err := listener.Close(); err != nil {
t.Fatal(err)
}

// Close client after server closes to cleanup
err = ca.Close()
if err != nil {
if err := ca.Close(); err != nil {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -147,7 +145,7 @@ func TestListenerCloseUnaccepted(t *testing.T) {
time.Sleep(100 * time.Millisecond) // Wait all packets being processed by readLoop

// Unaccepted connections must be closed by listener.Close()
if err = listener.Close(); err != nil {
if err := listener.Close(); err != nil {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -311,14 +309,189 @@ func TestListenerConcurrent(t *testing.T) {
}()

time.Sleep(100 * time.Millisecond) // Last Accept should be discarded
err = listener.Close()
if err != nil {
if err := listener.Close(); err != nil {
t.Fatal(err)
}

wg.Wait()
}

func TestListenerCustomConnId(t *testing.T) {
// Limit runtime in case of deadlocks
lim := test.TimeOut(time.Second * 20)
defer lim.Stop()

// Check for leaking routines
report := test.CheckRoutines(t)
defer report()

type pkt struct {
Id int
Payload string
}
network, addr := getConfig()
listener, err := (&ListenConfig{
ConnIdFn: func(raddr net.Addr, buf []byte) string {
p := &pkt{}
if err := json.Unmarshal(buf, p); err != nil {
return raddr.String()
}
return fmt.Sprint(p.Id)
},
}).Listen(network, addr)
if err != nil {
t.Fatal(err)
}

clientWg := sync.WaitGroup{}
var readFirst [5]chan struct{}
for i := range readFirst {
readFirst[i] = make(chan struct{})
}
for i := 0; i < 5; i++ {
// pin range variable
i := i
clientWg.Add(1)
go func() {
defer clientWg.Done()
conn, dErr := net.DialUDP(network, nil, listener.Addr().(*net.UDPAddr))
if dErr != nil {
t.Error(dErr)
}
if _, wErr := conn.Write([]byte("hello")); wErr != nil {
t.Error(wErr)
}
// Ensure that the first message, which does not include
// a connection ID is read before sending additional
// messages.
<-readFirst[i]
// Send a message to update the connection ID from the
// remote address to the provided ID.
buf, err := json.Marshal(&pkt{
Id: i,
Payload: "set",
})
if err != nil {
t.Error(err)
}
if _, wErr := conn.Write(buf); wErr != nil {
t.Error(wErr)
}
if cErr := conn.Close(); cErr != nil {
t.Error(cErr)
}
}()
}
var readSecond [5]chan struct{}
for i := range readSecond {
readSecond[i] = make(chan struct{})
}

// Spawn 20 clients sending on 5 connections.
for i := 0; i < 20; i++ {
// pin range variable
connID := i % 5
// Ensure that we are using a connection ID for packet
// routing prior to sending any messages.
clientWg.Add(1)
go func() {
defer clientWg.Done()
<-readSecond[connID]
conn, dErr := net.DialUDP(network, nil, listener.Addr().(*net.UDPAddr))
if dErr != nil {
t.Error(dErr)
}
buf, err := json.Marshal(&pkt{
Id: connID,
Payload: conn.LocalAddr().String(),
})
if err != nil {
t.Error(err)
}
if _, wErr := conn.Write(buf); wErr != nil {
t.Error(wErr)
}
if cErr := conn.Close(); cErr != nil {
t.Error(cErr)
}
}()
}

serverWg := sync.WaitGroup{}
clientMap := map[string]struct{}{}
var clientMapMu sync.Mutex
for i := 0; i < 5; i++ {
// pin range variable
i := i
serverWg.Add(1)
go func() {
defer serverWg.Done()
conn, err := listener.Accept()
if err != nil {
t.Error(err)
}
buf := make([]byte, 40)
n, err := conn.Read(buf)
if err != nil {
t.Error(err)
}
// Ensure first message is sent using the remote address
// as the connection ID.
if string(buf[:n]) != "hello" {
t.Error("Expected hello message")
}
close(readFirst[i])
n, err = conn.Read(buf)
if err != nil {
t.Error(err)
}
p := &pkt{}
if err := json.Unmarshal(buf[:n], p); err != nil {
t.Error(err)
}
// Ensure second message is switching the connecton ID
// from the remote address to the provided value.
if p.Payload != "set" {
t.Error("Expected set message")
}
connID := p.Id
close(readSecond[i])
for j := 0; j < 4; j++ {
n, err := conn.Read(buf)
if err != nil {
t.Error(err)
}
p := &pkt{}
if err := json.Unmarshal(buf[:n], p); err != nil {
t.Error(err)
}
if p.Id != connID {
t.Errorf("Expected connection ID %d, but got %d", connID, p.Id)
}
// Ensure we only ever receive one message from
// a given client.
clientMapMu.Lock()
if _, ok := clientMap[p.Payload]; ok {
t.Errorf("Multiple messages from single client %s", p.Payload)
}
clientMap[p.Payload] = struct{}{}
clientMapMu.Unlock()
}
if err := conn.Close(); err != nil {
t.Error(err)
}
}()
}

// Wait for servers to exit.
serverWg.Wait()
// Wait for clients to exit.
clientWg.Wait()
if err := listener.Close(); err != nil {
t.Fatal(err)
}
}

func pipe() (net.Listener, net.Conn, *net.UDPConn, error) {
// Start listening
network, addr := getConfig()
Expand Down

0 comments on commit 0205010

Please sign in to comment.