Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change storage_v2 GetTrace to GetTraces plural #6361

Merged
merged 8 commits into from
Dec 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 26 additions & 14 deletions storage_v2/factoryadapter/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,27 +40,39 @@ func NewTraceReader(spanReader spanstore.Reader) *TraceReader {
}
}

func (tr *TraceReader) GetTrace(ctx context.Context, traceID pcommon.TraceID) iter.Seq2[ptrace.Traces, error] {
return func(yield func(ptrace.Traces, error) bool) {
t, err := tr.spanReader.GetTrace(ctx, model.TraceIDFromOTEL(traceID))
if err != nil {
// if not found, return an empty iterator, otherwire yield the error.
if !errors.Is(err, spanstore.ErrTraceNotFound) {
yield(ptrace.NewTraces(), err)
func (tr *TraceReader) GetTraces(
ctx context.Context,
traceIDs ...tracestore.GetTraceParams,
) iter.Seq2[[]ptrace.Traces, error] {
return func(yield func([]ptrace.Traces, error) bool) {
for _, idParams := range traceIDs {
// TODO start/end times are not supported by v1 reader
// https://github.com/jaegertracing/jaeger/pull/6242
t, err := tr.spanReader.GetTrace(ctx, model.TraceIDFromOTEL(idParams.TraceID))
if err != nil {
if errors.Is(err, spanstore.ErrTraceNotFound) {
continue
}
yield(nil, err)
return
}
batch := &model.Batch{Spans: t.GetSpans()}
tr, err := model2otel.ProtoToTraces([]*model.Batch{batch})
if !yield([]ptrace.Traces{tr}, err) || err != nil {
return
}
return
}
batch := &model.Batch{Spans: t.GetSpans()}
tr, err := model2otel.ProtoToTraces([]*model.Batch{batch})
yield(tr, err)
}
}

func (tr *TraceReader) GetServices(ctx context.Context) ([]string, error) {
return tr.spanReader.GetServices(ctx)
}

func (tr *TraceReader) GetOperations(ctx context.Context, query tracestore.OperationQueryParameters) ([]tracestore.Operation, error) {
func (tr *TraceReader) GetOperations(
ctx context.Context,
query tracestore.OperationQueryParameters,
) ([]tracestore.Operation, error) {
o, err := tr.spanReader.GetOperations(ctx, spanstore.OperationQueryParameters{
ServiceName: query.ServiceName,
SpanKind: query.SpanKind,
Expand All @@ -80,7 +92,7 @@ func (tr *TraceReader) GetOperations(ctx context.Context, query tracestore.Opera

func (tr *TraceReader) FindTraces(
ctx context.Context,
query tracestore.TraceQueryParameters,
query tracestore.TraceQueryParams,
) iter.Seq2[[]ptrace.Traces, error] {
return func(yield func([]ptrace.Traces, error) bool) {
traces, err := tr.spanReader.FindTraces(ctx, query.ToSpanStoreQueryParameters())
Expand All @@ -100,7 +112,7 @@ func (tr *TraceReader) FindTraces(

func (tr *TraceReader) FindTraceIDs(
ctx context.Context,
query tracestore.TraceQueryParameters,
query tracestore.TraceQueryParams,
) iter.Seq2[[]pcommon.TraceID, error] {
return func(yield func([]pcommon.TraceID, error) bool) {
traceIDs, err := tr.spanReader.FindTraceIDs(ctx, query.ToSpanStoreQueryParameters())
Expand Down
106 changes: 89 additions & 17 deletions storage_v2/factoryadapter/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestGetV1Reader_Error(t *testing.T) {
require.ErrorIs(t, err, errV1ReaderNotAvailable)
}

func TestTraceReader_GetTraceDelegatesSuccessResponse(t *testing.T) {
func TestTraceReader_GetTracesDelegatesSuccessResponse(t *testing.T) {
sr := new(spanStoreMocks.Reader)
modelTrace := &model.Trace{
Spans: []*model.Span{
Expand All @@ -62,9 +62,11 @@ func TestTraceReader_GetTraceDelegatesSuccessResponse(t *testing.T) {
traceReader := &TraceReader{
spanReader: sr,
}
traces, err := iter.CollectWithErrors(traceReader.GetTrace(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is CollectWithErrors still being used?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't think so

Copy link
Collaborator

@mahadzaryab1 mahadzaryab1 Dec 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we remove it then? or do we think we'll have a use case for it in the future?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not part of this PR, so I'd leave it. That whole package will go away in a couple of months.

traces, err := iter.FlattenWithErrors(traceReader.GetTraces(
context.Background(),
pcommon.TraceID([]byte{0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 3}),
tracestore.GetTraceParams{
TraceID: pcommon.TraceID([]byte{0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 3}),
},
))
require.NoError(t, err)
require.Len(t, traces, 1)
Expand All @@ -78,19 +80,89 @@ func TestTraceReader_GetTraceDelegatesSuccessResponse(t *testing.T) {
require.Equal(t, "operation-b", traceSpans.At(1).Name())
}

func TestTraceReader_GetTraceErrorResponse(t *testing.T) {
func TestTraceReader_GetTracesErrorResponse(t *testing.T) {
testCases := []struct {
name string
firstErr error
expectedErr error
expectedIters int
}{
{
name: "real error aborts iterator",
firstErr: assert.AnError,
expectedErr: assert.AnError,
expectedIters: 0, // technically 1 but FlattenWithErrors makes it 0
},
{
name: "trace not found error skips iteration",
firstErr: spanstore.ErrTraceNotFound,
expectedErr: nil,
expectedIters: 1,
},
{
name: "no error produces two iterations",
firstErr: nil,
expectedErr: nil,
expectedIters: 2,
},
}
traceID := func(i byte) tracestore.GetTraceParams {
return tracestore.GetTraceParams{
TraceID: pcommon.TraceID([]byte{0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, i}),
}
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
sr := new(spanStoreMocks.Reader)
sr.On("GetTrace", mock.Anything, mock.Anything).Return(&model.Trace{}, test.firstErr).Once()
sr.On("GetTrace", mock.Anything, mock.Anything).Return(&model.Trace{}, nil).Once()
traceReader := &TraceReader{
spanReader: sr,
}
traces, err := iter.FlattenWithErrors(traceReader.GetTraces(
context.Background(), traceID(1), traceID(2),
))
require.ErrorIs(t, err, test.expectedErr)
assert.Len(t, traces, test.expectedIters)
})
}
}

func TestTraceReader_GetTracesEarlyStop(t *testing.T) {
sr := new(spanStoreMocks.Reader)
testErr := errors.New("test error")
sr.On("GetTrace", mock.Anything, mock.Anything).Return(nil, testErr)
sr.On(
"GetTrace",
mock.Anything,
mock.Anything,
).Return(&model.Trace{}, nil)
traceReader := &TraceReader{
spanReader: sr,
}
traces, err := iter.CollectWithErrors(traceReader.GetTrace(
context.Background(),
pcommon.TraceID([]byte{0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 3}),
))
require.ErrorIs(t, err, testErr)
require.Empty(t, traces)
traceID := func(i byte) tracestore.GetTraceParams {
return tracestore.GetTraceParams{
TraceID: pcommon.TraceID([]byte{0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, i}),
}
}
called := 0
traceReader.GetTraces(
context.Background(), traceID(1), traceID(2), traceID(3),
)(func(tr []ptrace.Traces, err error) bool {
require.NoError(t, err)
require.Len(t, tr, 1)
called++
return true
})
assert.Equal(t, 3, called)
called = 0
traceReader.GetTraces(
context.Background(), traceID(1), traceID(2), traceID(3),
)(func(tr []ptrace.Traces, err error) bool {
require.NoError(t, err)
require.Len(t, tr, 1)
called++
return false // early return
})
assert.Equal(t, 1, called)
}

func TestTraceReader_GetServicesDelegatesToSpanReader(t *testing.T) {
Expand Down Expand Up @@ -224,7 +296,7 @@ func TestTraceReader_FindTracesDelegatesSuccessResponse(t *testing.T) {
}
traces, err := iter.FlattenWithErrors(traceReader.FindTraces(
context.Background(),
tracestore.TraceQueryParameters{
tracestore.TraceQueryParams{
ServiceName: "service",
OperationName: "operation",
Tags: map[string]string{"tag-a": "val-a"},
Expand Down Expand Up @@ -287,7 +359,7 @@ func TestTraceReader_FindTracesEdgeCases(t *testing.T) {
}
traces, err := iter.FlattenWithErrors(traceReader.FindTraces(
context.Background(),
tracestore.TraceQueryParameters{},
tracestore.TraceQueryParams{},
))
require.ErrorIs(t, err, test.err)
require.Equal(t, test.expectedTraces, traces)
Expand All @@ -307,7 +379,7 @@ func TestTraceReader_FindTracesEarlyStop(t *testing.T) {
}
called := 0
traceReader.FindTraces(
context.Background(), tracestore.TraceQueryParameters{},
context.Background(), tracestore.TraceQueryParams{},
)(func(tr []ptrace.Traces, err error) bool {
require.NoError(t, err)
require.Len(t, tr, 1)
Expand All @@ -317,7 +389,7 @@ func TestTraceReader_FindTracesEarlyStop(t *testing.T) {
assert.Equal(t, 3, called)
called = 0
traceReader.FindTraces(
context.Background(), tracestore.TraceQueryParameters{},
context.Background(), tracestore.TraceQueryParams{},
)(func(tr []ptrace.Traces, err error) bool {
require.NoError(t, err)
require.Len(t, tr, 1)
Expand Down Expand Up @@ -385,7 +457,7 @@ func TestTraceReader_FindTraceIDsDelegatesResponse(t *testing.T) {
}
traceIDs, err := iter.FlattenWithErrors(traceReader.FindTraceIDs(
context.Background(),
tracestore.TraceQueryParameters{
tracestore.TraceQueryParams{
ServiceName: "service",
OperationName: "operation",
Tags: map[string]string{"tag-a": "val-a"},
Expand Down
31 changes: 19 additions & 12 deletions storage_v2/tracestore/mocks/Reader.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 29 additions & 13 deletions storage_v2/tracestore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,18 @@ import (

// Reader finds and loads traces and other data from storage.
type Reader interface {
// GetTrace returns an iterator that retrieves all spans of the trace with a given id.
// GetTraces returns an iterator that retrieves all traces with given IDs.
// The iterator is single-use: once consumed, it cannot be used again.
//
// If the trace is too large it may be returned in multiple chunks.
// Chunking requirements:
// - A single ptrace.Traces chunk MUST NOT contain spans from multiple traces.
// - Large traces MAY be split across multiple, *consecutive* ptrace.Traces chunks.
//
// If no spans are stored for this trace, it returns an empty iterator.
GetTrace(ctx context.Context, traceID pcommon.TraceID) iter.Seq2[ptrace.Traces, error]
// Edge cases:
// - If no spans are found for any given trace ID, the ID is ignored.
// - If none of the trace IDs are found in the storage, an empty iterator is returned.
// - If an error is encountered, the iterator returns the error and stops.
GetTraces(ctx context.Context, traceIDs ...GetTraceParams) iter.Seq2[[]ptrace.Traces, error]

// GetServices returns all service names known to the backend from spans
// within its retention period.
Expand All @@ -35,32 +40,43 @@ type Reader interface {
// FindTraces returns an iterator that retrieves traces matching query parameters.
// The iterator is single-use: once consumed, it cannot be used again.
//
// There is no guarantee that all spans for a single trace are returned in a single chunk
// (same as GetTrace: if the trace is too large, it may be returned in multiple chunks).
// However, it is guaranteed that all spans for a single trace are returned in
// one or more consecutive chunks, as if the total output is grouped by trace ID.
// The chunking rules is the same as for GetTraces.
//
// If no matching traces are found, the function returns an empty iterator.
// If an error is encountered, the iterator returns the error and stops.
//
// There's currently an implementation-dependent ambiguity whether all query filters
// (such as multiple tags) must apply to the same span within a trace, or can be satisfied
// by different spans.
FindTraces(ctx context.Context, query TraceQueryParameters) iter.Seq2[[]ptrace.Traces, error]
FindTraces(ctx context.Context, query TraceQueryParams) iter.Seq2[[]ptrace.Traces, error]

// FindTraceIDs returns an iterator that retrieves IDs of traces matching query parameters.
// The iterator is single-use: once consumed, it cannot be used again.
//
// If no matching traces are found, the function returns an empty iterator.
// If an error is encountered, the iterator returns the error and stops.
//
// This function behaves identically to FindTraces, except that it returns only the list
// of matching trace IDs. This is useful in some contexts, such as batch jobs, where a
// large list of trace IDs may be queried first and then the full traces are loaded
// in batches.
FindTraceIDs(ctx context.Context, query TraceQueryParameters) iter.Seq2[[]pcommon.TraceID, error]
FindTraceIDs(ctx context.Context, query TraceQueryParams) iter.Seq2[[]pcommon.TraceID, error]
}

// TraceQueryParameters contains parameters of a trace query.
type TraceQueryParameters struct {
// GetTraceParams contains single-trace parameters for a GetTraces request.
// Some storage backends (e.g. Tempo) perform GetTraces much more efficiently
// if they know the approximate time range of the trace.
type GetTraceParams struct {
// TraceID is the ID of the trace to retrieve. Required.
TraceID pcommon.TraceID
// Start of the time interval to search for trace ID. Optional.
Start time.Time
// End of the time interval to search for trace ID. Optional.
End time.Time
}

// TraceQueryParams contains parameters of a trace query.
type TraceQueryParams struct {
ServiceName string
OperationName string
Tags map[string]string
Expand All @@ -71,7 +87,7 @@ type TraceQueryParameters struct {
NumTraces int
}

func (t *TraceQueryParameters) ToSpanStoreQueryParameters() *spanstore.TraceQueryParameters {
func (t *TraceQueryParams) ToSpanStoreQueryParameters() *spanstore.TraceQueryParameters {
return &spanstore.TraceQueryParameters{
ServiceName: t.ServiceName,
OperationName: t.OperationName,
Expand Down
Loading
Loading