Skip to content

Commit

Permalink
Respond correctly to stream send error (jaegertracing#6357)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
- Noticed by accident that apiv3 handler was ignoring stream send error

## Description of the changes
- Handle error propertly
- Add tests
- Remove impossible error handling from pdata converter

## How was this change tested?
- CI

Signed-off-by: Yuri Shkuro <[email protected]>
  • Loading branch information
yurishkuro authored Dec 14, 2024
1 parent cd99501 commit 9eb0170
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 33 deletions.
26 changes: 16 additions & 10 deletions cmd/query/app/apiv3/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,22 @@ func (h *Handler) GetTrace(request *api_v3.GetTraceRequest, stream api_v3.QueryS
if err != nil {
return fmt.Errorf("cannot retrieve trace: %w", err)
}
td, err := modelToOTLP(trace.GetSpans())
if err != nil {
return err
}
td := modelToOTLP(trace.GetSpans())
tracesData := api_v3.TracesData(td)
return stream.Send(&tracesData)
}

// FindTraces implements api_v3.QueryServiceServer's FindTraces
func (h *Handler) FindTraces(request *api_v3.FindTracesRequest, stream api_v3.QueryService_FindTracesServer) error {
return h.internalFindTraces(stream.Context(), request, stream.Send)
}

// separated for testing
func (h *Handler) internalFindTraces(
ctx context.Context,
request *api_v3.FindTracesRequest,
streamSend func(*api_v3.TracesData) error,
) error {
query := request.GetQuery()
if query == nil {
return status.Error(codes.InvalidArgument, "missing query")
Expand Down Expand Up @@ -74,17 +80,17 @@ func (h *Handler) FindTraces(request *api_v3.FindTracesRequest, stream api_v3.Qu
queryParams.DurationMax = d
}

traces, err := h.QueryService.FindTraces(stream.Context(), queryParams)
traces, err := h.QueryService.FindTraces(ctx, queryParams)
if err != nil {
return err
}
for _, t := range traces {
td, err := modelToOTLP(t.GetSpans())
if err != nil {
return err
}
td := modelToOTLP(t.GetSpans())
tracesData := api_v3.TracesData(td)
stream.Send(&tracesData)
if err := streamSend(&tracesData); err != nil {
return status.Error(codes.Internal,
fmt.Sprintf("failed to send response stream chunk to client: %v", err))
}
}
return nil
}
Expand Down
37 changes: 37 additions & 0 deletions cmd/query/app/apiv3/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ func TestFindTraces(t *testing.T) {
Attributes: map[string]string{"foo": "bar"},
StartTimeMin: time.Now().Add(-2 * time.Hour),
StartTimeMax: time.Now(),
DurationMin: 1 * time.Second,
DurationMax: 2 * time.Second,
SearchDepth: 10,
},
})
require.NoError(t, err)
Expand All @@ -164,6 +167,40 @@ func TestFindTraces(t *testing.T) {
require.EqualValues(t, 1, td.SpanCount())
}

func TestFindTracesSendError(t *testing.T) {
reader := new(spanstoremocks.Reader)
reader.On("FindTraces", mock.Anything, mock.AnythingOfType("*spanstore.TraceQueryParameters")).Return(
[]*model.Trace{
{
Spans: []*model.Span{
{
OperationName: "name",
},
},
},
}, nil).Once()
h := &Handler{
QueryService: querysvc.NewQueryService(
factoryadapter.NewTraceReader(reader),
new(dependencyStoreMocks.Reader),
querysvc.QueryServiceOptions{},
),
}
err := h.internalFindTraces(context.Background(),
&api_v3.FindTracesRequest{
Query: &api_v3.TraceQueryParameters{
StartTimeMin: time.Now().Add(-2 * time.Hour),
StartTimeMax: time.Now(),
},
},
/* streamSend= */ func(*api_v3.TracesData) error {
return errors.New("send_error")
},
)
require.ErrorContains(t, err, "send_error")
require.ErrorContains(t, err, "failed to send response")
}

func TestFindTracesQueryNil(t *testing.T) {
tsc := newTestServerClient(t)
responseStream, err := tsc.client.FindTraces(context.Background(), &api_v3.FindTracesRequest{})
Expand Down
7 changes: 2 additions & 5 deletions cmd/query/app/apiv3/http_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,9 @@ func (h *HTTPGateway) returnSpans(spans []*model.Span, w http.ResponseWriter) {
func (h *HTTPGateway) returnSpansTestable(
spans []*model.Span,
w http.ResponseWriter,
modelToOTLP func(_ []*model.Span) (ptrace.Traces, error),
modelToOTLP func(_ []*model.Span) ptrace.Traces,
) {
td, err := modelToOTLP(spans)
if h.tryHandleError(w, err, http.StatusInternalServerError) {
return
}
td := modelToOTLP(spans)
tracesData := api_v3.TracesData(td)
response := &api_v3.GRPCGatewayWrapper{
Result: &tracesData,
Expand Down
16 changes: 0 additions & 16 deletions cmd/query/app/apiv3/http_gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@ import (
"github.com/gorilla/mux"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/jtracer"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/storage/spanstore"
Expand Down Expand Up @@ -92,20 +90,6 @@ func TestHTTPGatewayTryHandleError(t *testing.T) {
assert.Contains(t, string(w.Body.String()), e, "writes error message to body")
}

func TestHTTPGatewayOTLPError(t *testing.T) {
w := httptest.NewRecorder()
gw := &HTTPGateway{
Logger: zap.NewNop(),
}
const simErr = "simulated error"
gw.returnSpansTestable(nil, w,
func(_ []*model.Span) (ptrace.Traces, error) {
return ptrace.Traces{}, errors.New(simErr)
},
)
assert.Contains(t, w.Body.String(), simErr)
}

func TestHTTPGatewayGetTraceErrors(t *testing.T) {
gw := setupHTTPGatewayNoServer(t, "")

Expand Down
6 changes: 4 additions & 2 deletions cmd/query/app/apiv3/otlp_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
"github.com/jaegertracing/jaeger/model"
)

func modelToOTLP(spans []*model.Span) (ptrace.Traces, error) {
func modelToOTLP(spans []*model.Span) ptrace.Traces {
batch := &model.Batch{Spans: spans}
return model2otel.ProtoToTraces([]*model.Batch{batch})
// there is never an error returned from ProtoToTraces
tr, _ := model2otel.ProtoToTraces([]*model.Batch{batch})
return tr
}

0 comments on commit 9eb0170

Please sign in to comment.