Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
Signed-off-by: Yuri Shkuro <[email protected]>
  • Loading branch information
yurishkuro committed Dec 27, 2024
1 parent 7b98a38 commit 2e4cce7
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 183 deletions.
173 changes: 0 additions & 173 deletions cmd/jaeger/internal/integration/span_reader.go

This file was deleted.

34 changes: 26 additions & 8 deletions cmd/jaeger/internal/integration/trace_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import (
"fmt"
"io"
"math"
"strings"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"

"github.com/jaegertracing/jaeger/internal/proto/api_v3"
"github.com/jaegertracing/jaeger/model"
Expand Down Expand Up @@ -66,26 +68,29 @@ func (r *traceReader) GetTraces(
return func(yield func([]ptrace.Traces, error) bool) {
// api_v3 does not support multi-get, so loop through IDs
for _, idParams := range traceIDs {
idStr := model.TraceIDFromOTEL(idParams.TraceID).String()
r.logger.Info("Calling api_v3.GetTrace", zap.String("trace_id", idStr))
stream, err := r.client.GetTrace(ctx, &api_v3.GetTraceRequest{
TraceId: model.TraceIDFromOTEL(idParams.TraceID).String(),
TraceId: idStr,
StartTime: idParams.Start,
EndTime: idParams.End,
})
if err != nil {
err = unwrapNotFoundErr(err)
if errors.Is(err, spanstore.ErrTraceNotFound) {
continue
r.logger.Info("Error received", zap.Error(err))
if !errors.Is(err, spanstore.ErrTraceNotFound) {
yield(nil, err)
}
yield(nil, err)
return
}
for received, err := stream.Recv(); !errors.Is(err, io.EOF); received, err = stream.Recv() {
r.logger.Info("Stream chunk received")
if err != nil {
err = unwrapNotFoundErr(err)
if errors.Is(err, spanstore.ErrTraceNotFound) {
continue
r.logger.Info("Error in stream received", zap.Error(err))
if !errors.Is(err, spanstore.ErrTraceNotFound) {
yield(nil, err)
}
yield(nil, err)
return
}
traces := received.ToTraces()
Expand Down Expand Up @@ -132,6 +137,10 @@ func (r *traceReader) FindTraces(
yield(nil, fmt.Errorf("NumTraces must not be greater than %d", math.MaxInt32))
return
}
if query.StartTimeMin.IsZero() ||
query.StartTimeMax.IsZero() {
panic("start time min and max are required parameters")
}
stream, err := r.client.FindTraces(ctx, &api_v3.FindTracesRequest{
Query: &api_v3.TraceQueryParameters{
ServiceName: query.ServiceName,
Expand Down Expand Up @@ -163,6 +172,15 @@ func (r *traceReader) FindTraces(
}
}

func (*traceReader) FindTraceIDs(ctx context.Context, query tracestore.TraceQueryParams) iter.Seq2[[]pcommon.TraceID, error] {
func (*traceReader) FindTraceIDs(_ context.Context, _ tracestore.TraceQueryParams) iter.Seq2[[]pcommon.TraceID, error] {
panic("not implemented")
}

func unwrapNotFoundErr(err error) error {
if s, _ := status.FromError(err); s != nil {
if strings.Contains(s.Message(), spanstore.ErrTraceNotFound.Error()) {
return spanstore.ErrTraceNotFound
}
}
return err
}
14 changes: 12 additions & 2 deletions plugin/storage/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"fmt"
"os"
"regexp"
"slices"
"sort"
"strings"
"testing"
Expand Down Expand Up @@ -150,14 +151,20 @@ func (s *StorageIntegration) testGetServices(t *testing.T) {
t.Log(err)
return false
}
// self-tracing in Jaeger can create a "jaeger" service, ignore it
if i := slices.Index(actual, "jaeger"); i != -1 {
actual = slices.Delete(actual, i, i+1)
}
sort.Strings(actual)
t.Logf("Retrieved services: %v", actual)
if len(actual) > len(expected) {
// If the storage backend returns more services than expected, let's log traces for those
t.Log("🛑 Found unexpected services!")
for _, service := range actual {
iterTraces := s.TraceReader.FindTraces(context.Background(), tracestore.TraceQueryParams{
ServiceName: service,
ServiceName: service,
StartTimeMin: time.Now().Add(-2 * time.Hour),
StartTimeMax: time.Now(),
})
traces, err := v1adapter.V1TracesFromSeq2(iterTraces)
if err != nil {
Expand Down Expand Up @@ -379,10 +386,13 @@ func (s *StorageIntegration) findTracesByQuery(t *testing.T, query *tracestore.T

func (s *StorageIntegration) writeTrace(t *testing.T, trace *model.Trace) {
t.Logf("%-23s Writing trace with %d spans", time.Now().Format("2006-01-02 15:04:05.999"), len(trace.Spans))
ctx, cx := context.WithTimeout(context.Background(), 60*time.Second)
defer cx()
for _, span := range trace.Spans {
err := s.SpanWriter.WriteSpan(context.Background(), span)
err := s.SpanWriter.WriteSpan(ctx, span)
require.NoError(t, err, "Not expecting error when writing trace to storage")
}
t.Logf("%-23s Finished writing trace with %d spans", time.Now().Format("2006-01-02 15:04:05.999"), len(trace.Spans))
}

func (s *StorageIntegration) loadParseAndWriteExampleTrace(t *testing.T) *model.Trace {
Expand Down

0 comments on commit 2e4cce7

Please sign in to comment.