Skip to content

Commit

Permalink
[refactor][query] Propagate RawTraces flag to query service (#6438)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
- Towards #6417

## Description of the changes
- This PR defines `GetTraceParamaters` and `TraceQueryParameters` inside
`package querysvc` that are currently just wrappers around their
`package spanstore` counterparts.
- This is done so that additional parameters can be passed into the
query service, like the `RawTraces` flag, without having to extend the
parameters that are passed into the storage implementations.

## How was this change tested?
- CI

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `npm run lint` and `npm run test`

---------

Signed-off-by: Mahad Zaryab <[email protected]>
  • Loading branch information
mahadzaryab1 authored Dec 28, 2024
1 parent e7a0205 commit a6616fb
Show file tree
Hide file tree
Showing 10 changed files with 353 additions and 136 deletions.
24 changes: 15 additions & 9 deletions cmd/query/app/apiv3/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,13 @@ func (h *Handler) GetTrace(request *api_v3.GetTraceRequest, stream api_v3.QueryS
return fmt.Errorf("malform trace ID: %w", err)
}

query := spanstore.GetTraceParameters{
TraceID: traceID,
StartTime: request.GetStartTime(),
EndTime: request.GetEndTime(),
query := querysvc.GetTraceParameters{
GetTraceParameters: spanstore.GetTraceParameters{
TraceID: traceID,
StartTime: request.GetStartTime(),
EndTime: request.GetEndTime(),
},
RawTraces: request.GetRawTraces(),
}
trace, err := h.QueryService.GetTrace(stream.Context(), query)
if err != nil {
Expand Down Expand Up @@ -66,11 +69,14 @@ func (h *Handler) internalFindTraces(
return errors.New("start time min and max are required parameters")
}

queryParams := &spanstore.TraceQueryParameters{
ServiceName: query.GetServiceName(),
OperationName: query.GetOperationName(),
Tags: query.GetAttributes(),
NumTraces: int(query.GetSearchDepth()),
queryParams := &querysvc.TraceQueryParameters{
TraceQueryParameters: spanstore.TraceQueryParameters{
ServiceName: query.GetServiceName(),
OperationName: query.GetOperationName(),
Tags: query.GetAttributes(),
NumTraces: int(query.GetSearchDepth()),
},
RawTraces: query.GetRawTraces(),
}
if ts := query.GetStartTimeMin(); !ts.IsZero() {
queryParams.StartTimeMin = ts
Expand Down
54 changes: 37 additions & 17 deletions cmd/query/app/apiv3/http_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,18 @@ import (
)

const (
paramTraceID = "trace_id" // get trace by ID
paramStartTime = "start_time"
paramEndTime = "end_time"
paramServiceName = "query.service_name" // find traces
paramOperationName = "query.operation_name"
paramTimeMin = "query.start_time_min"
paramTimeMax = "query.start_time_max"
paramNumTraces = "query.num_traces"
paramDurationMin = "query.duration_min"
paramDurationMax = "query.duration_max"
paramTraceID = "trace_id" // get trace by ID
paramStartTime = "start_time"
paramEndTime = "end_time"
paramRawTraces = "raw_traces"
paramServiceName = "query.service_name" // find traces
paramOperationName = "query.operation_name"
paramTimeMin = "query.start_time_min"
paramTimeMax = "query.start_time_max"
paramNumTraces = "query.num_traces"
paramDurationMin = "query.duration_min"
paramDurationMax = "query.duration_max"
paramQueryRawTraces = "query.raw_traces"

routeGetTrace = "/api/v3/traces/{" + paramTraceID + "}"
routeFindTraces = "/api/v3/traces"
Expand Down Expand Up @@ -135,8 +137,10 @@ func (h *HTTPGateway) getTrace(w http.ResponseWriter, r *http.Request) {
if h.tryParamError(w, err, paramTraceID) {
return
}
request := spanstore.GetTraceParameters{
TraceID: traceID,
request := querysvc.GetTraceParameters{
GetTraceParameters: spanstore.GetTraceParameters{
TraceID: traceID,
},
}
http_query := r.URL.Query()
startTime := http_query.Get(paramStartTime)
Expand All @@ -155,6 +159,13 @@ func (h *HTTPGateway) getTrace(w http.ResponseWriter, r *http.Request) {
}
request.EndTime = timeParsed.UTC()
}
if r := http_query.Get(paramRawTraces); r != "" {
rawTraces, err := strconv.ParseBool(r)
if h.tryParamError(w, err, paramRawTraces) {
return
}
request.RawTraces = rawTraces
}
trc, err := h.QueryService.GetTrace(r.Context(), request)
if h.tryHandleError(w, err, http.StatusInternalServerError) {
return
Expand All @@ -180,11 +191,13 @@ func (h *HTTPGateway) findTraces(w http.ResponseWriter, r *http.Request) {
h.returnSpans(spans, w)
}

func (h *HTTPGateway) parseFindTracesQuery(q url.Values, w http.ResponseWriter) (*spanstore.TraceQueryParameters, bool) {
queryParams := &spanstore.TraceQueryParameters{
ServiceName: q.Get(paramServiceName),
OperationName: q.Get(paramOperationName),
Tags: nil, // most curiously not supported by grpc-gateway
func (h *HTTPGateway) parseFindTracesQuery(q url.Values, w http.ResponseWriter) (*querysvc.TraceQueryParameters, bool) {
queryParams := &querysvc.TraceQueryParameters{
TraceQueryParameters: spanstore.TraceQueryParameters{
ServiceName: q.Get(paramServiceName),
OperationName: q.Get(paramOperationName),
Tags: nil, // most curiously not supported by grpc-gateway
},
}

timeMin := q.Get(paramTimeMin)
Expand Down Expand Up @@ -227,6 +240,13 @@ func (h *HTTPGateway) parseFindTracesQuery(q url.Values, w http.ResponseWriter)
}
queryParams.DurationMax = dur
}
if r := q.Get(paramQueryRawTraces); r != "" {
rawTraces, err := strconv.ParseBool(r)
if h.tryParamError(w, err, paramQueryRawTraces) {
return nil, true
}
queryParams.RawTraces = rawTraces
}
return queryParams, false
}

Expand Down
16 changes: 16 additions & 0 deletions cmd/query/app/apiv3/http_gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ func TestHTTPGatewayGetTraceMalformedInputErrors(t *testing.T) {
requestUrl: "/api/v3/traces/123?end_time=xyz",
expectedError: "malformed parameter end_time",
},
{
name: "TestGetTraceWithInvalidRawTraces",
requestUrl: "/api/v3/traces/123?raw_traces=foobar",
expectedError: "malformed parameter raw_traces",
},
}

for _, tc := range testCases {
Expand Down Expand Up @@ -227,6 +232,7 @@ func mockFindQueries() (url.Values, *spanstore.TraceQueryParameters) {
func TestHTTPGatewayFindTracesErrors(t *testing.T) {
goodTimeV := time.Now()
goodTime := goodTimeV.Format(time.RFC3339Nano)
goodDuration := "1s"
timeRangeErr := fmt.Sprintf("%s and %s are required", paramTimeMin, paramTimeMax)
testCases := []struct {
name string
Expand Down Expand Up @@ -272,6 +278,16 @@ func TestHTTPGatewayFindTracesErrors(t *testing.T) {
params: map[string]string{paramTimeMin: goodTime, paramTimeMax: goodTime, paramDurationMax: "NaN"},
expErr: paramDurationMax,
},
{
name: "bad raw traces",
params: map[string]string{
paramTimeMin: goodTime,
paramTimeMax: goodTime,
paramDurationMax: goodDuration,
paramQueryRawTraces: "foobar",
},
expErr: paramQueryRawTraces,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
Expand Down
33 changes: 20 additions & 13 deletions cmd/query/app/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,13 @@ func (g *GRPCHandler) GetTrace(r *api_v2.GetTraceRequest, stream api_v2.QuerySer
if r.TraceID == (model.TraceID{}) {
return errUninitializedTraceID
}
query := spanstore.GetTraceParameters{
TraceID: r.TraceID,
StartTime: r.StartTime,
EndTime: r.EndTime,
query := querysvc.GetTraceParameters{
GetTraceParameters: spanstore.GetTraceParameters{
TraceID: r.TraceID,
StartTime: r.StartTime,
EndTime: r.EndTime,
},
RawTraces: r.RawTraces,
}
trace, err := g.queryService.GetTrace(stream.Context(), query)
if errors.Is(err, spanstore.ErrTraceNotFound) {
Expand All @@ -119,6 +122,7 @@ func (g *GRPCHandler) ArchiveTrace(ctx context.Context, r *api_v2.ArchiveTraceRe
StartTime: r.StartTime,
EndTime: r.EndTime,
}

err := g.queryService.ArchiveTrace(ctx, query)
if errors.Is(err, spanstore.ErrTraceNotFound) {
g.logger.Warn(msgTraceNotFound, zap.Stringer("id", r.TraceID), zap.Error(err))
Expand All @@ -141,15 +145,18 @@ func (g *GRPCHandler) FindTraces(r *api_v2.FindTracesRequest, stream api_v2.Quer
if query == nil {
return status.Errorf(codes.InvalidArgument, "missing query")
}
queryParams := spanstore.TraceQueryParameters{
ServiceName: query.ServiceName,
OperationName: query.OperationName,
Tags: query.Tags,
StartTimeMin: query.StartTimeMin,
StartTimeMax: query.StartTimeMax,
DurationMin: query.DurationMin,
DurationMax: query.DurationMax,
NumTraces: int(query.SearchDepth),
queryParams := querysvc.TraceQueryParameters{
TraceQueryParameters: spanstore.TraceQueryParameters{
ServiceName: query.ServiceName,
OperationName: query.OperationName,
Tags: query.Tags,
StartTimeMin: query.StartTimeMin,
StartTimeMax: query.StartTimeMax,
DurationMin: query.DurationMin,
DurationMax: query.DurationMax,
NumTraces: int(query.SearchDepth),
},
RawTraces: query.RawTraces,
}
traces, err := g.queryService.FindTraces(stream.Context(), &queryParams)
if err != nil {
Expand Down
43 changes: 30 additions & 13 deletions cmd/query/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,7 @@ func (aH *APIHandler) search(w http.ResponseWriter, r *http.Request) {
if len(tQuery.traceIDs) > 0 {
tracesFromStorage, uiErrors, err = aH.tracesByIDs(
r.Context(),
tQuery.traceIDs,
tQuery.StartTimeMin,
tQuery.StartTimeMax,
tQuery,
)
if aH.handleError(w, err, http.StatusInternalServerError) {
return
Expand Down Expand Up @@ -264,14 +262,17 @@ func (aH *APIHandler) tracesToResponse(traces []*model.Trace, adjust bool, uiErr
}
}

func (aH *APIHandler) tracesByIDs(ctx context.Context, traceIDs []model.TraceID, startTime time.Time, endTime time.Time) ([]*model.Trace, []structuredError, error) {
func (aH *APIHandler) tracesByIDs(ctx context.Context, traceQuery *traceQueryParameters) ([]*model.Trace, []structuredError, error) {
var traceErrors []structuredError
retMe := make([]*model.Trace, 0, len(traceIDs))
for _, traceID := range traceIDs {
query := spanstore.GetTraceParameters{
TraceID: traceID,
StartTime: startTime,
EndTime: endTime,
retMe := make([]*model.Trace, 0, len(traceQuery.traceIDs))
for _, traceID := range traceQuery.traceIDs {
query := querysvc.GetTraceParameters{
GetTraceParameters: spanstore.GetTraceParameters{
TraceID: traceID,
StartTime: traceQuery.StartTimeMin,
EndTime: traceQuery.StartTimeMax,
},
RawTraces: traceQuery.RawTraces,
}
if trc, err := aH.queryService.GetTrace(ctx, query); err != nil {
if !errors.Is(err, spanstore.ErrTraceNotFound) {
Expand Down Expand Up @@ -428,8 +429,19 @@ func (aH *APIHandler) parseMicroseconds(w http.ResponseWriter, r *http.Request,
return time.Time{}, true
}

func (aH *APIHandler) parseGetTraceParameters(w http.ResponseWriter, r *http.Request) (spanstore.GetTraceParameters, bool) {
query := spanstore.GetTraceParameters{}
func (aH *APIHandler) parseBool(w http.ResponseWriter, r *http.Request, boolKey string) (value bool, isValid bool) {
if boolString := r.FormValue(boolKey); boolString != "" {
b, err := parseBool(r, boolKey)
if aH.handleError(w, err, http.StatusBadRequest) {
return false, false
}
return b, true
}
return false, true
}

func (aH *APIHandler) parseGetTraceParameters(w http.ResponseWriter, r *http.Request) (querysvc.GetTraceParameters, bool) {
query := querysvc.GetTraceParameters{}
traceID, ok := aH.parseTraceID(w, r)
if !ok {
return query, false
Expand All @@ -442,9 +454,14 @@ func (aH *APIHandler) parseGetTraceParameters(w http.ResponseWriter, r *http.Req
if !ok {
return query, false
}
raw, ok := aH.parseBool(w, r, rawParam)
if !ok {
return query, false
}
query.TraceID = traceID
query.StartTime = startTime
query.EndTime = endTime
query.RawTraces = raw
return query, true
}

Expand Down Expand Up @@ -485,7 +502,7 @@ func (aH *APIHandler) archiveTrace(w http.ResponseWriter, r *http.Request) {
}

// QueryService.ArchiveTrace can now archive this traceID.
err := aH.queryService.ArchiveTrace(r.Context(), query)
err := aH.queryService.ArchiveTrace(r.Context(), query.GetTraceParameters)
if errors.Is(err, spanstore.ErrTraceNotFound) {
aH.handleError(w, err, http.StatusNotFound)
return
Expand Down
37 changes: 37 additions & 0 deletions cmd/query/app/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,43 @@ func TestGetTraceBadTimeWindow(t *testing.T) {
}
}

func TestGetTraceWithRawTracesParameter(t *testing.T) {
// TODO: extend the test cases to ensure raw traces are obtained
// when the flag is set once the differentiating logic has been implemented
tests := []struct {
rawTraces bool
}{
{
rawTraces: true,
},
{
rawTraces: false,
},
}
for _, test := range tests {
t.Run(fmt.Sprintf("rawTraces=%v", test.rawTraces), func(t *testing.T) {
ts := initializeTestServer(t)
ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), spanstore.GetTraceParameters{
TraceID: mockTraceID,
}).Return(mockTrace, nil).Once()

var response structuredResponse
err := getJSON(fmt.Sprintf("%s/api/traces/%s?raw=%v", ts.server.URL, mockTraceID.String(), test.rawTraces), &response)
require.NoError(t, err)
assert.Empty(t, response.Errors)
})
}
}

func TestGetTraceBadRawTracesFlag(t *testing.T) {
ts := initializeTestServer(t)
var response structuredResponse
err := getJSON(ts.server.URL+`/api/traces/123456?raw=foobar`, &response)
require.Error(t, err)
require.ErrorContains(t, err, "400 error from server")
require.ErrorContains(t, err, "unable to parse param 'raw'")
}

func TestSearchSuccess(t *testing.T) {
ts := initializeTestServer(t)
ts.spanReader.On("FindTraces", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*spanstore.TraceQueryParameters")).
Expand Down
Loading

0 comments on commit a6616fb

Please sign in to comment.