From 9eb0170af494910249c31154c5bdbf331907abbb Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Fri, 13 Dec 2024 21:32:58 -0400 Subject: [PATCH] Respond correctly to stream send error (#6357) ## Which problem is this PR solving? - Noticed by accident that apiv3 handler was ignoring stream send error ## Description of the changes - Handle error propertly - Add tests - Remove impossible error handling from pdata converter ## How was this change tested? - CI Signed-off-by: Yuri Shkuro --- cmd/query/app/apiv3/grpc_handler.go | 26 ++++++++++------- cmd/query/app/apiv3/grpc_handler_test.go | 37 ++++++++++++++++++++++++ cmd/query/app/apiv3/http_gateway.go | 7 ++--- cmd/query/app/apiv3/http_gateway_test.go | 16 ---------- cmd/query/app/apiv3/otlp_translator.go | 6 ++-- 5 files changed, 59 insertions(+), 33 deletions(-) diff --git a/cmd/query/app/apiv3/grpc_handler.go b/cmd/query/app/apiv3/grpc_handler.go index f27f5560c89..0e684f60406 100644 --- a/cmd/query/app/apiv3/grpc_handler.go +++ b/cmd/query/app/apiv3/grpc_handler.go @@ -36,16 +36,22 @@ func (h *Handler) GetTrace(request *api_v3.GetTraceRequest, stream api_v3.QueryS if err != nil { return fmt.Errorf("cannot retrieve trace: %w", err) } - td, err := modelToOTLP(trace.GetSpans()) - if err != nil { - return err - } + td := modelToOTLP(trace.GetSpans()) tracesData := api_v3.TracesData(td) return stream.Send(&tracesData) } // FindTraces implements api_v3.QueryServiceServer's FindTraces func (h *Handler) FindTraces(request *api_v3.FindTracesRequest, stream api_v3.QueryService_FindTracesServer) error { + return h.internalFindTraces(stream.Context(), request, stream.Send) +} + +// separated for testing +func (h *Handler) internalFindTraces( + ctx context.Context, + request *api_v3.FindTracesRequest, + streamSend func(*api_v3.TracesData) error, +) error { query := request.GetQuery() if query == nil { return status.Error(codes.InvalidArgument, "missing query") @@ -74,17 +80,17 @@ func (h *Handler) FindTraces(request *api_v3.FindTracesRequest, stream api_v3.Qu queryParams.DurationMax = d } - traces, err := h.QueryService.FindTraces(stream.Context(), queryParams) + traces, err := h.QueryService.FindTraces(ctx, queryParams) if err != nil { return err } for _, t := range traces { - td, err := modelToOTLP(t.GetSpans()) - if err != nil { - return err - } + td := modelToOTLP(t.GetSpans()) tracesData := api_v3.TracesData(td) - stream.Send(&tracesData) + if err := streamSend(&tracesData); err != nil { + return status.Error(codes.Internal, + fmt.Sprintf("failed to send response stream chunk to client: %v", err)) + } } return nil } diff --git a/cmd/query/app/apiv3/grpc_handler_test.go b/cmd/query/app/apiv3/grpc_handler_test.go index 6534df23fe6..dbd0c48c82b 100644 --- a/cmd/query/app/apiv3/grpc_handler_test.go +++ b/cmd/query/app/apiv3/grpc_handler_test.go @@ -155,6 +155,9 @@ func TestFindTraces(t *testing.T) { Attributes: map[string]string{"foo": "bar"}, StartTimeMin: time.Now().Add(-2 * time.Hour), StartTimeMax: time.Now(), + DurationMin: 1 * time.Second, + DurationMax: 2 * time.Second, + SearchDepth: 10, }, }) require.NoError(t, err) @@ -164,6 +167,40 @@ func TestFindTraces(t *testing.T) { require.EqualValues(t, 1, td.SpanCount()) } +func TestFindTracesSendError(t *testing.T) { + reader := new(spanstoremocks.Reader) + reader.On("FindTraces", mock.Anything, mock.AnythingOfType("*spanstore.TraceQueryParameters")).Return( + []*model.Trace{ + { + Spans: []*model.Span{ + { + OperationName: "name", + }, + }, + }, + }, nil).Once() + h := &Handler{ + QueryService: querysvc.NewQueryService( + factoryadapter.NewTraceReader(reader), + new(dependencyStoreMocks.Reader), + querysvc.QueryServiceOptions{}, + ), + } + err := h.internalFindTraces(context.Background(), + &api_v3.FindTracesRequest{ + Query: &api_v3.TraceQueryParameters{ + StartTimeMin: time.Now().Add(-2 * time.Hour), + StartTimeMax: time.Now(), + }, + }, + /* streamSend= */ func(*api_v3.TracesData) error { + return errors.New("send_error") + }, + ) + require.ErrorContains(t, err, "send_error") + require.ErrorContains(t, err, "failed to send response") +} + func TestFindTracesQueryNil(t *testing.T) { tsc := newTestServerClient(t) responseStream, err := tsc.client.FindTraces(context.Background(), &api_v3.FindTracesRequest{}) diff --git a/cmd/query/app/apiv3/http_gateway.go b/cmd/query/app/apiv3/http_gateway.go index 1abbcece1f8..24f0e6c4396 100644 --- a/cmd/query/app/apiv3/http_gateway.go +++ b/cmd/query/app/apiv3/http_gateway.go @@ -112,12 +112,9 @@ func (h *HTTPGateway) returnSpans(spans []*model.Span, w http.ResponseWriter) { func (h *HTTPGateway) returnSpansTestable( spans []*model.Span, w http.ResponseWriter, - modelToOTLP func(_ []*model.Span) (ptrace.Traces, error), + modelToOTLP func(_ []*model.Span) ptrace.Traces, ) { - td, err := modelToOTLP(spans) - if h.tryHandleError(w, err, http.StatusInternalServerError) { - return - } + td := modelToOTLP(spans) tracesData := api_v3.TracesData(td) response := &api_v3.GRPCGatewayWrapper{ Result: &tracesData, diff --git a/cmd/query/app/apiv3/http_gateway_test.go b/cmd/query/app/apiv3/http_gateway_test.go index f0abbeb3307..c395715b662 100644 --- a/cmd/query/app/apiv3/http_gateway_test.go +++ b/cmd/query/app/apiv3/http_gateway_test.go @@ -15,11 +15,9 @@ import ( "github.com/gorilla/mux" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/query/app/querysvc" - "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/jtracer" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/storage/spanstore" @@ -92,20 +90,6 @@ func TestHTTPGatewayTryHandleError(t *testing.T) { assert.Contains(t, string(w.Body.String()), e, "writes error message to body") } -func TestHTTPGatewayOTLPError(t *testing.T) { - w := httptest.NewRecorder() - gw := &HTTPGateway{ - Logger: zap.NewNop(), - } - const simErr = "simulated error" - gw.returnSpansTestable(nil, w, - func(_ []*model.Span) (ptrace.Traces, error) { - return ptrace.Traces{}, errors.New(simErr) - }, - ) - assert.Contains(t, w.Body.String(), simErr) -} - func TestHTTPGatewayGetTraceErrors(t *testing.T) { gw := setupHTTPGatewayNoServer(t, "") diff --git a/cmd/query/app/apiv3/otlp_translator.go b/cmd/query/app/apiv3/otlp_translator.go index 9fa952a3cc3..1a39ca79845 100644 --- a/cmd/query/app/apiv3/otlp_translator.go +++ b/cmd/query/app/apiv3/otlp_translator.go @@ -10,7 +10,9 @@ import ( "github.com/jaegertracing/jaeger/model" ) -func modelToOTLP(spans []*model.Span) (ptrace.Traces, error) { +func modelToOTLP(spans []*model.Span) ptrace.Traces { batch := &model.Batch{Spans: spans} - return model2otel.ProtoToTraces([]*model.Batch{batch}) + // there is never an error returned from ProtoToTraces + tr, _ := model2otel.ProtoToTraces([]*model.Batch{batch}) + return tr }