diff --git a/storage_v2/factoryadapter/reader.go b/storage_v2/factoryadapter/reader.go index c5239abd8ab..dd3f51041a1 100644 --- a/storage_v2/factoryadapter/reader.go +++ b/storage_v2/factoryadapter/reader.go @@ -40,19 +40,28 @@ func NewTraceReader(spanReader spanstore.Reader) *TraceReader { } } -func (tr *TraceReader) GetTrace(ctx context.Context, traceID pcommon.TraceID) iter.Seq2[ptrace.Traces, error] { - return func(yield func(ptrace.Traces, error) bool) { - t, err := tr.spanReader.GetTrace(ctx, model.TraceIDFromOTEL(traceID)) - if err != nil { - // if not found, return an empty iterator, otherwire yield the error. - if !errors.Is(err, spanstore.ErrTraceNotFound) { - yield(ptrace.NewTraces(), err) +func (tr *TraceReader) GetTraces( + ctx context.Context, + traceIDs ...tracestore.GetTraceParams, +) iter.Seq2[[]ptrace.Traces, error] { + return func(yield func([]ptrace.Traces, error) bool) { + for _, idParams := range traceIDs { + // TODO start/end times are not supported by v1 reader + // https://github.com/jaegertracing/jaeger/pull/6242 + t, err := tr.spanReader.GetTrace(ctx, model.TraceIDFromOTEL(idParams.TraceID)) + if err != nil { + if errors.Is(err, spanstore.ErrTraceNotFound) { + continue + } + yield(nil, err) + return + } + batch := &model.Batch{Spans: t.GetSpans()} + tr, err := model2otel.ProtoToTraces([]*model.Batch{batch}) + if !yield([]ptrace.Traces{tr}, err) || err != nil { + return } - return } - batch := &model.Batch{Spans: t.GetSpans()} - tr, err := model2otel.ProtoToTraces([]*model.Batch{batch}) - yield(tr, err) } } @@ -60,7 +69,10 @@ func (tr *TraceReader) GetServices(ctx context.Context) ([]string, error) { return tr.spanReader.GetServices(ctx) } -func (tr *TraceReader) GetOperations(ctx context.Context, query tracestore.OperationQueryParameters) ([]tracestore.Operation, error) { +func (tr *TraceReader) GetOperations( + ctx context.Context, + query tracestore.OperationQueryParameters, +) ([]tracestore.Operation, error) { o, err := tr.spanReader.GetOperations(ctx, spanstore.OperationQueryParameters{ ServiceName: query.ServiceName, SpanKind: query.SpanKind, @@ -80,7 +92,7 @@ func (tr *TraceReader) GetOperations(ctx context.Context, query tracestore.Opera func (tr *TraceReader) FindTraces( ctx context.Context, - query tracestore.TraceQueryParameters, + query tracestore.TraceQueryParams, ) iter.Seq2[[]ptrace.Traces, error] { return func(yield func([]ptrace.Traces, error) bool) { traces, err := tr.spanReader.FindTraces(ctx, query.ToSpanStoreQueryParameters()) @@ -100,7 +112,7 @@ func (tr *TraceReader) FindTraces( func (tr *TraceReader) FindTraceIDs( ctx context.Context, - query tracestore.TraceQueryParameters, + query tracestore.TraceQueryParams, ) iter.Seq2[[]pcommon.TraceID, error] { return func(yield func([]pcommon.TraceID, error) bool) { traceIDs, err := tr.spanReader.FindTraceIDs(ctx, query.ToSpanStoreQueryParameters()) diff --git a/storage_v2/factoryadapter/reader_test.go b/storage_v2/factoryadapter/reader_test.go index bd7c9c4a91a..ddab5192bfe 100644 --- a/storage_v2/factoryadapter/reader_test.go +++ b/storage_v2/factoryadapter/reader_test.go @@ -42,7 +42,7 @@ func TestGetV1Reader_Error(t *testing.T) { require.ErrorIs(t, err, errV1ReaderNotAvailable) } -func TestTraceReader_GetTraceDelegatesSuccessResponse(t *testing.T) { +func TestTraceReader_GetTracesDelegatesSuccessResponse(t *testing.T) { sr := new(spanStoreMocks.Reader) modelTrace := &model.Trace{ Spans: []*model.Span{ @@ -62,9 +62,11 @@ func TestTraceReader_GetTraceDelegatesSuccessResponse(t *testing.T) { traceReader := &TraceReader{ spanReader: sr, } - traces, err := iter.CollectWithErrors(traceReader.GetTrace( + traces, err := iter.FlattenWithErrors(traceReader.GetTraces( context.Background(), - pcommon.TraceID([]byte{0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 3}), + tracestore.GetTraceParams{ + TraceID: pcommon.TraceID([]byte{0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 3}), + }, )) require.NoError(t, err) require.Len(t, traces, 1) @@ -78,19 +80,89 @@ func TestTraceReader_GetTraceDelegatesSuccessResponse(t *testing.T) { require.Equal(t, "operation-b", traceSpans.At(1).Name()) } -func TestTraceReader_GetTraceErrorResponse(t *testing.T) { +func TestTraceReader_GetTracesErrorResponse(t *testing.T) { + testCases := []struct { + name string + firstErr error + expectedErr error + expectedIters int + }{ + { + name: "real error aborts iterator", + firstErr: assert.AnError, + expectedErr: assert.AnError, + expectedIters: 0, // technically 1 but FlattenWithErrors makes it 0 + }, + { + name: "trace not found error skips iteration", + firstErr: spanstore.ErrTraceNotFound, + expectedErr: nil, + expectedIters: 1, + }, + { + name: "no error produces two iterations", + firstErr: nil, + expectedErr: nil, + expectedIters: 2, + }, + } + traceID := func(i byte) tracestore.GetTraceParams { + return tracestore.GetTraceParams{ + TraceID: pcommon.TraceID([]byte{0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, i}), + } + } + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + sr := new(spanStoreMocks.Reader) + sr.On("GetTrace", mock.Anything, mock.Anything).Return(&model.Trace{}, test.firstErr).Once() + sr.On("GetTrace", mock.Anything, mock.Anything).Return(&model.Trace{}, nil).Once() + traceReader := &TraceReader{ + spanReader: sr, + } + traces, err := iter.FlattenWithErrors(traceReader.GetTraces( + context.Background(), traceID(1), traceID(2), + )) + require.ErrorIs(t, err, test.expectedErr) + assert.Len(t, traces, test.expectedIters) + }) + } +} + +func TestTraceReader_GetTracesEarlyStop(t *testing.T) { sr := new(spanStoreMocks.Reader) - testErr := errors.New("test error") - sr.On("GetTrace", mock.Anything, mock.Anything).Return(nil, testErr) + sr.On( + "GetTrace", + mock.Anything, + mock.Anything, + ).Return(&model.Trace{}, nil) traceReader := &TraceReader{ spanReader: sr, } - traces, err := iter.CollectWithErrors(traceReader.GetTrace( - context.Background(), - pcommon.TraceID([]byte{0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 3}), - )) - require.ErrorIs(t, err, testErr) - require.Empty(t, traces) + traceID := func(i byte) tracestore.GetTraceParams { + return tracestore.GetTraceParams{ + TraceID: pcommon.TraceID([]byte{0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, i}), + } + } + called := 0 + traceReader.GetTraces( + context.Background(), traceID(1), traceID(2), traceID(3), + )(func(tr []ptrace.Traces, err error) bool { + require.NoError(t, err) + require.Len(t, tr, 1) + called++ + return true + }) + assert.Equal(t, 3, called) + called = 0 + traceReader.GetTraces( + context.Background(), traceID(1), traceID(2), traceID(3), + )(func(tr []ptrace.Traces, err error) bool { + require.NoError(t, err) + require.Len(t, tr, 1) + called++ + return false // early return + }) + assert.Equal(t, 1, called) } func TestTraceReader_GetServicesDelegatesToSpanReader(t *testing.T) { @@ -224,7 +296,7 @@ func TestTraceReader_FindTracesDelegatesSuccessResponse(t *testing.T) { } traces, err := iter.FlattenWithErrors(traceReader.FindTraces( context.Background(), - tracestore.TraceQueryParameters{ + tracestore.TraceQueryParams{ ServiceName: "service", OperationName: "operation", Tags: map[string]string{"tag-a": "val-a"}, @@ -287,7 +359,7 @@ func TestTraceReader_FindTracesEdgeCases(t *testing.T) { } traces, err := iter.FlattenWithErrors(traceReader.FindTraces( context.Background(), - tracestore.TraceQueryParameters{}, + tracestore.TraceQueryParams{}, )) require.ErrorIs(t, err, test.err) require.Equal(t, test.expectedTraces, traces) @@ -307,7 +379,7 @@ func TestTraceReader_FindTracesEarlyStop(t *testing.T) { } called := 0 traceReader.FindTraces( - context.Background(), tracestore.TraceQueryParameters{}, + context.Background(), tracestore.TraceQueryParams{}, )(func(tr []ptrace.Traces, err error) bool { require.NoError(t, err) require.Len(t, tr, 1) @@ -317,7 +389,7 @@ func TestTraceReader_FindTracesEarlyStop(t *testing.T) { assert.Equal(t, 3, called) called = 0 traceReader.FindTraces( - context.Background(), tracestore.TraceQueryParameters{}, + context.Background(), tracestore.TraceQueryParams{}, )(func(tr []ptrace.Traces, err error) bool { require.NoError(t, err) require.Len(t, tr, 1) @@ -385,7 +457,7 @@ func TestTraceReader_FindTraceIDsDelegatesResponse(t *testing.T) { } traceIDs, err := iter.FlattenWithErrors(traceReader.FindTraceIDs( context.Background(), - tracestore.TraceQueryParameters{ + tracestore.TraceQueryParams{ ServiceName: "service", OperationName: "operation", Tags: map[string]string{"tag-a": "val-a"}, diff --git a/storage_v2/tracestore/mocks/Reader.go b/storage_v2/tracestore/mocks/Reader.go index 3f4d0f02a42..5f87e43581b 100644 --- a/storage_v2/tracestore/mocks/Reader.go +++ b/storage_v2/tracestore/mocks/Reader.go @@ -26,7 +26,7 @@ type Reader struct { } // FindTraceIDs provides a mock function with given fields: ctx, query -func (_m *Reader) FindTraceIDs(ctx context.Context, query tracestore.TraceQueryParameters) iter.Seq2[[]pcommon.TraceID, error] { +func (_m *Reader) FindTraceIDs(ctx context.Context, query tracestore.TraceQueryParams) iter.Seq2[[]pcommon.TraceID, error] { ret := _m.Called(ctx, query) if len(ret) == 0 { @@ -34,7 +34,7 @@ func (_m *Reader) FindTraceIDs(ctx context.Context, query tracestore.TraceQueryP } var r0 iter.Seq2[[]pcommon.TraceID, error] - if rf, ok := ret.Get(0).(func(context.Context, tracestore.TraceQueryParameters) iter.Seq2[[]pcommon.TraceID, error]); ok { + if rf, ok := ret.Get(0).(func(context.Context, tracestore.TraceQueryParams) iter.Seq2[[]pcommon.TraceID, error]); ok { r0 = rf(ctx, query) } else { if ret.Get(0) != nil { @@ -46,7 +46,7 @@ func (_m *Reader) FindTraceIDs(ctx context.Context, query tracestore.TraceQueryP } // FindTraces provides a mock function with given fields: ctx, query -func (_m *Reader) FindTraces(ctx context.Context, query tracestore.TraceQueryParameters) iter.Seq2[[]ptrace.Traces, error] { +func (_m *Reader) FindTraces(ctx context.Context, query tracestore.TraceQueryParams) iter.Seq2[[]ptrace.Traces, error] { ret := _m.Called(ctx, query) if len(ret) == 0 { @@ -54,7 +54,7 @@ func (_m *Reader) FindTraces(ctx context.Context, query tracestore.TraceQueryPar } var r0 iter.Seq2[[]ptrace.Traces, error] - if rf, ok := ret.Get(0).(func(context.Context, tracestore.TraceQueryParameters) iter.Seq2[[]ptrace.Traces, error]); ok { + if rf, ok := ret.Get(0).(func(context.Context, tracestore.TraceQueryParams) iter.Seq2[[]ptrace.Traces, error]); ok { r0 = rf(ctx, query) } else { if ret.Get(0) != nil { @@ -125,20 +125,27 @@ func (_m *Reader) GetServices(ctx context.Context) ([]string, error) { return r0, r1 } -// GetTrace provides a mock function with given fields: ctx, traceID -func (_m *Reader) GetTrace(ctx context.Context, traceID pcommon.TraceID) iter.Seq2[ptrace.Traces, error] { - ret := _m.Called(ctx, traceID) +// GetTraces provides a mock function with given fields: ctx, traceIDs +func (_m *Reader) GetTraces(ctx context.Context, traceIDs ...tracestore.GetTraceParams) iter.Seq2[[]ptrace.Traces, error] { + _va := make([]interface{}, len(traceIDs)) + for _i := range traceIDs { + _va[_i] = traceIDs[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) if len(ret) == 0 { - panic("no return value specified for GetTrace") + panic("no return value specified for GetTraces") } - var r0 iter.Seq2[ptrace.Traces, error] - if rf, ok := ret.Get(0).(func(context.Context, pcommon.TraceID) iter.Seq2[ptrace.Traces, error]); ok { - r0 = rf(ctx, traceID) + var r0 iter.Seq2[[]ptrace.Traces, error] + if rf, ok := ret.Get(0).(func(context.Context, ...tracestore.GetTraceParams) iter.Seq2[[]ptrace.Traces, error]); ok { + r0 = rf(ctx, traceIDs...) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(iter.Seq2[ptrace.Traces, error]) + r0 = ret.Get(0).(iter.Seq2[[]ptrace.Traces, error]) } } diff --git a/storage_v2/tracestore/reader.go b/storage_v2/tracestore/reader.go index df4187b21c4..5426ff8e8d8 100644 --- a/storage_v2/tracestore/reader.go +++ b/storage_v2/tracestore/reader.go @@ -16,13 +16,18 @@ import ( // Reader finds and loads traces and other data from storage. type Reader interface { - // GetTrace returns an iterator that retrieves all spans of the trace with a given id. + // GetTraces returns an iterator that retrieves all traces with given IDs. // The iterator is single-use: once consumed, it cannot be used again. // - // If the trace is too large it may be returned in multiple chunks. + // Chunking requirements: + // - A single ptrace.Traces chunk MUST NOT contain spans from multiple traces. + // - Large traces MAY be split across multiple, *consecutive* ptrace.Traces chunks. // - // If no spans are stored for this trace, it returns an empty iterator. - GetTrace(ctx context.Context, traceID pcommon.TraceID) iter.Seq2[ptrace.Traces, error] + // Edge cases: + // - If no spans are found for any given trace ID, the ID is ignored. + // - If none of the trace IDs are found in the storage, an empty iterator is returned. + // - If an error is encountered, the iterator returns the error and stops. + GetTraces(ctx context.Context, traceIDs ...GetTraceParams) iter.Seq2[[]ptrace.Traces, error] // GetServices returns all service names known to the backend from spans // within its retention period. @@ -35,32 +40,43 @@ type Reader interface { // FindTraces returns an iterator that retrieves traces matching query parameters. // The iterator is single-use: once consumed, it cannot be used again. // - // There is no guarantee that all spans for a single trace are returned in a single chunk - // (same as GetTrace: if the trace is too large, it may be returned in multiple chunks). - // However, it is guaranteed that all spans for a single trace are returned in - // one or more consecutive chunks, as if the total output is grouped by trace ID. + // The chunking rules is the same as for GetTraces. // // If no matching traces are found, the function returns an empty iterator. + // If an error is encountered, the iterator returns the error and stops. // // There's currently an implementation-dependent ambiguity whether all query filters // (such as multiple tags) must apply to the same span within a trace, or can be satisfied // by different spans. - FindTraces(ctx context.Context, query TraceQueryParameters) iter.Seq2[[]ptrace.Traces, error] + FindTraces(ctx context.Context, query TraceQueryParams) iter.Seq2[[]ptrace.Traces, error] // FindTraceIDs returns an iterator that retrieves IDs of traces matching query parameters. // The iterator is single-use: once consumed, it cannot be used again. // // If no matching traces are found, the function returns an empty iterator. + // If an error is encountered, the iterator returns the error and stops. // // This function behaves identically to FindTraces, except that it returns only the list // of matching trace IDs. This is useful in some contexts, such as batch jobs, where a // large list of trace IDs may be queried first and then the full traces are loaded // in batches. - FindTraceIDs(ctx context.Context, query TraceQueryParameters) iter.Seq2[[]pcommon.TraceID, error] + FindTraceIDs(ctx context.Context, query TraceQueryParams) iter.Seq2[[]pcommon.TraceID, error] } -// TraceQueryParameters contains parameters of a trace query. -type TraceQueryParameters struct { +// GetTraceParams contains single-trace parameters for a GetTraces request. +// Some storage backends (e.g. Tempo) perform GetTraces much more efficiently +// if they know the approximate time range of the trace. +type GetTraceParams struct { + // TraceID is the ID of the trace to retrieve. Required. + TraceID pcommon.TraceID + // Start of the time interval to search for trace ID. Optional. + Start time.Time + // End of the time interval to search for trace ID. Optional. + End time.Time +} + +// TraceQueryParams contains parameters of a trace query. +type TraceQueryParams struct { ServiceName string OperationName string Tags map[string]string @@ -71,7 +87,7 @@ type TraceQueryParameters struct { NumTraces int } -func (t *TraceQueryParameters) ToSpanStoreQueryParameters() *spanstore.TraceQueryParameters { +func (t *TraceQueryParams) ToSpanStoreQueryParameters() *spanstore.TraceQueryParameters { return &spanstore.TraceQueryParameters{ ServiceName: t.ServiceName, OperationName: t.OperationName, diff --git a/storage_v2/tracestore/reader_test.go b/storage_v2/tracestore/reader_test.go index 16f558c9877..f40630d7b46 100644 --- a/storage_v2/tracestore/reader_test.go +++ b/storage_v2/tracestore/reader_test.go @@ -14,7 +14,7 @@ import ( func TestToSpanStoreQueryParameters(t *testing.T) { now := time.Now() - query := &TraceQueryParameters{ + query := &TraceQueryParams{ ServiceName: "service", OperationName: "operation", Tags: map[string]string{"tag-a": "val-a"},