Skip to content

Commit

Permalink
Merge branch 'main' into adjuster-interface
Browse files Browse the repository at this point in the history
  • Loading branch information
mahadzaryab1 authored Dec 15, 2024
2 parents 84c5332 + d3d70e0 commit 517a969
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 57 deletions.
40 changes: 26 additions & 14 deletions storage_v2/factoryadapter/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,27 +40,39 @@ func NewTraceReader(spanReader spanstore.Reader) *TraceReader {
}
}

func (tr *TraceReader) GetTrace(ctx context.Context, traceID pcommon.TraceID) iter.Seq2[ptrace.Traces, error] {
return func(yield func(ptrace.Traces, error) bool) {
t, err := tr.spanReader.GetTrace(ctx, model.TraceIDFromOTEL(traceID))
if err != nil {
// if not found, return an empty iterator, otherwire yield the error.
if !errors.Is(err, spanstore.ErrTraceNotFound) {
yield(ptrace.NewTraces(), err)
func (tr *TraceReader) GetTraces(
ctx context.Context,
traceIDs ...tracestore.GetTraceParams,
) iter.Seq2[[]ptrace.Traces, error] {
return func(yield func([]ptrace.Traces, error) bool) {
for _, idParams := range traceIDs {
// TODO start/end times are not supported by v1 reader
// https://github.com/jaegertracing/jaeger/pull/6242
t, err := tr.spanReader.GetTrace(ctx, model.TraceIDFromOTEL(idParams.TraceID))
if err != nil {
if errors.Is(err, spanstore.ErrTraceNotFound) {
continue
}
yield(nil, err)
return
}
batch := &model.Batch{Spans: t.GetSpans()}
tr, err := model2otel.ProtoToTraces([]*model.Batch{batch})
if !yield([]ptrace.Traces{tr}, err) || err != nil {
return
}
return
}
batch := &model.Batch{Spans: t.GetSpans()}
tr, err := model2otel.ProtoToTraces([]*model.Batch{batch})
yield(tr, err)
}
}

func (tr *TraceReader) GetServices(ctx context.Context) ([]string, error) {
return tr.spanReader.GetServices(ctx)
}

func (tr *TraceReader) GetOperations(ctx context.Context, query tracestore.OperationQueryParameters) ([]tracestore.Operation, error) {
func (tr *TraceReader) GetOperations(
ctx context.Context,
query tracestore.OperationQueryParameters,
) ([]tracestore.Operation, error) {
o, err := tr.spanReader.GetOperations(ctx, spanstore.OperationQueryParameters{
ServiceName: query.ServiceName,
SpanKind: query.SpanKind,
Expand All @@ -80,7 +92,7 @@ func (tr *TraceReader) GetOperations(ctx context.Context, query tracestore.Opera

func (tr *TraceReader) FindTraces(
ctx context.Context,
query tracestore.TraceQueryParameters,
query tracestore.TraceQueryParams,
) iter.Seq2[[]ptrace.Traces, error] {
return func(yield func([]ptrace.Traces, error) bool) {
traces, err := tr.spanReader.FindTraces(ctx, query.ToSpanStoreQueryParameters())
Expand All @@ -100,7 +112,7 @@ func (tr *TraceReader) FindTraces(

func (tr *TraceReader) FindTraceIDs(
ctx context.Context,
query tracestore.TraceQueryParameters,
query tracestore.TraceQueryParams,
) iter.Seq2[[]pcommon.TraceID, error] {
return func(yield func([]pcommon.TraceID, error) bool) {
traceIDs, err := tr.spanReader.FindTraceIDs(ctx, query.ToSpanStoreQueryParameters())
Expand Down
106 changes: 89 additions & 17 deletions storage_v2/factoryadapter/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestGetV1Reader_Error(t *testing.T) {
require.ErrorIs(t, err, errV1ReaderNotAvailable)
}

func TestTraceReader_GetTraceDelegatesSuccessResponse(t *testing.T) {
func TestTraceReader_GetTracesDelegatesSuccessResponse(t *testing.T) {
sr := new(spanStoreMocks.Reader)
modelTrace := &model.Trace{
Spans: []*model.Span{
Expand All @@ -62,9 +62,11 @@ func TestTraceReader_GetTraceDelegatesSuccessResponse(t *testing.T) {
traceReader := &TraceReader{
spanReader: sr,
}
traces, err := iter.CollectWithErrors(traceReader.GetTrace(
traces, err := iter.FlattenWithErrors(traceReader.GetTraces(
context.Background(),
pcommon.TraceID([]byte{0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 3}),
tracestore.GetTraceParams{
TraceID: pcommon.TraceID([]byte{0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 3}),
},
))
require.NoError(t, err)
require.Len(t, traces, 1)
Expand All @@ -78,19 +80,89 @@ func TestTraceReader_GetTraceDelegatesSuccessResponse(t *testing.T) {
require.Equal(t, "operation-b", traceSpans.At(1).Name())
}

func TestTraceReader_GetTraceErrorResponse(t *testing.T) {
func TestTraceReader_GetTracesErrorResponse(t *testing.T) {
testCases := []struct {
name string
firstErr error
expectedErr error
expectedIters int
}{
{
name: "real error aborts iterator",
firstErr: assert.AnError,
expectedErr: assert.AnError,
expectedIters: 0, // technically 1 but FlattenWithErrors makes it 0
},
{
name: "trace not found error skips iteration",
firstErr: spanstore.ErrTraceNotFound,
expectedErr: nil,
expectedIters: 1,
},
{
name: "no error produces two iterations",
firstErr: nil,
expectedErr: nil,
expectedIters: 2,
},
}
traceID := func(i byte) tracestore.GetTraceParams {
return tracestore.GetTraceParams{
TraceID: pcommon.TraceID([]byte{0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, i}),
}
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
sr := new(spanStoreMocks.Reader)
sr.On("GetTrace", mock.Anything, mock.Anything).Return(&model.Trace{}, test.firstErr).Once()
sr.On("GetTrace", mock.Anything, mock.Anything).Return(&model.Trace{}, nil).Once()
traceReader := &TraceReader{
spanReader: sr,
}
traces, err := iter.FlattenWithErrors(traceReader.GetTraces(
context.Background(), traceID(1), traceID(2),
))
require.ErrorIs(t, err, test.expectedErr)
assert.Len(t, traces, test.expectedIters)
})
}
}

func TestTraceReader_GetTracesEarlyStop(t *testing.T) {
sr := new(spanStoreMocks.Reader)
testErr := errors.New("test error")
sr.On("GetTrace", mock.Anything, mock.Anything).Return(nil, testErr)
sr.On(
"GetTrace",
mock.Anything,
mock.Anything,
).Return(&model.Trace{}, nil)
traceReader := &TraceReader{
spanReader: sr,
}
traces, err := iter.CollectWithErrors(traceReader.GetTrace(
context.Background(),
pcommon.TraceID([]byte{0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 3}),
))
require.ErrorIs(t, err, testErr)
require.Empty(t, traces)
traceID := func(i byte) tracestore.GetTraceParams {
return tracestore.GetTraceParams{
TraceID: pcommon.TraceID([]byte{0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, i}),
}
}
called := 0
traceReader.GetTraces(
context.Background(), traceID(1), traceID(2), traceID(3),
)(func(tr []ptrace.Traces, err error) bool {
require.NoError(t, err)
require.Len(t, tr, 1)
called++
return true
})
assert.Equal(t, 3, called)
called = 0
traceReader.GetTraces(
context.Background(), traceID(1), traceID(2), traceID(3),
)(func(tr []ptrace.Traces, err error) bool {
require.NoError(t, err)
require.Len(t, tr, 1)
called++
return false // early return
})
assert.Equal(t, 1, called)
}

func TestTraceReader_GetServicesDelegatesToSpanReader(t *testing.T) {
Expand Down Expand Up @@ -224,7 +296,7 @@ func TestTraceReader_FindTracesDelegatesSuccessResponse(t *testing.T) {
}
traces, err := iter.FlattenWithErrors(traceReader.FindTraces(
context.Background(),
tracestore.TraceQueryParameters{
tracestore.TraceQueryParams{
ServiceName: "service",
OperationName: "operation",
Tags: map[string]string{"tag-a": "val-a"},
Expand Down Expand Up @@ -287,7 +359,7 @@ func TestTraceReader_FindTracesEdgeCases(t *testing.T) {
}
traces, err := iter.FlattenWithErrors(traceReader.FindTraces(
context.Background(),
tracestore.TraceQueryParameters{},
tracestore.TraceQueryParams{},
))
require.ErrorIs(t, err, test.err)
require.Equal(t, test.expectedTraces, traces)
Expand All @@ -307,7 +379,7 @@ func TestTraceReader_FindTracesEarlyStop(t *testing.T) {
}
called := 0
traceReader.FindTraces(
context.Background(), tracestore.TraceQueryParameters{},
context.Background(), tracestore.TraceQueryParams{},
)(func(tr []ptrace.Traces, err error) bool {
require.NoError(t, err)
require.Len(t, tr, 1)
Expand All @@ -317,7 +389,7 @@ func TestTraceReader_FindTracesEarlyStop(t *testing.T) {
assert.Equal(t, 3, called)
called = 0
traceReader.FindTraces(
context.Background(), tracestore.TraceQueryParameters{},
context.Background(), tracestore.TraceQueryParams{},
)(func(tr []ptrace.Traces, err error) bool {
require.NoError(t, err)
require.Len(t, tr, 1)
Expand Down Expand Up @@ -385,7 +457,7 @@ func TestTraceReader_FindTraceIDsDelegatesResponse(t *testing.T) {
}
traceIDs, err := iter.FlattenWithErrors(traceReader.FindTraceIDs(
context.Background(),
tracestore.TraceQueryParameters{
tracestore.TraceQueryParams{
ServiceName: "service",
OperationName: "operation",
Tags: map[string]string{"tag-a": "val-a"},
Expand Down
31 changes: 19 additions & 12 deletions storage_v2/tracestore/mocks/Reader.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 29 additions & 13 deletions storage_v2/tracestore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,18 @@ import (

// Reader finds and loads traces and other data from storage.
type Reader interface {
// GetTrace returns an iterator that retrieves all spans of the trace with a given id.
// GetTraces returns an iterator that retrieves all traces with given IDs.
// The iterator is single-use: once consumed, it cannot be used again.
//
// If the trace is too large it may be returned in multiple chunks.
// Chunking requirements:
// - A single ptrace.Traces chunk MUST NOT contain spans from multiple traces.
// - Large traces MAY be split across multiple, *consecutive* ptrace.Traces chunks.
//
// If no spans are stored for this trace, it returns an empty iterator.
GetTrace(ctx context.Context, traceID pcommon.TraceID) iter.Seq2[ptrace.Traces, error]
// Edge cases:
// - If no spans are found for any given trace ID, the ID is ignored.
// - If none of the trace IDs are found in the storage, an empty iterator is returned.
// - If an error is encountered, the iterator returns the error and stops.
GetTraces(ctx context.Context, traceIDs ...GetTraceParams) iter.Seq2[[]ptrace.Traces, error]

// GetServices returns all service names known to the backend from spans
// within its retention period.
Expand All @@ -35,32 +40,43 @@ type Reader interface {
// FindTraces returns an iterator that retrieves traces matching query parameters.
// The iterator is single-use: once consumed, it cannot be used again.
//
// There is no guarantee that all spans for a single trace are returned in a single chunk
// (same as GetTrace: if the trace is too large, it may be returned in multiple chunks).
// However, it is guaranteed that all spans for a single trace are returned in
// one or more consecutive chunks, as if the total output is grouped by trace ID.
// The chunking rules is the same as for GetTraces.
//
// If no matching traces are found, the function returns an empty iterator.
// If an error is encountered, the iterator returns the error and stops.
//
// There's currently an implementation-dependent ambiguity whether all query filters
// (such as multiple tags) must apply to the same span within a trace, or can be satisfied
// by different spans.
FindTraces(ctx context.Context, query TraceQueryParameters) iter.Seq2[[]ptrace.Traces, error]
FindTraces(ctx context.Context, query TraceQueryParams) iter.Seq2[[]ptrace.Traces, error]

// FindTraceIDs returns an iterator that retrieves IDs of traces matching query parameters.
// The iterator is single-use: once consumed, it cannot be used again.
//
// If no matching traces are found, the function returns an empty iterator.
// If an error is encountered, the iterator returns the error and stops.
//
// This function behaves identically to FindTraces, except that it returns only the list
// of matching trace IDs. This is useful in some contexts, such as batch jobs, where a
// large list of trace IDs may be queried first and then the full traces are loaded
// in batches.
FindTraceIDs(ctx context.Context, query TraceQueryParameters) iter.Seq2[[]pcommon.TraceID, error]
FindTraceIDs(ctx context.Context, query TraceQueryParams) iter.Seq2[[]pcommon.TraceID, error]
}

// TraceQueryParameters contains parameters of a trace query.
type TraceQueryParameters struct {
// GetTraceParams contains single-trace parameters for a GetTraces request.
// Some storage backends (e.g. Tempo) perform GetTraces much more efficiently
// if they know the approximate time range of the trace.
type GetTraceParams struct {
// TraceID is the ID of the trace to retrieve. Required.
TraceID pcommon.TraceID
// Start of the time interval to search for trace ID. Optional.
Start time.Time
// End of the time interval to search for trace ID. Optional.
End time.Time
}

// TraceQueryParams contains parameters of a trace query.
type TraceQueryParams struct {
ServiceName string
OperationName string
Tags map[string]string
Expand All @@ -71,7 +87,7 @@ type TraceQueryParameters struct {
NumTraces int
}

func (t *TraceQueryParameters) ToSpanStoreQueryParameters() *spanstore.TraceQueryParameters {
func (t *TraceQueryParams) ToSpanStoreQueryParameters() *spanstore.TraceQueryParameters {
return &spanstore.TraceQueryParameters{
ServiceName: t.ServiceName,
OperationName: t.OperationName,
Expand Down
Loading

0 comments on commit 517a969

Please sign in to comment.