From ed30d5dec46e8ee85dea2c442bc676634cc509a1 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab <43658574+mahadzaryab1@users.noreply.github.com> Date: Tue, 31 Dec 2024 14:26:37 -0500 Subject: [PATCH] [v2][storage] Create v2 query service to operate on otlp data model (#6343) ## Which problem is this PR solving? - Towards #6337 ## Description of the changes - Implement a v2 version of the query service that operates on the OTLP data model. This PR will be followed up by a series of PRs where this this new query service will be updated with the existing handlers. Once all the handlers have been migrated to use this query service, we can remove the old one. ## How was this change tested? - Added unit tests ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [x] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `yarn lint` and `yarn test` --------- Signed-off-by: Mahad Zaryab --- .../internal/integration/trace_reader.go | 2 +- .../app/querysvc/v2/querysvc/package_test.go | 14 + cmd/query/app/querysvc/v2/querysvc/service.go | 203 +++++++ .../app/querysvc/v2/querysvc/service_test.go | 496 ++++++++++++++++++ plugin/storage/integration/integration.go | 2 +- storage_v2/tracestore/mocks/Reader.go | 8 +- storage_v2/tracestore/reader.go | 6 +- storage_v2/v1adapter/reader.go | 2 +- storage_v2/v1adapter/reader_test.go | 2 +- 9 files changed, 724 insertions(+), 11 deletions(-) create mode 100644 cmd/query/app/querysvc/v2/querysvc/package_test.go create mode 100644 cmd/query/app/querysvc/v2/querysvc/service.go create mode 100644 cmd/query/app/querysvc/v2/querysvc/service_test.go diff --git a/cmd/jaeger/internal/integration/trace_reader.go b/cmd/jaeger/internal/integration/trace_reader.go index cfa102de13a..a61edf63f43 100644 --- a/cmd/jaeger/internal/integration/trace_reader.go +++ b/cmd/jaeger/internal/integration/trace_reader.go @@ -90,7 +90,7 @@ func (r *traceReader) GetServices(ctx context.Context) ([]string, error) { return res.Services, nil } -func (r *traceReader) GetOperations(ctx context.Context, query tracestore.OperationQueryParameters) ([]tracestore.Operation, error) { +func (r *traceReader) GetOperations(ctx context.Context, query tracestore.OperationQueryParams) ([]tracestore.Operation, error) { var operations []tracestore.Operation res, err := r.client.GetOperations(ctx, &api_v3.GetOperationsRequest{ Service: query.ServiceName, diff --git a/cmd/query/app/querysvc/v2/querysvc/package_test.go b/cmd/query/app/querysvc/v2/querysvc/package_test.go new file mode 100644 index 00000000000..755423f86da --- /dev/null +++ b/cmd/query/app/querysvc/v2/querysvc/package_test.go @@ -0,0 +1,14 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package querysvc + +import ( + "testing" + + "github.com/jaegertracing/jaeger/pkg/testutils" +) + +func TestMain(m *testing.M) { + testutils.VerifyGoLeaks(m) +} diff --git a/cmd/query/app/querysvc/v2/querysvc/service.go b/cmd/query/app/querysvc/v2/querysvc/service.go new file mode 100644 index 00000000000..bfcc98053c3 --- /dev/null +++ b/cmd/query/app/querysvc/v2/querysvc/service.go @@ -0,0 +1,203 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package querysvc + +import ( + "context" + "errors" + "time" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/jaegertracing/jaeger/cmd/query/app/querysvc/v2/adjuster" + "github.com/jaegertracing/jaeger/internal/jptrace" + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/pkg/iter" + "github.com/jaegertracing/jaeger/storage_v2/depstore" + "github.com/jaegertracing/jaeger/storage_v2/tracestore" +) + +var errNoArchiveSpanStorage = errors.New("archive span storage was not configured") + +const ( + defaultMaxClockSkewAdjust = time.Second +) + +// QueryServiceOptions holds the configuration options for the query service. +type QueryServiceOptions struct { + // ArchiveTraceReader is used to read archived traces from the storage. + ArchiveTraceReader tracestore.Reader + // ArchiveTraceWriter is used to write traces to the archive storage. + ArchiveTraceWriter tracestore.Writer + // Adjuster is used to adjust traces before they are returned to the client. + // If not set, the default adjuster will be used. + Adjuster adjuster.Adjuster +} + +// StorageCapabilities is a feature flag for query service +type StorageCapabilities struct { + ArchiveStorage bool `json:"archiveStorage"` + // TODO: Maybe add metrics Storage here + // SupportRegex bool + // SupportTagFilter bool +} + +// QueryService provides methods to query data from the storage. +type QueryService struct { + traceReader tracestore.Reader + dependencyReader depstore.Reader + options QueryServiceOptions +} + +// GetTraceParams defines the parameters for retrieving traces using the GetTraces function. +type GetTraceParams struct { + // TraceIDs is a slice of trace identifiers to fetch. + TraceIDs []tracestore.GetTraceParams + // RawTraces indicates whether to retrieve raw traces. + // If set to false, the traces will be adjusted using QueryServiceOptions.Adjuster. + RawTraces bool +} + +// TraceQueryParams represents the parameters for querying a batch of traces. +type TraceQueryParams struct { + tracestore.TraceQueryParams + // RawTraces indicates whether to retrieve raw traces. + // If set to false, the traces will be adjusted using QueryServiceOptions.Adjuster. + RawTraces bool +} + +func NewQueryService( + traceReader tracestore.Reader, + dependencyReader depstore.Reader, + options QueryServiceOptions, +) *QueryService { + qsvc := &QueryService{ + traceReader: traceReader, + dependencyReader: dependencyReader, + options: options, + } + + if qsvc.options.Adjuster == nil { + qsvc.options.Adjuster = adjuster.Sequence( + adjuster.StandardAdjusters(defaultMaxClockSkewAdjust)...) + } + return qsvc +} + +// GetTraces retrieves traces with given trace IDs from the primary reader, +// and if any of them are not found it then queries the archive reader. +// The iterator is single-use: once consumed, it cannot be used again. +func (qs QueryService) GetTraces( + ctx context.Context, + params GetTraceParams, +) iter.Seq2[[]ptrace.Traces, error] { + getTracesIter := qs.traceReader.GetTraces(ctx, params.TraceIDs...) + return func(yield func([]ptrace.Traces, error) bool) { + foundTraceIDs, proceed := qs.receiveTraces(getTracesIter, yield, params.RawTraces) + if proceed && qs.options.ArchiveTraceReader != nil { + var missingTraceIDs []tracestore.GetTraceParams + for _, id := range params.TraceIDs { + if _, found := foundTraceIDs[id.TraceID]; !found { + missingTraceIDs = append(missingTraceIDs, id) + } + } + if len(missingTraceIDs) > 0 { + getArchiveTracesIter := qs.options.ArchiveTraceReader.GetTraces(ctx, missingTraceIDs...) + qs.receiveTraces(getArchiveTracesIter, yield, params.RawTraces) + } + } + } +} + +func (qs QueryService) GetServices(ctx context.Context) ([]string, error) { + return qs.traceReader.GetServices(ctx) +} + +func (qs QueryService) GetOperations( + ctx context.Context, + query tracestore.OperationQueryParams, +) ([]tracestore.Operation, error) { + return qs.traceReader.GetOperations(ctx, query) +} + +func (qs QueryService) FindTraces( + ctx context.Context, + query TraceQueryParams, +) iter.Seq2[[]ptrace.Traces, error] { + return func(yield func([]ptrace.Traces, error) bool) { + tracesIter := qs.traceReader.FindTraces(ctx, query.TraceQueryParams) + qs.receiveTraces(tracesIter, yield, query.RawTraces) + } +} + +// ArchiveTrace archives a trace specified by the given query parameters. +// If the ArchiveTraceWriter is not configured, it returns +// an error indicating that there is no archive span storage available. +func (qs QueryService) ArchiveTrace(ctx context.Context, query tracestore.GetTraceParams) error { + if qs.options.ArchiveTraceWriter == nil { + return errNoArchiveSpanStorage + } + getTracesIter := qs.GetTraces( + ctx, GetTraceParams{TraceIDs: []tracestore.GetTraceParams{query}}, + ) + var archiveErr error + getTracesIter(func(traces []ptrace.Traces, err error) bool { + if err != nil { + archiveErr = err + return false + } + for _, trace := range traces { + err = qs.options.ArchiveTraceWriter.WriteTraces(ctx, trace) + if err != nil { + archiveErr = errors.Join(archiveErr, err) + } + } + return true + }) + return archiveErr +} + +func (qs QueryService) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { + return qs.dependencyReader.GetDependencies(ctx, depstore.QueryParameters{ + StartTime: endTs.Add(-lookback), + EndTime: endTs, + }) +} + +func (qs QueryService) GetCapabilities() StorageCapabilities { + return StorageCapabilities{ + ArchiveStorage: qs.options.hasArchiveStorage(), + } +} + +func (opts *QueryServiceOptions) hasArchiveStorage() bool { + return opts.ArchiveTraceReader != nil && opts.ArchiveTraceWriter != nil +} + +func (qs QueryService) receiveTraces( + seq iter.Seq2[[]ptrace.Traces, error], + yield func([]ptrace.Traces, error) bool, + rawTraces bool, +) (map[pcommon.TraceID]struct{}, bool) { + aggregatedTraces := jptrace.AggregateTraces(seq) + foundTraceIDs := make(map[pcommon.TraceID]struct{}) + proceed := true + aggregatedTraces(func(trace ptrace.Traces, err error) bool { + if err != nil { + proceed = yield(nil, err) + return proceed + } + if !rawTraces { + qs.options.Adjuster.Adjust(trace) + } + jptrace.SpanIter(trace)(func(_ jptrace.SpanIterPos, span ptrace.Span) bool { + foundTraceIDs[span.TraceID()] = struct{}{} + return true + }) + proceed = yield([]ptrace.Traces{trace}, nil) + return proceed + }) + return foundTraceIDs, proceed +} diff --git a/cmd/query/app/querysvc/v2/querysvc/service_test.go b/cmd/query/app/querysvc/v2/querysvc/service_test.go new file mode 100644 index 00000000000..4794c2d4c99 --- /dev/null +++ b/cmd/query/app/querysvc/v2/querysvc/service_test.go @@ -0,0 +1,496 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package querysvc + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/pkg/iter" + "github.com/jaegertracing/jaeger/storage_v2/depstore" + depstoremocks "github.com/jaegertracing/jaeger/storage_v2/depstore/mocks" + "github.com/jaegertracing/jaeger/storage_v2/tracestore" + tracestoremocks "github.com/jaegertracing/jaeger/storage_v2/tracestore/mocks" +) + +const millisToNanosMultiplier = int64(time.Millisecond / time.Nanosecond) + +var ( + defaultDependencyLookbackDuration = time.Hour * 24 + + testTraceID = pcommon.TraceID([16]byte{1}) +) + +type testQueryService struct { + queryService *QueryService + traceReader *tracestoremocks.Reader + depsReader *depstoremocks.Reader + + archiveTraceReader *tracestoremocks.Reader + archiveTraceWriter *tracestoremocks.Writer +} + +type testOption func(*testQueryService, *QueryServiceOptions) + +func withArchiveTraceReader() testOption { + return func(tqs *testQueryService, options *QueryServiceOptions) { + r := &tracestoremocks.Reader{} + tqs.archiveTraceReader = r + options.ArchiveTraceReader = r + } +} + +func withArchiveTraceWriter() testOption { + return func(tqs *testQueryService, options *QueryServiceOptions) { + r := &tracestoremocks.Writer{} + tqs.archiveTraceWriter = r + options.ArchiveTraceWriter = r + } +} + +func initializeTestService(opts ...testOption) *testQueryService { + traceReader := &tracestoremocks.Reader{} + dependencyStorage := &depstoremocks.Reader{} + + options := QueryServiceOptions{} + + tqs := testQueryService{ + traceReader: traceReader, + depsReader: dependencyStorage, + } + + for _, opt := range opts { + opt(&tqs, &options) + } + + tqs.queryService = NewQueryService(traceReader, dependencyStorage, options) + return &tqs +} + +func makeTestTrace() ptrace.Traces { + trace := ptrace.NewTraces() + resources := trace.ResourceSpans().AppendEmpty() + scopes := resources.ScopeSpans().AppendEmpty() + + spanA := scopes.Spans().AppendEmpty() + spanA.SetTraceID(testTraceID) + spanA.SetSpanID(pcommon.SpanID([8]byte{1})) + + spanB := scopes.Spans().AppendEmpty() + spanB.SetTraceID(testTraceID) + spanB.SetSpanID(pcommon.SpanID([8]byte{2})) + spanB.Attributes() + + return trace +} + +func TestGetTraces_ErrorInReader(t *testing.T) { + tqs := initializeTestService() + tqs.traceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). + Return(iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + yield(nil, assert.AnError) + })).Once() + + params := GetTraceParams{ + TraceIDs: []tracestore.GetTraceParams{ + { + TraceID: testTraceID, + }, + }, + } + getTracesIter := tqs.queryService.GetTraces(context.Background(), params) + _, err := iter.FlattenWithErrors(getTracesIter) + require.ErrorIs(t, err, assert.AnError) +} + +func TestGetTraces_Success(t *testing.T) { + tqs := initializeTestService() + tqs.traceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). + Return(iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + yield([]ptrace.Traces{makeTestTrace()}, nil) + })).Once() + + params := GetTraceParams{ + TraceIDs: []tracestore.GetTraceParams{ + {TraceID: testTraceID}, + }, + } + getTracesIter := tqs.queryService.GetTraces(context.Background(), params) + gotTraces, err := iter.FlattenWithErrors(getTracesIter) + require.NoError(t, err) + require.Len(t, gotTraces, 1) + + gotSpans := gotTraces[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans() + require.Equal(t, 2, gotSpans.Len()) + require.Equal(t, testTraceID, gotSpans.At(0).TraceID()) + require.EqualValues(t, [8]byte{1}, gotSpans.At(0).SpanID()) + require.Equal(t, testTraceID, gotSpans.At(1).TraceID()) + require.EqualValues(t, [8]byte{2}, gotSpans.At(1).SpanID()) +} + +func TestGetTraces_WithRawTraces(t *testing.T) { + tests := []struct { + rawTraces bool + attributes pcommon.Map + expected pcommon.Map + }{ + { + // tags should not get sorted by SortCollections adjuster + rawTraces: true, + attributes: func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("z", "key") + m.PutStr("a", "key") + return m + }(), + expected: func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("z", "key") + m.PutStr("a", "key") + return m + }(), + }, + { + // tags should get sorted by SortCollections adjuster + rawTraces: false, + attributes: func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("z", "key") + m.PutStr("a", "key") + return m + }(), + expected: func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("a", "key") + m.PutStr("z", "key") + return m + }(), + }, + } + for _, test := range tests { + t.Run(fmt.Sprintf("rawTraces=%v", test.rawTraces), func(t *testing.T) { + trace := makeTestTrace() + test.attributes.CopyTo(trace.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes()) + responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + yield([]ptrace.Traces{trace}, nil) + }) + + tqs := initializeTestService() + tqs.traceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). + Return(responseIter).Once() + + params := GetTraceParams{ + TraceIDs: []tracestore.GetTraceParams{ + { + TraceID: testTraceID, + }, + }, + RawTraces: test.rawTraces, + } + getTracesIter := tqs.queryService.GetTraces(context.Background(), params) + gotTraces, err := iter.FlattenWithErrors(getTracesIter) + require.NoError(t, err) + + require.Len(t, gotTraces, 1) + gotAttributes := gotTraces[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes() + require.Equal(t, test.expected, gotAttributes) + }) + } +} + +func TestGetTraces_TraceInArchiveStorage(t *testing.T) { + tqs := initializeTestService(withArchiveTraceReader()) + + tqs.traceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). + Return(iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + yield([]ptrace.Traces{}, nil) + })).Once() + + tqs.archiveTraceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). + Return(iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + yield([]ptrace.Traces{makeTestTrace()}, nil) + })).Once() + + params := GetTraceParams{ + TraceIDs: []tracestore.GetTraceParams{ + {TraceID: testTraceID}, + }, + } + getTracesIter := tqs.queryService.GetTraces(context.Background(), params) + gotTraces, err := iter.FlattenWithErrors(getTracesIter) + require.NoError(t, err) + require.Len(t, gotTraces, 1) + + gotSpans := gotTraces[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans() + require.Equal(t, 2, gotSpans.Len()) + require.Equal(t, testTraceID, gotSpans.At(0).TraceID()) + require.EqualValues(t, [8]byte{1}, gotSpans.At(0).SpanID()) + require.Equal(t, testTraceID, gotSpans.At(1).TraceID()) + require.EqualValues(t, [8]byte{2}, gotSpans.At(1).SpanID()) +} + +func TestGetServices(t *testing.T) { + tqs := initializeTestService() + expected := []string{"trifle", "bling"} + tqs.traceReader.On("GetServices", mock.Anything).Return(expected, nil).Once() + + actualServices, err := tqs.queryService.GetServices(context.Background()) + require.NoError(t, err) + assert.Equal(t, expected, actualServices) +} + +func TestGetOperations(t *testing.T) { + tqs := initializeTestService() + expected := []tracestore.Operation{{Name: "", SpanKind: ""}, {Name: "get", SpanKind: ""}} + operationQuery := tracestore.OperationQueryParams{ServiceName: "abc/trifle"} + tqs.traceReader.On( + "GetOperations", + mock.Anything, + operationQuery, + ).Return(expected, nil).Once() + + actualOperations, err := tqs.queryService.GetOperations(context.Background(), operationQuery) + require.NoError(t, err) + assert.Equal(t, expected, actualOperations) +} + +func TestFindTraces_Success(t *testing.T) { + tqs := initializeTestService() + responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + yield([]ptrace.Traces{makeTestTrace()}, nil) + }) + + duration := 20 * time.Millisecond + now := time.Now() + queryParams := tracestore.TraceQueryParams{ + ServiceName: "service", + OperationName: "operation", + StartTimeMax: now, + DurationMin: duration, + NumTraces: 200, + } + tqs.traceReader.On("FindTraces", mock.Anything, queryParams).Return(responseIter).Once() + + query := TraceQueryParams{TraceQueryParams: queryParams} + getTracesIter := tqs.queryService.FindTraces(context.Background(), query) + gotTraces, err := iter.FlattenWithErrors(getTracesIter) + require.NoError(t, err) + require.Len(t, gotTraces, 1) + + gotSpans := gotTraces[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans() + require.Equal(t, 2, gotSpans.Len()) + require.Equal(t, testTraceID, gotSpans.At(0).TraceID()) + require.EqualValues(t, [8]byte{1}, gotSpans.At(0).SpanID()) + require.Equal(t, testTraceID, gotSpans.At(1).TraceID()) + require.EqualValues(t, [8]byte{2}, gotSpans.At(1).SpanID()) +} + +func TestFindTraces_WithRawTraces(t *testing.T) { + tests := []struct { + rawTraces bool + attributes pcommon.Map + expected pcommon.Map + }{ + { + // tags should not get sorted by SortTagsAndLogFields adjuster + rawTraces: true, + attributes: func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("z", "key") + m.PutStr("a", "key") + return m + }(), + expected: func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("z", "key") + m.PutStr("a", "key") + return m + }(), + }, + { + // tags should get sorted by SortTagsAndLogFields adjuster + rawTraces: false, + attributes: func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("z", "key") + m.PutStr("a", "key") + return m + }(), + expected: func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("a", "key") + m.PutStr("z", "key") + return m + }(), + }, + } + for _, test := range tests { + t.Run(fmt.Sprintf("rawTraces=%v", test.rawTraces), func(t *testing.T) { + trace := makeTestTrace() + test.attributes.CopyTo(trace.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes()) + responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + yield([]ptrace.Traces{trace}, nil) + }) + + tqs := initializeTestService() + duration, err := time.ParseDuration("20ms") + require.NoError(t, err) + now := time.Now() + tqs.traceReader.On("FindTraces", mock.Anything, tracestore.TraceQueryParams{ + ServiceName: "service", + OperationName: "operation", + StartTimeMax: now, + DurationMin: duration, + NumTraces: 200, + }). + Return(responseIter).Once() + + query := TraceQueryParams{ + TraceQueryParams: tracestore.TraceQueryParams{ + ServiceName: "service", + OperationName: "operation", + StartTimeMax: now, + DurationMin: duration, + NumTraces: 200, + }, + RawTraces: test.rawTraces, + } + getTracesIter := tqs.queryService.FindTraces(context.Background(), query) + gotTraces, err := iter.FlattenWithErrors(getTracesIter) + require.NoError(t, err) + + require.Len(t, gotTraces, 1) + gotAttributes := gotTraces[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes() + require.Equal(t, test.expected, gotAttributes) + }) + } +} + +func TestArchiveTrace(t *testing.T) { + tests := []struct { + name string + options []testOption + setupMocks func(tqs *testQueryService) + expectedError error + }{ + { + name: "no options", + options: nil, + setupMocks: func(*testQueryService) {}, + expectedError: errNoArchiveSpanStorage, + }, + { + name: "get trace error", + options: []testOption{withArchiveTraceWriter()}, + setupMocks: func(tqs *testQueryService) { + responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + yield([]ptrace.Traces{}, assert.AnError) + }) + tqs.traceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). + Return(responseIter).Once() + }, + expectedError: assert.AnError, + }, + { + name: "archive writer error", + options: []testOption{withArchiveTraceWriter()}, + setupMocks: func(tqs *testQueryService) { + responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + yield([]ptrace.Traces{makeTestTrace()}, nil) + }) + tqs.traceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). + Return(responseIter).Once() + tqs.archiveTraceWriter.On("WriteTraces", mock.Anything, mock.AnythingOfType("ptrace.Traces")). + Return(assert.AnError).Once() + }, + expectedError: assert.AnError, + }, + { + name: "success", + options: []testOption{withArchiveTraceWriter()}, + setupMocks: func(tqs *testQueryService) { + responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + yield([]ptrace.Traces{makeTestTrace()}, nil) + }) + tqs.traceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). + Return(responseIter).Once() + tqs.archiveTraceWriter.On("WriteTraces", mock.Anything, mock.AnythingOfType("ptrace.Traces")). + Return(nil).Once() + }, + expectedError: nil, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + tqs := initializeTestService(test.options...) + test.setupMocks(tqs) + + query := tracestore.GetTraceParams{ + TraceID: testTraceID, + } + + err := tqs.queryService.ArchiveTrace(context.Background(), query) + if test.expectedError != nil { + require.ErrorIs(t, err, test.expectedError) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestGetDependencies(t *testing.T) { + tqs := initializeTestService() + expected := []model.DependencyLink{ + {Parent: "killer", Child: "queen", CallCount: 12}, + } + endTs := time.Unix(0, 1476374248550*millisToNanosMultiplier) + tqs.depsReader.On("GetDependencies", mock.Anything, depstore.QueryParameters{ + StartTime: endTs.Add(-defaultDependencyLookbackDuration), + EndTime: endTs, + }).Return(expected, nil).Once() + + actualDependencies, err := tqs.queryService.GetDependencies(context.Background(), endTs, defaultDependencyLookbackDuration) + require.NoError(t, err) + assert.Equal(t, expected, actualDependencies) +} + +func TestGetCapabilities(t *testing.T) { + tests := []struct { + name string + options []testOption + expected StorageCapabilities + }{ + { + name: "without archive storage", + expected: StorageCapabilities{ + ArchiveStorage: false, + }, + }, + { + name: "with archive storage", + options: []testOption{withArchiveTraceReader(), withArchiveTraceWriter()}, + expected: StorageCapabilities{ + ArchiveStorage: true, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + tqs := initializeTestService(test.options...) + assert.Equal(t, test.expected, tqs.queryService.GetCapabilities()) + }) + } +} diff --git a/plugin/storage/integration/integration.go b/plugin/storage/integration/integration.go index 159e958c5bf..982eba3e9fc 100644 --- a/plugin/storage/integration/integration.go +++ b/plugin/storage/integration/integration.go @@ -270,7 +270,7 @@ func (s *StorageIntegration) testGetOperations(t *testing.T) { found := s.waitForCondition(t, func(t *testing.T) bool { var err error actual, err = s.TraceReader.GetOperations(context.Background(), - tracestore.OperationQueryParameters{ServiceName: "example-service-1"}) + tracestore.OperationQueryParams{ServiceName: "example-service-1"}) if err != nil { t.Log(err) return false diff --git a/storage_v2/tracestore/mocks/Reader.go b/storage_v2/tracestore/mocks/Reader.go index 5f87e43581b..fd0d0a7e708 100644 --- a/storage_v2/tracestore/mocks/Reader.go +++ b/storage_v2/tracestore/mocks/Reader.go @@ -66,7 +66,7 @@ func (_m *Reader) FindTraces(ctx context.Context, query tracestore.TraceQueryPar } // GetOperations provides a mock function with given fields: ctx, query -func (_m *Reader) GetOperations(ctx context.Context, query tracestore.OperationQueryParameters) ([]tracestore.Operation, error) { +func (_m *Reader) GetOperations(ctx context.Context, query tracestore.OperationQueryParams) ([]tracestore.Operation, error) { ret := _m.Called(ctx, query) if len(ret) == 0 { @@ -75,10 +75,10 @@ func (_m *Reader) GetOperations(ctx context.Context, query tracestore.OperationQ var r0 []tracestore.Operation var r1 error - if rf, ok := ret.Get(0).(func(context.Context, tracestore.OperationQueryParameters) ([]tracestore.Operation, error)); ok { + if rf, ok := ret.Get(0).(func(context.Context, tracestore.OperationQueryParams) ([]tracestore.Operation, error)); ok { return rf(ctx, query) } - if rf, ok := ret.Get(0).(func(context.Context, tracestore.OperationQueryParameters) []tracestore.Operation); ok { + if rf, ok := ret.Get(0).(func(context.Context, tracestore.OperationQueryParams) []tracestore.Operation); ok { r0 = rf(ctx, query) } else { if ret.Get(0) != nil { @@ -86,7 +86,7 @@ func (_m *Reader) GetOperations(ctx context.Context, query tracestore.OperationQ } } - if rf, ok := ret.Get(1).(func(context.Context, tracestore.OperationQueryParameters) error); ok { + if rf, ok := ret.Get(1).(func(context.Context, tracestore.OperationQueryParams) error); ok { r1 = rf(ctx, query) } else { r1 = ret.Error(1) diff --git a/storage_v2/tracestore/reader.go b/storage_v2/tracestore/reader.go index 5eef0ed8aca..86160412c1f 100644 --- a/storage_v2/tracestore/reader.go +++ b/storage_v2/tracestore/reader.go @@ -36,7 +36,7 @@ type Reader interface { // GetOperations returns all operation names for a given service // known to the backend from spans within its retention period. - GetOperations(ctx context.Context, query OperationQueryParameters) ([]Operation, error) + GetOperations(ctx context.Context, query OperationQueryParams) ([]Operation, error) // FindTraces returns an iterator that retrieves traces matching query parameters. // The iterator is single-use: once consumed, it cannot be used again. @@ -101,8 +101,8 @@ func (t *TraceQueryParams) ToSpanStoreQueryParameters() *spanstore.TraceQueryPar } } -// OperationQueryParameters contains parameters of query operations, empty spanKind means get operations for all kinds of span. -type OperationQueryParameters struct { +// OperationQueryParams contains parameters of query operations, empty spanKind means get operations for all kinds of span. +type OperationQueryParams struct { ServiceName string SpanKind string } diff --git a/storage_v2/v1adapter/reader.go b/storage_v2/v1adapter/reader.go index 225e9267d89..7ef6921d5e9 100644 --- a/storage_v2/v1adapter/reader.go +++ b/storage_v2/v1adapter/reader.go @@ -73,7 +73,7 @@ func (tr *TraceReader) GetServices(ctx context.Context) ([]string, error) { func (tr *TraceReader) GetOperations( ctx context.Context, - query tracestore.OperationQueryParameters, + query tracestore.OperationQueryParams, ) ([]tracestore.Operation, error) { o, err := tr.spanReader.GetOperations(ctx, spanstore.OperationQueryParameters{ ServiceName: query.ServiceName, diff --git a/storage_v2/v1adapter/reader_test.go b/storage_v2/v1adapter/reader_test.go index c142ee99c13..e1952c0a825 100644 --- a/storage_v2/v1adapter/reader_test.go +++ b/storage_v2/v1adapter/reader_test.go @@ -240,7 +240,7 @@ func TestTraceReader_GetOperationsDelegatesResponse(t *testing.T) { } operations, err := traceReader.GetOperations( context.Background(), - tracestore.OperationQueryParameters{ + tracestore.OperationQueryParams{ ServiceName: "service-a", SpanKind: "server", })