From 99deec2c95a0859c4eee952653c432ea17b169f0 Mon Sep 17 00:00:00 2001
From: Ayush Vishwakarma
Date: Thu, 19 Dec 2024 23:00:09 +0530
Subject: [PATCH] Added fixture Convertor

Signed-off-by: Ayush Vishwakarma
---
 plugin/storage/integration/integration.go | 373 +++++++++++++---------
 1 file changed, 230 insertions(+), 143 deletions(-)

diff --git a/plugin/storage/integration/integration.go b/plugin/storage/integration/integration.go
index c6f62cf70fe..b9637d5daaf 100644
--- a/plugin/storage/integration/integration.go
+++ b/plugin/storage/integration/integration.go
@@ -19,16 +19,17 @@ import (
    "github.com/gogo/protobuf/jsonpb"
    "github.com/gogo/protobuf/proto"
-   "github.com/stretchr/testify/assert"
-   "github.com/stretchr/testify/require"
-   "go.opentelemetry.io/collector/pdata/ptrace"
-
    samplemodel "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model"
    "github.com/jaegertracing/jaeger/model"
    "github.com/jaegertracing/jaeger/storage/dependencystore"
    "github.com/jaegertracing/jaeger/storage/samplingstore"
    "github.com/jaegertracing/jaeger/storage/spanstore"
    "github.com/jaegertracing/jaeger/storage_v2/tracestore"
+   otlp2jaeger "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
+   "github.com/stretchr/testify/assert"
+   "github.com/stretchr/testify/require"
+   "go.opentelemetry.io/collector/pdata/pcommon"
+   "go.opentelemetry.io/collector/pdata/ptrace"
 )

 //go:embed fixtures
