From 63a2442191751e32aaafd6227e1602dfa3a95caa Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Tue, 26 Nov 2024 09:04:23 +0100 Subject: [PATCH] fix(otlp): Write protobuf status on error (#15097) --- go.mod | 2 +- pkg/distributor/http.go | 33 ++++-------------- pkg/distributor/http_test.go | 34 +----------------- pkg/loghttp/push/otlp_test.go | 42 ++++++++++++++++++++++ pkg/loghttp/push/push.go | 66 +++++++++++++++++++++++++++++++++-- 5 files changed, 115 insertions(+), 62 deletions(-) diff --git a/go.mod b/go.mod index 2391249a1289c..a207f41fbb376 100644 --- a/go.mod +++ b/go.mod @@ -364,7 +364,7 @@ require ( golang.org/x/tools v0.23.0 // indirect google.golang.org/genproto v0.0.0-20241113202542-65e8d215514f // indirect google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20241113202542-65e8d215514f // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20241113202542-65e8d215514f gopkg.in/fsnotify/fsnotify.v1 v1.4.7 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect diff --git a/pkg/distributor/http.go b/pkg/distributor/http.go index 636a16bb507b1..7337ce16209c4 100644 --- a/pkg/distributor/http.go +++ b/pkg/distributor/http.go @@ -19,38 +19,19 @@ import ( // PushHandler reads a snappy-compressed proto from the HTTP body. func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) { - d.pushHandler(w, r, push.ParseLokiRequest) + d.pushHandler(w, r, push.ParseLokiRequest, push.HTTPError) } func (d *Distributor) OTLPPushHandler(w http.ResponseWriter, r *http.Request) { - interceptor := newOtelErrorHeaderInterceptor(w) - d.pushHandler(interceptor, r, push.ParseOTLPRequest) + d.pushHandler(w, r, push.ParseOTLPRequest, push.OTLPError) } -// otelErrorHeaderInterceptor maps 500 errors to 503. -// According to the OTLP specification, 500 errors are never retried on the client side, but 503 are. -type otelErrorHeaderInterceptor struct { - http.ResponseWriter -} - -func newOtelErrorHeaderInterceptor(w http.ResponseWriter) *otelErrorHeaderInterceptor { - return &otelErrorHeaderInterceptor{ResponseWriter: w} -} - -func (i *otelErrorHeaderInterceptor) WriteHeader(statusCode int) { - if statusCode == http.StatusInternalServerError { - statusCode = http.StatusServiceUnavailable - } - - i.ResponseWriter.WriteHeader(statusCode) -} - -func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRequestParser push.RequestParser) { +func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRequestParser push.RequestParser, errorWriter push.ErrorWriter) { logger := util_log.WithContext(r.Context(), util_log.Logger) tenantID, err := tenant.TenantID(r.Context()) if err != nil { level.Error(logger).Log("msg", "error getting tenant id", "err", err) - http.Error(w, err.Error(), http.StatusBadRequest) + errorWriter(w, err.Error(), http.StatusBadRequest, logger) return } @@ -70,7 +51,7 @@ func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRe } d.writeFailuresManager.Log(tenantID, fmt.Errorf("couldn't parse push request: %w", err)) - http.Error(w, err.Error(), http.StatusBadRequest) + errorWriter(w, err.Error(), http.StatusBadRequest, logger) return } @@ -106,7 +87,7 @@ func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRe "err", body, ) } - http.Error(w, body, int(resp.Code)) + errorWriter(w, body, int(resp.Code), logger) } else { if d.tenantConfigs.LogPushRequest(tenantID) { level.Debug(logger).Log( @@ -115,7 +96,7 @@ func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRe "err", err.Error(), ) } - http.Error(w, err.Error(), http.StatusInternalServerError) + errorWriter(w, err.Error(), http.StatusInternalServerError, logger) } } diff --git a/pkg/distributor/http_test.go b/pkg/distributor/http_test.go index c6b8f3d017af6..8da8fc608fa98 100644 --- a/pkg/distributor/http_test.go +++ b/pkg/distributor/http_test.go @@ -78,43 +78,11 @@ func TestRequestParserWrapping(t *testing.T) { req, err := http.NewRequestWithContext(ctx, http.MethodPost, "fake-path", nil) require.NoError(t, err) - distributors[0].pushHandler(httptest.NewRecorder(), req, stubParser) + distributors[0].pushHandler(httptest.NewRecorder(), req, stubParser, push.HTTPError) require.True(t, called) } -func Test_OtelErrorHeaderInterceptor(t *testing.T) { - for _, tc := range []struct { - name string - inputCode int - expectedCode int - }{ - { - name: "500", - inputCode: http.StatusInternalServerError, - expectedCode: http.StatusServiceUnavailable, - }, - { - name: "400", - inputCode: http.StatusBadRequest, - expectedCode: http.StatusBadRequest, - }, - { - name: "204", - inputCode: http.StatusNoContent, - expectedCode: http.StatusNoContent, - }, - } { - t.Run(tc.name, func(t *testing.T) { - r := httptest.NewRecorder() - i := newOtelErrorHeaderInterceptor(r) - - http.Error(i, "error", tc.inputCode) - require.Equal(t, tc.expectedCode, r.Code) - }) - } -} - func stubParser( _ string, _ *http.Request, diff --git a/pkg/loghttp/push/otlp_test.go b/pkg/loghttp/push/otlp_test.go index 5e5632eec0082..cbcf560bbcb10 100644 --- a/pkg/loghttp/push/otlp_test.go +++ b/pkg/loghttp/push/otlp_test.go @@ -4,6 +4,8 @@ import ( "context" "encoding/base64" "fmt" + "net/http" + "net/http/httptest" "testing" "time" @@ -13,6 +15,8 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" + "google.golang.org/genproto/googleapis/rpc/status" + "google.golang.org/protobuf/proto" "github.com/grafana/loki/pkg/push" @@ -733,3 +737,41 @@ type fakeRetention struct{} func (f fakeRetention) RetentionPeriodFor(_ string, _ labels.Labels) time.Duration { return time.Hour } + +func TestOtlpError(t *testing.T) { + for _, tc := range []struct { + name string + msg string + inCode int + expectedCode int + }{ + { + name: "500 error maps 503", + msg: "test error 500 to 503", + inCode: http.StatusInternalServerError, + expectedCode: http.StatusServiceUnavailable, + }, + { + name: "other error", + msg: "test error", + inCode: http.StatusForbidden, + expectedCode: http.StatusForbidden, + }, + } { + t.Run(tc.name, func(t *testing.T) { + logger := log.NewNopLogger() + + r := httptest.NewRecorder() + OTLPError(r, tc.msg, tc.inCode, logger) + + require.Equal(t, tc.expectedCode, r.Code) + require.Equal(t, "application/octet-stream", r.Header().Get("Content-Type")) + + respStatus := &status.Status{} + require.NoError(t, proto.Unmarshal(r.Body.Bytes(), respStatus)) + + require.Equal(t, tc.msg, respStatus.Message) + require.EqualValues(t, 0, respStatus.Code) + }) + } +} diff --git a/pkg/loghttp/push/push.go b/pkg/loghttp/push/push.go index 01feefae67e88..9da5b29722643 100644 --- a/pkg/loghttp/push/push.go +++ b/pkg/loghttp/push/push.go @@ -16,17 +16,19 @@ import ( "github.com/dustin/go-humanize" "github.com/go-kit/log" + "github.com/gogo/protobuf/proto" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/model/labels" - - loki_util "github.com/grafana/loki/v3/pkg/util" + "google.golang.org/grpc/codes" + grpcstatus "google.golang.org/grpc/status" "github.com/grafana/loki/v3/pkg/analytics" "github.com/grafana/loki/v3/pkg/loghttp" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/syntax" "github.com/grafana/loki/v3/pkg/util" + loki_util "github.com/grafana/loki/v3/pkg/util" "github.com/grafana/loki/v3/pkg/util/constants" "github.com/grafana/loki/v3/pkg/util/unmarshal" unmarshal2 "github.com/grafana/loki/v3/pkg/util/unmarshal/legacy" @@ -86,6 +88,7 @@ func (EmptyLimits) DiscoverServiceName(string) []string { type ( RequestParser func(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker, logPushRequestStreams bool, logger log.Logger) (*logproto.PushRequest, *Stats, error) RequestParserWrapper func(inner RequestParser) RequestParser + ErrorWriter func(w http.ResponseWriter, error string, code int, logger log.Logger) ) type Stats struct { @@ -307,3 +310,62 @@ func RetentionPeriodToString(retentionPeriod time.Duration) string { } return retentionHours } + +// OTLPError writes an OTLP-compliant error response to the given http.ResponseWriter. +// +// According to the OTLP spec: https://opentelemetry.io/docs/specs/otlp/#failures-1 +// Re. the error response format +// > If the processing of the request fails, the server MUST respond with appropriate HTTP 4xx or HTTP 5xx status code. +// > The response body for all HTTP 4xx and HTTP 5xx responses MUST be a Protobuf-encoded Status message that describes the problem. +// > This specification does not use Status.code field and the server MAY omit Status.code field. +// > The clients are not expected to alter their behavior based on Status.code field but MAY record it for troubleshooting purposes. +// > The Status.message field SHOULD contain a developer-facing error message as defined in Status message schema. +// +// Re. retryable errors +// > The requests that receive a response status code listed in following table SHOULD be retried. +// > All other 4xx or 5xx response status codes MUST NOT be retried +// > 429 Too Many Requests +// > 502 Bad Gateway +// > 503 Service Unavailable +// > 504 Gateway Timeout +// In loki, we expect clients to retry on 500 errors, so we map 500 errors to 503. +func OTLPError(w http.ResponseWriter, error string, code int, logger log.Logger) { + // Map 500 errors to 503. 500 errors are never retried on the client side, but 503 are. + if code == http.StatusInternalServerError { + code = http.StatusServiceUnavailable + } + + // As per the OTLP spec, we send the status code on the http header. + w.WriteHeader(code) + + // Status 0 because we omit the Status.code field. + status := grpcstatus.New(0, error).Proto() + respBytes, err := proto.Marshal(status) + if err != nil { + level.Error(logger).Log("msg", "failed to marshal error response", "error", err) + writeResponseFailedBody, _ := proto.Marshal(grpcstatus.New( + codes.Internal, + fmt.Sprintf("failed to marshal error response: %s", err.Error()), + ).Proto()) + _, _ = w.Write(writeResponseFailedBody) + return + } + + w.Header().Set(contentType, "application/octet-stream") + if _, err = w.Write(respBytes); err != nil { + level.Error(logger).Log("msg", "failed to write error response", "error", err) + writeResponseFailedBody, _ := proto.Marshal(grpcstatus.New( + codes.Internal, + fmt.Sprintf("failed write error: %s", err.Error()), + ).Proto()) + _, _ = w.Write(writeResponseFailedBody) + } +} + +var _ ErrorWriter = OTLPError + +func HTTPError(w http.ResponseWriter, error string, code int, _ log.Logger) { + http.Error(w, error, code) +} + +var _ ErrorWriter = HTTPError