Skip to content

Commit

Permalink
Merge branch 'main' into cassandra
Browse files Browse the repository at this point in the history
  • Loading branch information
akagami-harsh authored Apr 29, 2024
2 parents 15a1107 + 28150fc commit 5d35264
Show file tree
Hide file tree
Showing 10 changed files with 84 additions and 114 deletions.
7 changes: 3 additions & 4 deletions .github/workflows/ci-badger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,10 @@ jobs:
run: |
case ${{ matrix.version }} in
v1)
make badger-storage-integration-test
make badger-storage-integration-test
;;
v2)
STORAGE=badger \
make jaeger-v2-storage-integration-test
STORAGE=badger make jaeger-v2-storage-integration-test
;;
esac
Expand All @@ -53,6 +52,6 @@ jobs:
with:
files: cover.out
verbose: true
flags: badger
flags: badger_${{ matrix.version }}
fail_ci_if_error: true
token: ${{ env.CODECOV_TOKEN }}
9 changes: 3 additions & 6 deletions .github/workflows/ci-grpc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,10 @@ jobs:
run: |
case ${{ matrix.version }} in
v1)
SPAN_STORAGE_TYPE=memory \
make grpc-storage-integration-test
SPAN_STORAGE_TYPE=memory make grpc-storage-integration-test
;;
v2)
STORAGE=grpc \
SPAN_STORAGE_TYPE=memory \
make jaeger-v2-storage-integration-test
STORAGE=grpc SPAN_STORAGE_TYPE=memory make jaeger-v2-storage-integration-test
;;
esac
Expand All @@ -55,6 +52,6 @@ jobs:
with:
files: cover.out
verbose: true
flags: grpc
flags: grpc_${{ matrix.version }}
fail_ci_if_error: true
token: ${{ env.CODECOV_TOKEN }}
3 changes: 1 addition & 2 deletions cmd/agent/app/reporter/grpc/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,7 @@ func (b *ConnBuilder) CreateConnection(ctx context.Context, logger *zap.Logger,
dialOptions = append(dialOptions, grpc.WithUnaryInterceptor(grpc_retry.UnaryClientInterceptor(grpc_retry.WithMax(b.MaxRetry))))
dialOptions = append(dialOptions, b.AdditionalDialOptions...)

// TODO: Need to replace grpc.Dial with grpc.NewClient and pass test
conn, err := grpc.Dial(dialTarget, dialOptions...)
conn, err := grpc.NewClient(dialTarget, dialOptions...)
if err != nil {
return nil, err
}
Expand Down
110 changes: 33 additions & 77 deletions cmd/agent/app/reporter/grpc/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
yaml "gopkg.in/yaml.v2"

Expand Down Expand Up @@ -69,78 +68,52 @@ func TestBuilderFromConfig(t *testing.T) {

func TestBuilderWithCollectors(t *testing.T) {
spanHandler1 := &mockSpanHandler{}
s1, addr1 := initializeGRPCTestServer(t, func(s *grpc.Server) {
s1, _ := initializeGRPCTestServer(t, func(s *grpc.Server) {
api_v2.RegisterCollectorServiceServer(s, spanHandler1)
})
defer s1.Stop()

tests := []struct {
target string
name string
hostPorts []string
checkSuffixOnly bool
notifier discovery.Notifier
discoverer discovery.Discoverer
expectedError string
checkConnectionState bool
expectedState string
target string
name string
hostPorts []string
checkSuffixOnly bool
notifier discovery.Notifier
discoverer discovery.Discoverer
expectedError string
}{
{
target: "///round_robin",
name: "with roundrobin schema",
hostPorts: []string{"127.0.0.1:9876", "127.0.0.1:9877", "127.0.0.1:9878"},
checkSuffixOnly: true,
notifier: nil,
discoverer: nil,
checkConnectionState: false,
target: "///round_robin",
name: "with roundrobin schema",
hostPorts: []string{"127.0.0.1:9876", "127.0.0.1:9877", "127.0.0.1:9878"},
checkSuffixOnly: true,
notifier: nil,
discoverer: nil,
},
{
target: "127.0.0.1:9876",
name: "with single host",
hostPorts: []string{"127.0.0.1:9876"},
checkSuffixOnly: false,
notifier: nil,
discoverer: nil,
checkConnectionState: false,
target: "127.0.0.1:9876",
name: "with single host",
hostPorts: []string{"127.0.0.1:9876"},
checkSuffixOnly: false,
notifier: nil,
discoverer: nil,
},
{
target: "///round_robin",
name: "with custom resolver and fixed discoverer",
hostPorts: []string{"dns://random_stuff"},
checkSuffixOnly: true,
notifier: noopNotifier{},
discoverer: discovery.FixedDiscoverer{},
checkConnectionState: false,
target: "///round_robin",
name: "with custom resolver and fixed discoverer",
hostPorts: []string{"dns://random_stuff"},
checkSuffixOnly: true,
notifier: noopNotifier{},
discoverer: discovery.FixedDiscoverer{},
},
{
target: "",
name: "without collectorPorts and resolver",
hostPorts: nil,
checkSuffixOnly: false,
notifier: nil,
discoverer: nil,
expectedError: "at least one collector hostPort address is required when resolver is not available",
checkConnectionState: false,
},
{
target: addr1.String(),
name: "with collector connection status ready",
hostPorts: []string{addr1.String()},
checkSuffixOnly: false,
notifier: nil,
discoverer: nil,
checkConnectionState: true,
expectedState: "READY",
},
{
target: "random_stuff",
name: "with collector connection status failure",
hostPorts: []string{"random_stuff"},
checkSuffixOnly: false,
notifier: nil,
discoverer: nil,
checkConnectionState: true,
expectedState: "TRANSIENT_FAILURE",
target: "",
name: "without collectorPorts and resolver",
hostPorts: nil,
checkSuffixOnly: false,
notifier: nil,
discoverer: nil,
expectedError: "at least one collector hostPort address is required when resolver is not available",
},
}

Expand All @@ -159,9 +132,6 @@ func TestBuilderWithCollectors(t *testing.T) {
require.NoError(t, err)
defer conn.Close()
require.NotNil(t, conn)
if test.checkConnectionState {
assertConnectionState(t, conn, test.expectedState)
}
if test.checkSuffixOnly {
assert.True(t, strings.HasSuffix(conn.Target(), test.target))
} else {
Expand Down Expand Up @@ -395,20 +365,6 @@ func TestProxyClientTLS(t *testing.T) {
}
}

func assertConnectionState(t *testing.T, conn *grpc.ClientConn, expectedState string) {
for {
s := conn.GetState()
if s == connectivity.Ready {
assert.Equal(t, expectedState, s.String())
break
}
if s == connectivity.TransientFailure {
assert.Equal(t, expectedState, s.String())
break
}
}
}

type fakeInterceptor struct {
isCalled bool
}
Expand Down
20 changes: 10 additions & 10 deletions cmd/jaeger/internal/integration/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package integration
import (
"testing"

"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage/integration"
)

Expand All @@ -17,11 +16,7 @@ type GRPCStorageIntegration struct {
}

func (s *GRPCStorageIntegration) initialize(t *testing.T) {
logger, _ := testutils.NewLogger()

s.remoteStorage = integration.StartNewRemoteMemoryStorage(t, logger)

s.CleanUp = s.cleanUp
s.remoteStorage = integration.StartNewRemoteMemoryStorage(t)
}

func (s *GRPCStorageIntegration) cleanUp(t *testing.T) {
Expand All @@ -32,10 +27,15 @@ func (s *GRPCStorageIntegration) cleanUp(t *testing.T) {
func TestGRPCStorage(t *testing.T) {
integration.SkipUnlessEnv(t, "grpc")

s := &GRPCStorageIntegration{}
s.ConfigFile = "../../grpc_config.yaml"
s.SkipBinaryAttrs = true

s := &GRPCStorageIntegration{
E2EStorageIntegration: E2EStorageIntegration{
ConfigFile: "../../grpc_config.yaml",
StorageIntegration: integration.StorageIntegration{
SkipBinaryAttrs: true,
},
},
}
s.CleanUp = s.cleanUp
s.initialize(t)
s.e2eInitialize(t)
t.Cleanup(func() {
Expand Down
22 changes: 21 additions & 1 deletion cmd/jaeger/internal/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@
package integration

import (
"context"
"fmt"
"io"
"net/http"
"os"
"os/exec"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"
Expand Down Expand Up @@ -41,7 +45,7 @@ type E2EStorageIntegration struct {
func (s *E2EStorageIntegration) e2eInitialize(t *testing.T) {
logger, _ := testutils.NewLogger()
configFile := createStorageCleanerConfig(t, s.ConfigFile)

t.Logf("Starting Jaeger-v2 in the background with config file %s", configFile)
cmd := exec.Cmd{
Path: "./cmd/jaeger/jaeger",
Args: []string{"jaeger", "--config", configFile},
Expand All @@ -53,6 +57,22 @@ func (s *E2EStorageIntegration) e2eInitialize(t *testing.T) {
Stderr: os.Stderr,
}
require.NoError(t, cmd.Start())
require.Eventually(t, func() bool {
url := fmt.Sprintf("http://localhost:%d/", ports.QueryHTTP)
t.Logf("Checking if Jaeger-v2 is available on %s", url)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
require.NoError(t, err)
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Log(err)
return false
}
defer resp.Body.Close()
return resp.StatusCode == http.StatusOK
}, 30*time.Second, 500*time.Millisecond, "Jaeger-v2 did not start")
t.Log("Jaeger-v2 is ready")
t.Cleanup(func() {
require.NoError(t, cmd.Process.Kill())
})
Expand Down
4 changes: 2 additions & 2 deletions cmd/query/app/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (g *GRPCHandler) GetTrace(r *api_v2.GetTraceRequest, stream api_v2.QuerySer
}
trace, err := g.queryService.GetTrace(stream.Context(), r.TraceID)
if errors.Is(err, spanstore.ErrTraceNotFound) {
g.logger.Error(msgTraceNotFound, zap.Error(err))
g.logger.Warn(msgTraceNotFound, zap.Stringer("id", r.TraceID), zap.Error(err))
return status.Errorf(codes.NotFound, "%s: %v", msgTraceNotFound, err)
}
if err != nil {
Expand All @@ -122,7 +122,7 @@ func (g *GRPCHandler) ArchiveTrace(ctx context.Context, r *api_v2.ArchiveTraceRe
}
err := g.queryService.ArchiveTrace(ctx, r.TraceID)
if errors.Is(err, spanstore.ErrTraceNotFound) {
g.logger.Error("trace not found", zap.Error(err))
g.logger.Warn(msgTraceNotFound, zap.Stringer("id", r.TraceID), zap.Error(err))
return nil, status.Errorf(codes.NotFound, "%s: %v", msgTraceNotFound, err)
}
if err != nil {
Expand Down
9 changes: 4 additions & 5 deletions plugin/storage/integration/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import (

"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"

"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage/grpc"
)

Expand All @@ -44,19 +44,18 @@ type GRPCStorageIntegrationTestSuite struct {
}

func (s *GRPCStorageIntegrationTestSuite) initialize(t *testing.T) {
s.logger, _ = testutils.NewLogger()
s.logger = zaptest.NewLogger(t, zaptest.Level(zap.DebugLevel))

if s.useRemoteStorage {
s.remoteStorage = StartNewRemoteMemoryStorage(t, s.logger)
s.remoteStorage = StartNewRemoteMemoryStorage(t)
}

f := grpc.NewFactory()
v, command := config.Viperize(f.AddFlags)
err := command.ParseFlags(s.flags)
require.NoError(t, err)
f.InitFromViper(v, zap.NewNop())
err = f.Initialize(metrics.NullFactory, s.logger)
require.NoError(t, err)
require.NoError(t, f.Initialize(metrics.NullFactory, s.logger))
s.factory = f

s.SpanWriter, err = f.CreateSpanWriter()
Expand Down
9 changes: 3 additions & 6 deletions plugin/storage/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ import (
"github.com/jaegertracing/jaeger/storage/spanstore"
)

const (
iterations = 100
)

//go:embed fixtures
var fixtures embed.FS

Expand Down Expand Up @@ -130,12 +126,13 @@ func (s *StorageIntegration) skipIfNeeded(t *testing.T) {
}

func (s *StorageIntegration) waitForCondition(t *testing.T, predicate func(t *testing.T) bool) bool {
const iterations = 100 // Will wait at most 100 seconds.
for i := 0; i < iterations; i++ {
t.Logf("Waiting for storage backend to update documents, iteration %d out of %d", i+1, iterations)
if predicate(t) {
return true
}
time.Sleep(time.Second) // Will wait at most 100 seconds.
t.Logf("Waiting for storage backend to update documents, iteration %d out of %d", i+1, iterations)
time.Sleep(time.Second)
}
return predicate(t)
}
Expand Down
Loading

0 comments on commit 5d35264

Please sign in to comment.