diff --git a/exporter/loadbalancingexporter/log_exporter_test.go b/exporter/loadbalancingexporter/log_exporter_test.go index 8e0e59f3a1ee..8f33d32fd0b3 100644 --- a/exporter/loadbalancingexporter/log_exporter_test.go +++ b/exporter/loadbalancingexporter/log_exporter_test.go @@ -334,7 +334,6 @@ func TestLogsWithoutTraceID(t *testing.T) { } func TestRollingUpdatesWhenConsumeLogs(t *testing.T) { - t.Skip("Flaky Test - See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/11116") // this test is based on the discussion in the following issue for this exporter: // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/1690 // prepare @@ -353,7 +352,7 @@ func TestRollingUpdatesWhenConsumeLogs(t *testing.T) { }) resolverCh := make(chan struct{}, 1) - counter := 0 + counter := atomic.NewInt64(0) resolve := [][]net.IPAddr{ { {IP: net.IPv4(127, 0, 0, 1)}, @@ -367,14 +366,14 @@ func TestRollingUpdatesWhenConsumeLogs(t *testing.T) { res.resolver = &mockDNSResolver{ onLookupIPAddr: func(context.Context, string) ([]net.IPAddr, error) { defer func() { - counter++ + counter.Inc() }() - if counter <= 2 { - return resolve[counter], nil + if counter.Load() <= 2 { + return resolve[counter.Load()], nil } - if counter == 3 { + if counter.Load() == 3 { // stop as soon as rolling updates end resolverCh <- struct{}{} } @@ -459,7 +458,7 @@ func TestRollingUpdatesWhenConsumeLogs(t *testing.T) { // will still pass due to the 10 secs of sleep that is used to simulate // unreachable backends. go func() { - time.Sleep(50 * time.Millisecond) + time.Sleep(1 * time.Second) resolverCh <- struct{}{} }() diff --git a/exporter/loadbalancingexporter/resolver_dns_test.go b/exporter/loadbalancingexporter/resolver_dns_test.go index 25eeab591f7b..a09f2172ccb0 100644 --- a/exporter/loadbalancingexporter/resolver_dns_test.go +++ b/exporter/loadbalancingexporter/resolver_dns_test.go @@ -24,6 +24,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/atomic" "go.uber.org/zap" ) @@ -134,20 +135,20 @@ func TestOnChange(t *testing.T) { } // test - counter := 0 + counter := atomic.NewInt64(0) res.onChange(func(endpoints []string) { - counter++ + counter.Inc() }) require.NoError(t, res.start(context.Background())) defer func() { require.NoError(t, res.shutdown(context.Background())) }() - require.Equal(t, 1, counter) + require.Equal(t, int64(1), counter.Load()) // now, we run it with the same IPs being resolved, which shouldn't trigger a onChange call _, err = res.resolve(context.Background()) require.NoError(t, err) - require.Equal(t, 1, counter) + require.Equal(t, int64(1), counter.Load()) // change what the resolver will resolve and trigger a resolution resolve = []net.IPAddr{ @@ -156,7 +157,7 @@ func TestOnChange(t *testing.T) { } _, err = res.resolve(context.Background()) require.NoError(t, err) - assert.Equal(t, 2, counter) + assert.Equal(t, int64(2), counter.Load()) } func TestEqualStringSlice(t *testing.T) { @@ -191,7 +192,7 @@ func TestPeriodicallyResolve(t *testing.T) { res, err := newDNSResolver(zap.NewNop(), "service-1", "") require.NoError(t, err) - counter := 0 + counter := atomic.NewInt64(0) resolve := [][]net.IPAddr{ { {IP: net.IPv4(127, 0, 0, 1)}, @@ -207,15 +208,15 @@ func TestPeriodicallyResolve(t *testing.T) { res.resolver = &mockDNSResolver{ onLookupIPAddr: func(context.Context, string) ([]net.IPAddr, error) { defer func() { - counter++ + counter.Inc() }() // for second call, return the second result - if counter == 2 { + if counter.Load() == 2 { return resolve[1], nil } // for subsequent calls, return the last result, because we need more two periodic results // to confirm that it works as expected. - if counter >= 3 { + if counter.Load() >= 3 { return resolve[2], nil } @@ -241,7 +242,7 @@ func TestPeriodicallyResolve(t *testing.T) { wg.Wait() // verify - assert.GreaterOrEqual(t, counter, 3) + assert.GreaterOrEqual(t, counter.Load(), int64(3)) assert.Len(t, res.endpoints, 3) } @@ -252,19 +253,19 @@ func TestPeriodicallyResolveFailure(t *testing.T) { expectedErr := errors.New("some expected error") wg := sync.WaitGroup{} - counter := 0 + counter := atomic.NewInt64(0) resolve := []net.IPAddr{{IP: net.IPv4(127, 0, 0, 1)}} res.resolver = &mockDNSResolver{ onLookupIPAddr: func(context.Context, string) ([]net.IPAddr, error) { - counter++ + counter.Inc() // count down at most two times - if counter <= 2 { + if counter.Load() <= 2 { wg.Done() } // for subsequent calls, return the error - if counter >= 2 { + if counter.Load() >= 2 { return nil, expectedErr } @@ -285,7 +286,7 @@ func TestPeriodicallyResolveFailure(t *testing.T) { wg.Wait() // verify - assert.GreaterOrEqual(t, 2, counter) + assert.GreaterOrEqual(t, int64(2), counter.Load()) assert.Len(t, res.endpoints, 1) // no change to the list of endpoints } diff --git a/exporter/loadbalancingexporter/trace_exporter_test.go b/exporter/loadbalancingexporter/trace_exporter_test.go index 387d0f86e364..c53d3fe0aaa0 100644 --- a/exporter/loadbalancingexporter/trace_exporter_test.go +++ b/exporter/loadbalancingexporter/trace_exporter_test.go @@ -21,6 +21,7 @@ import ( "math/rand" "net" "path/filepath" + "sync" "testing" "time" @@ -444,13 +445,16 @@ func TestRollingUpdatesWhenConsumeTraces(t *testing.T) { res, err := newDNSResolver(zap.NewNop(), "service-1", "") require.NoError(t, err) + mu := sync.Mutex{} var lastResolved []string res.onChange(func(s []string) { + mu.Lock() lastResolved = s + mu.Unlock() }) resolverCh := make(chan struct{}, 1) - counter := 0 + counter := atomic.NewInt64(0) resolve := [][]net.IPAddr{ { {IP: net.IPv4(127, 0, 0, 1)}, @@ -464,14 +468,14 @@ func TestRollingUpdatesWhenConsumeTraces(t *testing.T) { res.resolver = &mockDNSResolver{ onLookupIPAddr: func(context.Context, string) ([]net.IPAddr, error) { defer func() { - counter++ + counter.Inc() }() - if counter <= 2 { - return resolve[counter], nil + if counter.Load() <= 2 { + return resolve[counter.Load()], nil } - if counter == 3 { + if counter.Load() == 3 { // stop as soon as rolling updates end resolverCh <- struct{}{} } @@ -482,6 +486,7 @@ func TestRollingUpdatesWhenConsumeTraces(t *testing.T) { res.resInterval = 10 * time.Millisecond cfg := &Config{ + ExporterSettings: config.NewExporterSettings(config.NewComponentID(typeStr)), Resolver: ResolverSettings{ DNS: &DNSResolver{Hostname: "service-1", Port: ""}, }, @@ -564,7 +569,9 @@ func TestRollingUpdatesWhenConsumeTraces(t *testing.T) { <-consumeCh // verify + mu.Lock() require.Equal(t, []string{"127.0.0.2"}, lastResolved) + mu.Unlock() require.Greater(t, counter1.Load(), int64(0)) require.Greater(t, counter2.Load(), int64(0)) }