Skip to content

Commit

Permalink
[exporter/loadbalancing] Consolidate rolling update tests, fix race c…
Browse files Browse the repository at this point in the history
…ondition in the traces (open-telemetry#13296)

Consolidate rolling update tests, fix race condition in the traces

Fixes open-telemetry#11116

Signed-off-by: Bogdan Drutu <[email protected]>

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Aug 12, 2022
1 parent 2c27314 commit 4c35474
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 27 deletions.
13 changes: 6 additions & 7 deletions exporter/loadbalancingexporter/log_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,6 @@ func TestLogsWithoutTraceID(t *testing.T) {
}

func TestRollingUpdatesWhenConsumeLogs(t *testing.T) {
t.Skip("Flaky Test - See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/11116")
// this test is based on the discussion in the following issue for this exporter:
// https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/1690
// prepare
Expand All @@ -353,7 +352,7 @@ func TestRollingUpdatesWhenConsumeLogs(t *testing.T) {
})

resolverCh := make(chan struct{}, 1)
counter := 0
counter := atomic.NewInt64(0)
resolve := [][]net.IPAddr{
{
{IP: net.IPv4(127, 0, 0, 1)},
Expand All @@ -367,14 +366,14 @@ func TestRollingUpdatesWhenConsumeLogs(t *testing.T) {
res.resolver = &mockDNSResolver{
onLookupIPAddr: func(context.Context, string) ([]net.IPAddr, error) {
defer func() {
counter++
counter.Inc()
}()

if counter <= 2 {
return resolve[counter], nil
if counter.Load() <= 2 {
return resolve[counter.Load()], nil
}

if counter == 3 {
if counter.Load() == 3 {
// stop as soon as rolling updates end
resolverCh <- struct{}{}
}
Expand Down Expand Up @@ -459,7 +458,7 @@ func TestRollingUpdatesWhenConsumeLogs(t *testing.T) {
// will still pass due to the 10 secs of sleep that is used to simulate
// unreachable backends.
go func() {
time.Sleep(50 * time.Millisecond)
time.Sleep(1 * time.Second)
resolverCh <- struct{}{}
}()

Expand Down
31 changes: 16 additions & 15 deletions exporter/loadbalancingexporter/resolver_dns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -134,20 +135,20 @@ func TestOnChange(t *testing.T) {
}

// test
counter := 0
counter := atomic.NewInt64(0)
res.onChange(func(endpoints []string) {
counter++
counter.Inc()
})
require.NoError(t, res.start(context.Background()))
defer func() {
require.NoError(t, res.shutdown(context.Background()))
}()
require.Equal(t, 1, counter)
require.Equal(t, int64(1), counter.Load())

// now, we run it with the same IPs being resolved, which shouldn't trigger a onChange call
_, err = res.resolve(context.Background())
require.NoError(t, err)
require.Equal(t, 1, counter)
require.Equal(t, int64(1), counter.Load())

// change what the resolver will resolve and trigger a resolution
resolve = []net.IPAddr{
Expand All @@ -156,7 +157,7 @@ func TestOnChange(t *testing.T) {
}
_, err = res.resolve(context.Background())
require.NoError(t, err)
assert.Equal(t, 2, counter)
assert.Equal(t, int64(2), counter.Load())
}

func TestEqualStringSlice(t *testing.T) {
Expand Down Expand Up @@ -191,7 +192,7 @@ func TestPeriodicallyResolve(t *testing.T) {
res, err := newDNSResolver(zap.NewNop(), "service-1", "")
require.NoError(t, err)

counter := 0
counter := atomic.NewInt64(0)
resolve := [][]net.IPAddr{
{
{IP: net.IPv4(127, 0, 0, 1)},
Expand All @@ -207,15 +208,15 @@ func TestPeriodicallyResolve(t *testing.T) {
res.resolver = &mockDNSResolver{
onLookupIPAddr: func(context.Context, string) ([]net.IPAddr, error) {
defer func() {
counter++
counter.Inc()
}()
// for second call, return the second result
if counter == 2 {
if counter.Load() == 2 {
return resolve[1], nil
}
// for subsequent calls, return the last result, because we need more two periodic results
// to confirm that it works as expected.
if counter >= 3 {
if counter.Load() >= 3 {
return resolve[2], nil
}

Expand All @@ -241,7 +242,7 @@ func TestPeriodicallyResolve(t *testing.T) {
wg.Wait()

// verify
assert.GreaterOrEqual(t, counter, 3)
assert.GreaterOrEqual(t, counter.Load(), int64(3))
assert.Len(t, res.endpoints, 3)
}

Expand All @@ -252,19 +253,19 @@ func TestPeriodicallyResolveFailure(t *testing.T) {

expectedErr := errors.New("some expected error")
wg := sync.WaitGroup{}
counter := 0
counter := atomic.NewInt64(0)
resolve := []net.IPAddr{{IP: net.IPv4(127, 0, 0, 1)}}
res.resolver = &mockDNSResolver{
onLookupIPAddr: func(context.Context, string) ([]net.IPAddr, error) {
counter++
counter.Inc()

// count down at most two times
if counter <= 2 {
if counter.Load() <= 2 {
wg.Done()
}

// for subsequent calls, return the error
if counter >= 2 {
if counter.Load() >= 2 {
return nil, expectedErr
}

Expand All @@ -285,7 +286,7 @@ func TestPeriodicallyResolveFailure(t *testing.T) {
wg.Wait()

// verify
assert.GreaterOrEqual(t, 2, counter)
assert.GreaterOrEqual(t, int64(2), counter.Load())
assert.Len(t, res.endpoints, 1) // no change to the list of endpoints
}

Expand Down
17 changes: 12 additions & 5 deletions exporter/loadbalancingexporter/trace_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"math/rand"
"net"
"path/filepath"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -444,13 +445,16 @@ func TestRollingUpdatesWhenConsumeTraces(t *testing.T) {
res, err := newDNSResolver(zap.NewNop(), "service-1", "")
require.NoError(t, err)

mu := sync.Mutex{}
var lastResolved []string
res.onChange(func(s []string) {
mu.Lock()
lastResolved = s
mu.Unlock()
})

resolverCh := make(chan struct{}, 1)
counter := 0
counter := atomic.NewInt64(0)
resolve := [][]net.IPAddr{
{
{IP: net.IPv4(127, 0, 0, 1)},
Expand All @@ -464,14 +468,14 @@ func TestRollingUpdatesWhenConsumeTraces(t *testing.T) {
res.resolver = &mockDNSResolver{
onLookupIPAddr: func(context.Context, string) ([]net.IPAddr, error) {
defer func() {
counter++
counter.Inc()
}()

if counter <= 2 {
return resolve[counter], nil
if counter.Load() <= 2 {
return resolve[counter.Load()], nil
}

if counter == 3 {
if counter.Load() == 3 {
// stop as soon as rolling updates end
resolverCh <- struct{}{}
}
Expand All @@ -482,6 +486,7 @@ func TestRollingUpdatesWhenConsumeTraces(t *testing.T) {
res.resInterval = 10 * time.Millisecond

cfg := &Config{
ExporterSettings: config.NewExporterSettings(config.NewComponentID(typeStr)),
Resolver: ResolverSettings{
DNS: &DNSResolver{Hostname: "service-1", Port: ""},
},
Expand Down Expand Up @@ -564,7 +569,9 @@ func TestRollingUpdatesWhenConsumeTraces(t *testing.T) {
<-consumeCh

// verify
mu.Lock()
require.Equal(t, []string{"127.0.0.2"}, lastResolved)
mu.Unlock()
require.Greater(t, counter1.Load(), int64(0))
require.Greater(t, counter2.Load(), int64(0))
}
Expand Down

0 comments on commit 4c35474

Please sign in to comment.