Skip to content

Commit

Permalink
fix(otlp): Write protobuf status on error (#15097)
Browse files Browse the repository at this point in the history
  • Loading branch information
salvacorts authored Nov 26, 2024
1 parent 3da33cd commit 63a2442
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 62 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ require (
golang.org/x/tools v0.23.0 // indirect
google.golang.org/genproto v0.0.0-20241113202542-65e8d215514f // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241113202542-65e8d215514f // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241113202542-65e8d215514f
gopkg.in/fsnotify/fsnotify.v1 v1.4.7 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
Expand Down
33 changes: 7 additions & 26 deletions pkg/distributor/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,38 +19,19 @@ import (

// PushHandler reads a snappy-compressed proto from the HTTP body.
func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) {
d.pushHandler(w, r, push.ParseLokiRequest)
d.pushHandler(w, r, push.ParseLokiRequest, push.HTTPError)
}

func (d *Distributor) OTLPPushHandler(w http.ResponseWriter, r *http.Request) {
interceptor := newOtelErrorHeaderInterceptor(w)
d.pushHandler(interceptor, r, push.ParseOTLPRequest)
d.pushHandler(w, r, push.ParseOTLPRequest, push.OTLPError)
}

// otelErrorHeaderInterceptor maps 500 errors to 503.
// According to the OTLP specification, 500 errors are never retried on the client side, but 503 are.
type otelErrorHeaderInterceptor struct {
http.ResponseWriter
}

func newOtelErrorHeaderInterceptor(w http.ResponseWriter) *otelErrorHeaderInterceptor {
return &otelErrorHeaderInterceptor{ResponseWriter: w}
}

func (i *otelErrorHeaderInterceptor) WriteHeader(statusCode int) {
if statusCode == http.StatusInternalServerError {
statusCode = http.StatusServiceUnavailable
}

i.ResponseWriter.WriteHeader(statusCode)
}

func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRequestParser push.RequestParser) {
func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRequestParser push.RequestParser, errorWriter push.ErrorWriter) {
logger := util_log.WithContext(r.Context(), util_log.Logger)
tenantID, err := tenant.TenantID(r.Context())
if err != nil {
level.Error(logger).Log("msg", "error getting tenant id", "err", err)
http.Error(w, err.Error(), http.StatusBadRequest)
errorWriter(w, err.Error(), http.StatusBadRequest, logger)
return
}

Expand All @@ -70,7 +51,7 @@ func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRe
}
d.writeFailuresManager.Log(tenantID, fmt.Errorf("couldn't parse push request: %w", err))

http.Error(w, err.Error(), http.StatusBadRequest)
errorWriter(w, err.Error(), http.StatusBadRequest, logger)
return
}

Expand Down Expand Up @@ -106,7 +87,7 @@ func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRe
"err", body,
)
}
http.Error(w, body, int(resp.Code))
errorWriter(w, body, int(resp.Code), logger)
} else {
if d.tenantConfigs.LogPushRequest(tenantID) {
level.Debug(logger).Log(
Expand All @@ -115,7 +96,7 @@ func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRe
"err", err.Error(),
)
}
http.Error(w, err.Error(), http.StatusInternalServerError)
errorWriter(w, err.Error(), http.StatusInternalServerError, logger)
}
}

Expand Down
34 changes: 1 addition & 33 deletions pkg/distributor/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,43 +78,11 @@ func TestRequestParserWrapping(t *testing.T) {
req, err := http.NewRequestWithContext(ctx, http.MethodPost, "fake-path", nil)
require.NoError(t, err)

distributors[0].pushHandler(httptest.NewRecorder(), req, stubParser)
distributors[0].pushHandler(httptest.NewRecorder(), req, stubParser, push.HTTPError)

require.True(t, called)
}

func Test_OtelErrorHeaderInterceptor(t *testing.T) {
for _, tc := range []struct {
name string
inputCode int
expectedCode int
}{
{
name: "500",
inputCode: http.StatusInternalServerError,
expectedCode: http.StatusServiceUnavailable,
},
{
name: "400",
inputCode: http.StatusBadRequest,
expectedCode: http.StatusBadRequest,
},
{
name: "204",
inputCode: http.StatusNoContent,
expectedCode: http.StatusNoContent,
},
} {
t.Run(tc.name, func(t *testing.T) {
r := httptest.NewRecorder()
i := newOtelErrorHeaderInterceptor(r)

http.Error(i, "error", tc.inputCode)
require.Equal(t, tc.expectedCode, r.Code)
})
}
}