@@ -45,13 +46,14 @@ var fixtures embed.FS
 type StorageIntegration struct {
    SpanWriter        spanstore.Writer
    SpanReader        spanstore.Reader
-   TraceReader       tracestore.Reader
    ArchiveSpanReader spanstore.Reader
    ArchiveSpanWriter spanstore.Writer
    DependencyWriter  dependencystore.Writer
    DependencyReader  dependencystore.Reader
    SamplingStore     samplingstore.Store
    Fixtures          []*QueryFixtures
+   TraceReader       tracestore.Reader
+   TraceWriter       tracestore.Writer

    // TODO: remove this after all storage backends return spanKind from GetOperations
    GetOperationsMissingSpanKind bool
@@ -82,7 +84,7 @@ type StorageIntegration struct {
 // the service name is formatted "query##-service".

 type QueryFixtures struct {
    Caption          string
-   Query            *spanstore.TraceQueryParameters
+   Query            *tracestore.TraceQueryParams
    ExpectedFixtures []string
 }
@@ -138,99 +140,56 @@ func (*StorageIntegration) waitForCondition(t *testing.T, predicate func(t *test
 func (s *StorageIntegration) testGetServices(t *testing.T) {
    s.skipIfNeeded(t)
    defer s.cleanUp(t)
-
+
    expected := []string{"example-service-1", "example-service-2", "example-service-3"}
    s.loadParseAndWriteExampleTrace(t)
-
+
    var actual []string
    found := s.waitForCondition(t, func(t *testing.T) bool {
-       var err error
-       actual, err = s.TraceReader.GetServices(context.Background())
-       if err != nil {
-           t.Log(err)
-           return false
-       }
-       sort.Strings(actual)
-       t.Logf("Retrieved services: %v", actual)
-
-       if len(actual) > len(expected) {
-           t.Log("🛑 Found unexpected services!")
-           for _, service := range actual {
-               traceSeq := s.TraceReader.FindTraces(context.Background(), tracestore.TraceQueryParams{
-                   ServiceName: service,
-               })
-
-               hasError := false
-               traceSeq(func(traces []ptrace.Traces, err error) bool {
-                   if err != nil {
-                       t.Log(err)
-                       hasError = true
-                       return false
-                   }
-                   for _, otelTrace := range traces {
-                       t.Logf("Retrieved trace for service '%s': %v", service, otelTrace)
-                   }
-                   return true
-               })
-
-               if hasError {
-                   t.Logf("Error processing traces for service '%s'", service)
-                   return false
+       var err error
+       actual, err = s.TraceReader.GetServices(context.Background())
+       if err != nil {
+           t.Log(err)
+           return false
+       }
+       sort.Strings(actual)
+       t.Logf("Retrieved services: %v", actual)
+
+       if len(actual) > len(expected) {
+           t.Log("🛑 Found unexpected services!")
+           for _, service := range actual {
+               traceSeq := s.TraceReader.FindTraces(context.Background(), tracestore.TraceQueryParams{
+                   ServiceName: service,
+               })
+
+               hasError := false
+               traceSeq(func(traces []ptrace.Traces, err error) bool {
+                   if err != nil {
+                       t.Log(err)
+                       hasError = true
+                       return false
+                   }
+                   for _, otelTrace := range traces {
+                       t.Logf("Retrieved trace for service '%s': %v", service, otelTrace)
+                   }
+                   return true
+               })
+
+               if hasError {
+                   t.Logf("Error processing traces for service '%s'", service)
+                   return false
+               }
            }
-       }
-       }
-       return assert.ObjectsAreEqualValues(expected, actual)
+       }
+       return assert.ObjectsAreEqualValues(expected, actual)
    })
-
+
    if !assert.True(t, found) {
-       t.Log("\t Expected:", expected)
-       t.Log("\t Actual :", actual)
+       t.Log("\t Expected:", expected)
+       t.Log("\t Actual :", actual)
    }
-   }
-
-// func (s *StorageIntegration) testGetServices(t *testing.T) {
-//     s.skipIfNeeded(t)
-//     defer s.cleanUp(t)
-
-//     expected := []string{"example-service-1", "example-service-2", "example-service-3"}
-//     s.loadParseAndWriteExampleTrace(t)

-//     var actual []string
-//     found := s.waitForCondition(t, func(t *testing.T) bool {
-//         var err error
-//         actual, err = s.SpanReader.GetServices(context.Background())
-//         if err != nil {
-//             t.Log(err)
-//             return false
-//         }
-//         sort.Strings(actual)
-//         t.Logf("Retrieved services: %v", actual)
-//         if len(actual) > len(expected) {
-//             // If the storage backend returns more services than expected, let's log traces for those
-//             t.Log("🛑 Found unexpected services!")
-//             for _, service := range actual {
-//                 traces, err := s.SpanReader.FindTraces(context.Background(), &spanstore.TraceQueryParameters{
-//                     ServiceName: service,
-//                 })
-//                 if err != nil {
-//                     t.Log(err)
-//                     continue
-//                 }
-//                 for _, trace := range traces {
-//                     for _, span := range trace.Spans {
-//                         t.Logf("span: Service: %s, TraceID: %s, Operation: %s", service, span.TraceID,
-//                             span.OperationName)
-//                     }
-//                 }
-//             }
-//         }
-//         }
-//         return assert.ObjectsAreEqualValues(expected, actual)
-//     })
-
-//     if !assert.True(t, found) {
-//         t.Log("\t Expected:", expected)
-//         t.Log("\t Actual :", actual)
-//     }
-// }
+}

 func (s *StorageIntegration) testArchiveTrace(t *testing.T) {
    s.skipIfNeeded(t)
@@ -335,6 +294,34 @@ func (s *StorageIntegration) testGetOperations(t *testing.T) {
    }
 }

+// func (s *StorageIntegration) testGetTrace(t *testing.T) {
+//     s.skipIfNeeded(t)
+//     defer s.cleanUp(t)
+
+//     expected := s.loadParseAndWriteExampleTrace(t)
+//     expectedTraceID := expected.Spans[0].TraceID
+
+//     var actual *model.Trace
+//     found := s.waitForCondition(t, func(t *testing.T) bool {
+//         var err error
+//         actual, err = s.SpanReader.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: expectedTraceID})
+//         if err != nil {
+//             t.Log(err)
+//         }
+//         return err == nil && len(actual.Spans) == len(expected.Spans)
+//     })
+//     if !assert.True(t, found) {
+//         CompareTraces(t, expected, actual)
+//     }
+
+//     t.Run("NotFound error", func(t *testing.T) {
+//         fakeTraceID := model.TraceID{High: 0, Low: 1}
+//         trace, err := s.SpanReader.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: fakeTraceID})
+//         assert.Equal(t, spanstore.ErrTraceNotFound, err)
+//         assert.Nil(t, trace)
+//     })
+// }
+
 func (s *StorageIntegration) testGetTrace(t *testing.T) {
    s.skipIfNeeded(t)
    defer s.cleanUp(t)
@@ -367,103 +354,203 @@ func (s *StorageIntegration) testFindTraces(t *testing.T) {
    s.skipIfNeeded(t)
    defer s.cleanUp(t)

-   // Note: all cases include ServiceName + StartTime range
+   // Load query test cases
    s.Fixtures = append(s.Fixtures, LoadAndParseQueryTestCases(t, "fixtures/queries.json")...)
-   // Each query test case only specifies matching traces, but does not provide counterexamples.
-   // To improve coverage we get all possible traces and store all of them before running queries.
-   allTraceFixtures := make(map[string]*model.Trace)
-   expectedTracesPerTestCase := make([][]*model.Trace, 0, len(s.Fixtures))
+   // Prepare a map to store all trace fixtures and a slice for expected traces per test case
+   allTraceFixtures := make(map[string]ptrace.Traces)
+   expectedTracesPerTestCase := make([][]ptrace.Traces, 0, len(s.Fixtures))
+
+   // Process each query test case to prepare expected traces
    for _, queryTestCase := range s.Fixtures {
-       var expected []*model.Trace
+       var expected []ptrace.Traces
        for _, traceFixture := range queryTestCase.ExpectedFixtures {
            trace, ok := allTraceFixtures[traceFixture]
            if !ok {
-               trace = s.getTraceFixture(t, traceFixture)
-               s.writeTrace(t, trace)
-               allTraceFixtures[traceFixture] = trace
+               trace = s.getTraceFixture(t, traceFixture) // Load the trace fixture
+               s.writeTrace(t, trace)                     // Write the trace into the storage
+               allTraceFixtures[traceFixture] = trace     // Cache the trace fixture
            }
            expected = append(expected, trace)
        }
        expectedTracesPerTestCase = append(expectedTracesPerTestCase, expected)
    }
+
+   // Run tests for each query test case
    for i, queryTestCase := range s.Fixtures {
        t.Run(queryTestCase.Caption, func(t *testing.T) {
            s.skipIfNeeded(t)
            expected := expectedTracesPerTestCase[i]
            actual := s.findTracesByQuery(t, queryTestCase.Query, expected)
+
+           // Compare the expected and actual traces
            CompareSliceOfTraces(t, expected, actual)
        })
    }
 }

-func (s *StorageIntegration) findTracesByQuery(t *testing.T, query *spanstore.TraceQueryParameters, expected []*model.Trace) []*model.Trace {
-   var traces []*model.Trace
+// func (s *StorageIntegration) findTracesByQuery(t *testing.T, query *tracestore.TraceQueryParams, expected []*ptrace.Traces) []ptrace.Traces {
+//     var traces []*model.Trace
+//     found := s.waitForCondition(t, func(t *testing.T) bool {
+//         var err error
+//         traces, err = s.SpanReader.FindTraces(context.Background(), query)
+//         if err != nil {
+//             t.Log(err)
+//             return false
+//         }
+//         if len(expected) != len(traces) {
+//             t.Logf("Expecting certain number of traces: expected: %d, actual: %d", len(expected), len(traces))
+//             return false
+//         }
+//         if spanCount(expected) != spanCount(traces) {
+//             t.Logf("Excepting certain number of spans: expected: %d, actual: %d", spanCount(expected), spanCount(traces))
+//             return false
+//         }
+//         return true
+//     })
+//     require.True(t, found)
+//     return traces
+// }
+func (s *StorageIntegration) findTracesByQuery(t *testing.T, query *tracestore.TraceQueryParams, expected []ptrace.Traces) []ptrace.Traces {
+   var collected []ptrace.Traces
+   var lastErr error
+
    found := s.waitForCondition(t, func(t *testing.T) bool {
-       var err error
-       traces, err = s.SpanReader.FindTraces(context.Background(), query)
-       if err != nil {
+       collected = nil // Reset collected traces for each attempt
+       lastErr = nil
+       traceSeq := s.TraceReader.FindTraces(context.Background(), *query)
+
+       traceSeq(func(traces []ptrace.Traces, err error) bool {
+           if err != nil {
+               lastErr = err
                t.Log(err)
                return false
-       }
-       if len(expected) != len(traces) {
-           t.Logf("Expecting certain number of traces: expected: %d, actual: %d", len(expected), len(traces))
-           return false
-       }
-       if spanCount(expected) != spanCount(traces) {
-           t.Logf("Excepting certain number of spans: expected: %d, actual: %d", spanCount(expected), spanCount(traces))
-           return false
-       }
-       return true
+           }
+           collected = append(collected, traces...)
+           return true
+       })
+
+       if lastErr != nil {
+           return false
+       }
+
+       if len(expected) != len(collected) {
+           t.Logf("Expecting certain number of traces: expected %d, actual %d", len(expected), len(collected))
+           return false
+       }
+
+       // Compare span counts
+       expectedSpans := 0
+       actualSpans := 0
+       for _, trace := range expected {
+           expectedSpans += trace.SpanCount()
+       }
+       for _, trace := range collected {
+           actualSpans += trace.SpanCount()
+       }
+
+       if expectedSpans != actualSpans {
+           t.Logf("Expecting certain number of spans: expected %d, actual %d", expectedSpans, actualSpans)
+           return false
+       }
+
+       return true
    })
+
    require.True(t, found)
-   return traces
-}
+   return collected
+}
+
+// func (s *StorageIntegration) writeTrace(t *testing.T, trace *model.Trace) {
+//     t.Logf("%-23s Writing trace with %d spans", time.Now().Format("2006-01-02 15:04:05.999"), len(trace.Spans))
+//     for _, span := range trace.Spans {
+//         err := s.SpanWriter.WriteSpan(context.Background(), span)
+//         require.NoError(t, err, "Not expecting error when writing trace to storage")
+//     }
+// }
-func (s *StorageIntegration) writeTrace(t *testing.T, trace *model.Trace) {
-   t.Logf("%-23s Writing trace with %d spans", time.Now().Format("2006-01-02 15:04:05.999"), len(trace.Spans))
-   for _, span := range trace.Spans {
-       err := s.SpanWriter.WriteSpan(context.Background(), span)
-       require.NoError(t, err, "Not expecting error when writing trace to storage")
+func (s *StorageIntegration) writeTrace(t *testing.T, td ptrace.Traces) {
+   t.Logf("%-23s Writing trace with %d spans", time.Now().Format("2006-01-02 15:04:05.999"), td.SpanCount())
+   err := s.TraceWriter.WriteTraces(context.Background(), td)
+   require.NoError(t, err, "Not expecting error when writing trace to storage")
-   }
 }

-func (s *StorageIntegration) loadParseAndWriteExampleTrace(t *testing.T) *model.Trace {
+func (s *StorageIntegration) loadParseAndWriteExampleTrace(t *testing.T) ptrace.Traces {
    trace := s.getTraceFixture(t, "example_trace")
    s.writeTrace(t, trace)
    return trace
 }
-
-func (s *StorageIntegration) writeLargeTraceWithDuplicateSpanIds(t *testing.T) *model.Trace {
+func (s *StorageIntegration) writeLargeTraceWithDuplicateSpanIds(t *testing.T) ptrace.Traces {
+   // Load the example trace fixture
    trace := s.getTraceFixture(t, "example_trace")
-   repeatedSpan := trace.Spans[0]
-   trace.Spans = make([]*model.Span, 0, 10008)
+
+   // Get the first resource span to use as a template
+   resourceSpans := trace.ResourceSpans()
+   if resourceSpans.Len() == 0 {
+       t.Fatalf("No ResourceSpans found in the trace fixture")
+   }
+   originalSpans := resourceSpans.At(0).ScopeSpans().At(0).Spans()
+   if originalSpans.Len() == 0 {
+       t.Fatalf("No spans found in the ResourceSpan")
+   }
+
+   // Prepare a new trace with a large number of spans
+   newTrace := ptrace.NewTraces()
+   newResourceSpans := newTrace.ResourceSpans().AppendEmpty()
+   resourceSpans.At(0).CopyTo(newResourceSpans)
+
+   newScopeSpans := newResourceSpans.ScopeSpans().AppendEmpty()
+   originalSpans.At(0).CopyTo(newScopeSpans.Spans().AppendEmpty()) // Copy the first span as a template
+
    for i := 0; i < 10008; i++ {
-       newSpan := new(model.Span)
-       *newSpan = *repeatedSpan
-       switch {
-       case i%100 == 0:
-           newSpan.SpanID = repeatedSpan.SpanID
-       default:
-           newSpan.SpanID = model.SpanID(i)
+       newSpan := newScopeSpans.Spans().AppendEmpty()
+       originalSpans.At(0).CopyTo(newSpan)
+
+       if i%100 == 0 {
+           // Duplicate span ID every 100 spans
+           newSpan.SetSpanID(originalSpans.At(0).SpanID())
+       } else {
+           // Set a unique span ID derived from the loop index
+           newSpan.SetSpanID(pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, byte(i >> 8), byte(i)}))
        }
-       newSpan.StartTime = newSpan.StartTime.Add(time.Second * time.Duration(i+1))
-       trace.Spans = append(trace.Spans, newSpan)
+
+       // Adjust the start time to make each span unique
+       newSpan.SetStartTimestamp(originalSpans.At(0).StartTimestamp() + pcommon.Timestamp(i*1e9))
    }
-   s.writeTrace(t, trace)
-   return trace
+
+   // Write the new trace
+   s.writeTrace(t, newTrace)
+   return newTrace
 }
-func (*StorageIntegration) getTraceFixture(t *testing.T, fixture string) *model.Trace {
+
+func (*StorageIntegration) getTraceFixture(t *testing.T, fixture string) ptrace.Traces {
    fileName := fmt.Sprintf("fixtures/traces/%s.json", fixture)
+
    return getTraceFixtureExact(t, fileName)
 }
-func getTraceFixtureExact(t *testing.T, fileName string) *model.Trace {
-   var trace model.Trace
-   loadAndParseJSONPB(t, fileName, &trace)
-   return &trace
-}
+// func getTraceFixtureExact(t *testing.T, fileName string) *model.Trace {
+//     var trace model.Trace
+//     loadAndParseJSONPB(t, fileName, &trace)
+
+//     return &trace
+// }
+func getTraceFixtureExact(t *testing.T, fileName string) ptrace.Traces {
+   var modelTrace model.Trace
+   loadAndParseJSONPB(t, fileName, &modelTrace)
+
+   // Wrap all spans into a single batch and convert them to OTLP traces
+   batch := &model.Batch{
+       Spans: modelTrace.Spans,
+   }
+   traces, err := otlp2jaeger.ProtoToTraces([]*model.Batch{batch})
+   require.NoError(t, err, "Failed to convert trace fixture to OTLP format")
+   return traces
+}

 func loadAndParseJSONPB(t *testing.T, path string, object proto.Message) {
    // #nosec