func stubParser(
_ string,
_ *http.Request,
Expand Down
42 changes: 42 additions & 0 deletions pkg/loghttp/push/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"encoding/base64"
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"

Expand All @@ -13,6 +15,8 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/protobuf/proto"

"github.com/grafana/loki/pkg/push"

Expand Down Expand Up @@ -733,3 +737,41 @@ type fakeRetention struct{}
func (f fakeRetention) RetentionPeriodFor(_ string, _ labels.Labels) time.Duration {
return time.Hour
}

func TestOtlpError(t *testing.T) {
for _, tc := range []struct {
name string
msg string
inCode int
expectedCode int
}{
{
name: "500 error maps 503",
msg: "test error 500 to 503",
inCode: http.StatusInternalServerError,
expectedCode: http.StatusServiceUnavailable,
},
{
name: "other error",
msg: "test error",
inCode: http.StatusForbidden,
expectedCode: http.StatusForbidden,
},
} {
t.Run(tc.name, func(t *testing.T) {
logger := log.NewNopLogger()

r := httptest.NewRecorder()
OTLPError(r, tc.msg, tc.inCode, logger)

require.Equal(t, tc.expectedCode, r.Code)
require.Equal(t, "application/octet-stream", r.Header().Get("Content-Type"))

respStatus := &status.Status{}
require.NoError(t, proto.Unmarshal(r.Body.Bytes(), respStatus))

require.Equal(t, tc.msg, respStatus.Message)
require.EqualValues(t, 0, respStatus.Code)
})
}
}
66 changes: 64 additions & 2 deletions pkg/loghttp/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,19 @@ import (

"github.com/dustin/go-humanize"
"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/labels"

loki_util "github.com/grafana/loki/v3/pkg/util"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"

"github.com/grafana/loki/v3/pkg/analytics"
"github.com/grafana/loki/v3/pkg/loghttp"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/util"
loki_util "github.com/grafana/loki/v3/pkg/util"
"github.com/grafana/loki/v3/pkg/util/constants"
"github.com/grafana/loki/v3/pkg/util/unmarshal"
unmarshal2 "github.com/grafana/loki/v3/pkg/util/unmarshal/legacy"
Expand Down Expand Up @@ -86,6 +88,7 @@ func (EmptyLimits) DiscoverServiceName(string) []string {
type (
RequestParser func(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker, logPushRequestStreams bool, logger log.Logger) (*logproto.PushRequest, *Stats, error)
RequestParserWrapper func(inner RequestParser) RequestParser
ErrorWriter func(w http.ResponseWriter, error string, code int, logger log.Logger)
)

type Stats struct {
Expand Down Expand Up @@ -307,3 +310,62 @@ func RetentionPeriodToString(retentionPeriod time.Duration) string {
}
return retentionHours
}

// OTLPError writes an OTLP-compliant error response to the given http.ResponseWriter.
//
// According to the OTLP spec: https://opentelemetry.io/docs/specs/otlp/#failures-1
// Re. the error response format
// > If the processing of the request fails, the server MUST respond with appropriate HTTP 4xx or HTTP 5xx status code.
// > The response body for all HTTP 4xx and HTTP 5xx responses MUST be a Protobuf-encoded Status message that describes the problem.
// > This specification does not use Status.code field and the server MAY omit Status.code field.
// > The clients are not expected to alter their behavior based on Status.code field but MAY record it for troubleshooting purposes.
// > The Status.message field SHOULD contain a developer-facing error message as defined in Status message schema.
//
// Re. retryable errors
// > The requests that receive a response status code listed in following table SHOULD be retried.
// > All other 4xx or 5xx response status codes MUST NOT be retried
// > 429 Too Many Requests
// > 502 Bad Gateway
// > 503 Service Unavailable
// > 504 Gateway Timeout
// In loki, we expect clients to retry on 500 errors, so we map 500 errors to 503.
func OTLPError(w http.ResponseWriter, error string, code int, logger log.Logger) {
// Map 500 errors to 503. 500 errors are never retried on the client side, but 503 are.
if code == http.StatusInternalServerError {
code = http.StatusServiceUnavailable
}

// As per the OTLP spec, we send the status code on the http header.
w.WriteHeader(code)

// Status 0 because we omit the Status.code field.
status := grpcstatus.New(0, error).Proto()
respBytes, err := proto.Marshal(status)
if err != nil {
level.Error(logger).Log("msg", "failed to marshal error response", "error", err)
writeResponseFailedBody, _ := proto.Marshal(grpcstatus.New(
codes.Internal,
fmt.Sprintf("failed to marshal error response: %s", err.Error()),
).Proto())
_, _ = w.Write(writeResponseFailedBody)
return
}

w.Header().Set(contentType, "application/octet-stream")
if _, err = w.Write(respBytes); err != nil {
level.Error(logger).Log("msg", "failed to write error response", "error", err)
writeResponseFailedBody, _ := proto.Marshal(grpcstatus.New(
codes.Internal,
fmt.Sprintf("failed write error: %s", err.Error()),
).Proto())
_, _ = w.Write(writeResponseFailedBody)
}
}

var _ ErrorWriter = OTLPError

func HTTPError(w http.ResponseWriter, error string, code int, _ log.Logger) {
http.Error(w, error, code)
}

var _ ErrorWriter = HTTPError

0 comments on commit 63a2442

Please sign in to comment.