From f3441ccc25343efb0b3b2d793cf6fcfecc8291f6 Mon Sep 17 00:00:00 2001 From: Safeer Jiwan Date: Tue, 30 Jul 2024 16:59:51 -0700 Subject: [PATCH] fix: use connectrpc otel interceptor along with ours (#2208) --- examples/go/echo/go.mod | 1 + examples/go/echo/go.sum | 6 ++ go.mod | 1 + go.sum | 2 + internal/rpc/context.go | 23 +++++- internal/rpc/otel_interceptor.go | 130 +++++-------------------------- 6 files changed, 51 insertions(+), 112 deletions(-) diff --git a/examples/go/echo/go.mod b/examples/go/echo/go.mod index 975563c34b..00c5d8e4db 100644 --- a/examples/go/echo/go.mod +++ b/examples/go/echo/go.mod @@ -9,6 +9,7 @@ require github.com/TBD54566975/ftl v0.248.0 require ( connectrpc.com/connect v1.16.2 // indirect connectrpc.com/grpcreflect v1.2.0 // indirect + connectrpc.com/otelconnect v0.7.1 // indirect github.com/alecthomas/atomic v0.1.0-alpha2 // indirect github.com/alecthomas/concurrency v0.0.2 // indirect github.com/alecthomas/participle/v2 v2.1.1 // indirect diff --git a/examples/go/echo/go.sum b/examples/go/echo/go.sum index fbefaba633..359cfad1d6 100644 --- a/examples/go/echo/go.sum +++ b/examples/go/echo/go.sum @@ -2,6 +2,8 @@ connectrpc.com/connect v1.16.2 h1:ybd6y+ls7GOlb7Bh5C8+ghA6SvCBajHwxssO2CGFjqE= connectrpc.com/connect v1.16.2/go.mod h1:n2kgwskMHXC+lVqb18wngEpF95ldBHXjZYJussz5FRc= connectrpc.com/grpcreflect v1.2.0 h1:Q6og1S7HinmtbEuBvARLNwYmTbhEGRpHDhqrPNlmK+U= connectrpc.com/grpcreflect v1.2.0/go.mod h1:nwSOKmE8nU5u/CidgHtPYk1PFI3U9ignz7iDMxOYkSY= +connectrpc.com/otelconnect v0.7.1 h1:scO5pOb0i4yUE66CnNrHeK1x51yq0bE0ehPg6WvzXJY= +connectrpc.com/otelconnect v0.7.1/go.mod h1:dh3bFgHBTb2bkqGCeVVOtHJreSns7uu9wwL2Tbz17ms= github.com/TBD54566975/scaffolder v1.0.0 h1:QUFSy2wVzumLDg7IHcKC6AP+IYyqWe9Wxiu72nZn5qU= github.com/TBD54566975/scaffolder v1.0.0/go.mod h1:auVpczIbOAdIhYDVSruIw41DanxOKB9bSvjf6MEl7Fs= github.com/alecthomas/assert/v2 v2.10.0 h1:jjRCHsj6hBJhkmhznrCzoNpbA3zqy0fYiUcYZP/GkPY= @@ -103,6 +105,10 @@ go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo= go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4= go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q= go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s= +go.opentelemetry.io/otel/sdk v1.28.0 h1:b9d7hIry8yZsgtbmM0DKyPWMMUMlK9NEKuIG4aBqWyE= +go.opentelemetry.io/otel/sdk v1.28.0/go.mod h1:oYj7ClPUA7Iw3m+r7GeEjz0qckQRJK2B8zjcZEfu7Pg= +go.opentelemetry.io/otel/sdk/metric v1.28.0 h1:OkuaKgKrgAbYrrY0t92c+cC+2F6hsFNnCQArXCKlg08= +go.opentelemetry.io/otel/sdk/metric v1.28.0/go.mod h1:cWPjykihLAPvXKi4iZc1dpER3Jdq2Z0YLse3moQUCpg= go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g= go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30= diff --git a/go.mod b/go.mod index 74902401f5..8f0038cd3e 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.22.2 require ( connectrpc.com/connect v1.16.2 connectrpc.com/grpcreflect v1.2.0 + connectrpc.com/otelconnect v0.7.1 github.com/BurntSushi/toml v1.4.0 github.com/TBD54566975/golang-tools v0.2.1 github.com/TBD54566975/scaffolder v1.0.0 diff --git a/go.sum b/go.sum index 07cf542813..5becce661c 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,8 @@ connectrpc.com/connect v1.16.2 h1:ybd6y+ls7GOlb7Bh5C8+ghA6SvCBajHwxssO2CGFjqE= connectrpc.com/connect v1.16.2/go.mod h1:n2kgwskMHXC+lVqb18wngEpF95ldBHXjZYJussz5FRc= connectrpc.com/grpcreflect v1.2.0 h1:Q6og1S7HinmtbEuBvARLNwYmTbhEGRpHDhqrPNlmK+U= connectrpc.com/grpcreflect v1.2.0/go.mod h1:nwSOKmE8nU5u/CidgHtPYk1PFI3U9ignz7iDMxOYkSY= +connectrpc.com/otelconnect v0.7.1 h1:scO5pOb0i4yUE66CnNrHeK1x51yq0bE0ehPg6WvzXJY= +connectrpc.com/otelconnect v0.7.1/go.mod h1:dh3bFgHBTb2bkqGCeVVOtHJreSns7uu9wwL2Tbz17ms= filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0= diff --git a/internal/rpc/context.go b/internal/rpc/context.go index d47615aeba..34f7164328 100644 --- a/internal/rpc/context.go +++ b/internal/rpc/context.go @@ -7,6 +7,7 @@ import ( "runtime/debug" "connectrpc.com/connect" + "connectrpc.com/otelconnect" "github.com/alecthomas/types/optional" "golang.org/x/mod/semver" @@ -79,7 +80,12 @@ func WithRequestKey(ctx context.Context, key model.RequestKey) context.Context { } func DefaultClientOptions(level log.Level) []connect.ClientOption { - interceptors := []connect.Interceptor{PanicInterceptor(), MetadataInterceptor(log.Debug), OtelInterceptor()} + interceptors := []connect.Interceptor{ + PanicInterceptor(), + MetadataInterceptor(log.Debug), + connectOtelInterceptor(), + CustomOtelInterceptor(), + } if ftl.Version != "dev" { interceptors = append(interceptors, versionInterceptor{}) } @@ -90,13 +96,26 @@ func DefaultClientOptions(level log.Level) []connect.ClientOption { } func DefaultHandlerOptions() []connect.HandlerOption { - interceptors := []connect.Interceptor{PanicInterceptor(), MetadataInterceptor(log.Debug), OtelInterceptor()} + interceptors := []connect.Interceptor{ + PanicInterceptor(), + MetadataInterceptor(log.Debug), + connectOtelInterceptor(), + CustomOtelInterceptor(), + } if ftl.Version != "dev" { interceptors = append(interceptors, versionInterceptor{}) } return []connect.HandlerOption{connect.WithInterceptors(interceptors...)} } +func connectOtelInterceptor() connect.Interceptor { + otel, err := otelconnect.NewInterceptor(otelconnect.WithTrustRemote(), otelconnect.WithoutServerPeerAttributes()) + if err != nil { + panic(err) + } + return otel +} + // PanicInterceptor intercepts panics and logs them. func PanicInterceptor() connect.Interceptor { return &panicInterceptor{} diff --git a/internal/rpc/otel_interceptor.go b/internal/rpc/otel_interceptor.go index 9bccf79d7f..b42daf8959 100644 --- a/internal/rpc/otel_interceptor.go +++ b/internal/rpc/otel_interceptor.go @@ -5,8 +5,6 @@ import ( "errors" "fmt" "io" - "net/http" - "strings" "sync" "time" @@ -15,8 +13,6 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/propagation" - semconv "go.opentelemetry.io/otel/semconv/v1.26.0" "go.opentelemetry.io/otel/trace" "google.golang.org/protobuf/proto" @@ -28,22 +24,16 @@ const ( otelFtlVerbChainAttr = attribute.Key("ftl.verb_chain") otelFtlVerbRefAttr = attribute.Key("ftl.verb.ref") otelFtlVerbModuleAttr = attribute.Key("ftl.verb.module") - otelMessageEventName = "message" - otelMessageEventIDAttr = attribute.Key("message.id") - otelMessageEventSizeAttr = attribute.Key("message.uncompressed_size") - otelMessageEventTypeAttr = attribute.Key("message.type") - otelMessageEventTypeSent = "SENT" - otelMessageEventTypeReceived = "RECEIVED" - otelMessageSentSizesAttr = attribute.Key("rpc.message.sent.sizes_bytes") - otelMessageReceivedSizesAttr = attribute.Key("rpc.message.received.sizes_bytes") - otelRPCDurationMetricName = "rpc.duration_ms" - otelRPCRequestSizeMetricName = "rpc.request.size_bytes" - otelRPCRequestsPerRPCMetricName = "rpc.request.count_per_rpc" - otelRPCResponseSizeMetricName = "rpc.response.size_bytes" - otelRPCResponsesPerRPCMetricName = "rpc.response.count_per_rpc" + otelMessageSentSizesAttr = attribute.Key("ftl.rpc.message.sent.sizes_bytes") + otelMessageReceivedSizesAttr = attribute.Key("ftl.rpc.message.received.sizes_bytes") + otelRPCDurationMetricName = "ftl.rpc.duration_ms" + otelRPCRequestSizeMetricName = "ftl.rpc.request.size_bytes" + otelRPCRequestsPerRPCMetricName = "ftl.rpc.request.count_per_rpc" + otelRPCResponseSizeMetricName = "ftl.rpc.response.size_bytes" + otelRPCResponsesPerRPCMetricName = "ftl.rpc.response.count_per_rpc" ) -func OtelInterceptor() connect.Interceptor { +func CustomOtelInterceptor() connect.Interceptor { return &otelInterceptor{} } @@ -64,11 +54,9 @@ func getInstruments(isClient bool) instrumentation { return serverInstruments } -func getAttributes(ctx context.Context, rpcSystemKey string) []attribute.KeyValue { +func getAttributes(ctx context.Context) []attribute.KeyValue { logger := log.FromContext(ctx) - attributes := []attribute.KeyValue{ - semconv.RPCSystemKey.String(rpcSystemKey), - } + attributes := []attribute.KeyValue{} requestKey, err := RequestKeyFromContext(ctx) if err != nil { logger.Warnf("failed to get request key: %s", err) @@ -92,33 +80,17 @@ func getAttributes(ctx context.Context, rpcSystemKey string) []attribute.KeyValu func (i *otelInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc { return func(ctx context.Context, request connect.AnyRequest) (connect.AnyResponse, error) { - ctx = propagateOtelHeaders(ctx, request.Spec().IsClient, request.Header()) requestStartTime := time.Now() isClient := request.Spec().IsClient - name := strings.TrimLeft(request.Spec().Procedure, "/") - spanKind := trace.SpanKindClient - requestSpan := otelMessageEventTypeAttr.String(otelMessageEventTypeSent) - responseSpan := otelMessageEventTypeAttr.String(otelMessageEventTypeReceived) requestSizesAttr := otelMessageSentSizesAttr responseSizesAttr := otelMessageReceivedSizesAttr if !isClient { - spanKind = trace.SpanKindServer - requestSpan = otelMessageEventTypeAttr.String(otelMessageEventTypeReceived) - responseSpan = otelMessageEventTypeAttr.String(otelMessageEventTypeSent) requestSizesAttr = otelMessageReceivedSizesAttr responseSizesAttr = otelMessageSentSizesAttr } - attributes := getAttributes(ctx, request.Peer().Protocol) - traceOpts := []trace.SpanStartOption{ - trace.WithAttributes(attributes...), - trace.WithSpanKind(spanKind), - } - tracer := otel.GetTracerProvider().Tracer(request.Spec().Procedure) - ctx, span := tracer.Start(ctx, name, traceOpts...) - defer span.End() - + attributes := getAttributes(ctx) requestSize := 0 if request != nil { if msg, ok := request.Any().(proto.Message); ok { @@ -126,29 +98,15 @@ func (i *otelInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc { } } - span.AddEvent(otelMessageEventName, - trace.WithAttributes( - requestSpan, - otelMessageEventIDAttr.Int(1), - otelMessageEventSizeAttr.Int(requestSize), - ), - ) - response, err := next(ctx, request) - attributes = append(attributes, statusCodeAttribute(request.Peer().Protocol, err)) responseSize := 0 if err == nil { if msg, ok := response.Any().(proto.Message); ok { responseSize = proto.Size(msg) } } - span.AddEvent(otelMessageEventName, - trace.WithAttributes( - responseSpan, - otelMessageEventIDAttr.Int(1), - otelMessageEventSizeAttr.Int(responseSize), - ), - ) + + span := trace.SpanFromContext(ctx) duration := time.Since(requestStartTime).Milliseconds() span.SetAttributes(append(attributes, attribute.Int64(otelRPCRequestsPerRPCMetricName, 1), @@ -175,15 +133,7 @@ func (i *otelInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc { func (i *otelInterceptor) WrapStreamingClient(next connect.StreamingClientFunc) connect.StreamingClientFunc { return func(ctx context.Context, spec connect.Spec) connect.StreamingClientConn { requestStartTime := time.Now() - name := strings.TrimLeft(spec.Procedure, "/") - - attributes := getAttributes(ctx, "grpc") - traceOpts := []trace.SpanStartOption{ - trace.WithAttributes(attributes...), - trace.WithSpanKind(trace.SpanKindClient), - } - tracer := otel.GetTracerProvider().Tracer(spec.Procedure) - ctx, span := tracer.Start(ctx, name, traceOpts...) // nolint:spancheck + attributes := getAttributes(ctx) conn := next(ctx, spec) instruments := getInstruments(spec.IsClient) @@ -197,6 +147,7 @@ func (i *otelInterceptor) WrapStreamingClient(next connect.StreamingClientFunc) sendSizes: []int64{}, } + span := trace.SpanFromContext(ctx) return &streamingClientInterceptor{ // nolint:spancheck StreamingClientConn: conn, receive: func(msg any, conn connect.StreamingClientConn) error { @@ -206,9 +157,6 @@ func (i *otelInterceptor) WrapStreamingClient(next connect.StreamingClientFunc) return state.send(ctx, msg, conn) }, onClose: func() { - state.attributes = append( - state.attributes, - statusCodeAttribute(conn.Peer().Protocol, state.error)) duration := time.Since(requestStartTime).Milliseconds() span.SetAttributes(append(state.attributes, attribute.Int64(otelRPCRequestsPerRPCMetricName, state.sentCounter), @@ -231,19 +179,8 @@ func (i *otelInterceptor) WrapStreamingClient(next connect.StreamingClientFunc) func (i *otelInterceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc) connect.StreamingHandlerFunc { return func(ctx context.Context, conn connect.StreamingHandlerConn) error { - ctx = propagateOtelHeaders(ctx, conn.Spec().IsClient, conn.RequestHeader()) requestStartTime := time.Now() - name := strings.TrimLeft(conn.Spec().Procedure, "/") - - attributes := getAttributes(ctx, "grpc") - traceOpts := []trace.SpanStartOption{ - trace.WithAttributes(attributes...), - trace.WithSpanKind(trace.SpanKindServer), - } - tracer := otel.GetTracerProvider().Tracer(conn.Spec().Procedure) - ctx, span := tracer.Start(ctx, name, traceOpts...) - defer span.End() - + attributes := getAttributes(ctx) instruments := getInstruments(conn.Spec().IsClient) state := &streamingState{ spec: conn.Spec(), @@ -267,10 +204,8 @@ func (i *otelInterceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc state.attributes = append( state.attributes, statusCodeAttribute(conn.Peer().Protocol, err)) - if err != nil { - span.SetStatus(codes.Error, err.Error()) - } duration := time.Since(requestStartTime).Milliseconds() + span := trace.SpanFromContext(ctx) span.SetAttributes(append(state.attributes, attribute.Int64(otelRPCRequestsPerRPCMetricName, state.receivedCounter), attribute.Int64(otelRPCResponsesPerRPCMetricName, state.sentCounter), @@ -285,15 +220,6 @@ func (i *otelInterceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc } } -func propagateOtelHeaders(ctx context.Context, isClient bool, header http.Header) context.Context { - if isClient { - otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(header)) - } else { - ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.HeaderCarrier(header)) - } - return ctx -} - type instrumentation struct { duration metric.Int64Histogram requestSize metric.Int64Histogram @@ -333,7 +259,7 @@ func createInstruments(meter metric.Meter) instrumentation { } func statusCodeAttribute(protocol string, err error) attribute.KeyValue { - statusCodeKey := fmt.Sprintf("rpc.%s.status_code", protocol) + statusCodeKey := fmt.Sprintf("ftl.rpc.%s.status_code", protocol) statusCode := attribute.Int64(statusCodeKey, 0) if err != nil { statusCode = attribute.Int64(statusCodeKey, int64(connect.CodeOf(err))) @@ -374,19 +300,11 @@ func (s *streamingState) receive(ctx context.Context, msg any, conn streamingSen s.attributes = append(s.attributes, statusCodeAttribute(s.protocol, err)) } s.receivedCounter++ - attrs := append(s.attributes, []attribute.KeyValue{ // nolint:gocritic - otelMessageEventTypeAttr.String(otelMessageEventTypeReceived), - otelMessageEventIDAttr.Int64(s.receivedCounter), - }...) if protomsg, ok := msg.(proto.Message); ok { size := proto.Size(protomsg) - attrs = append(attrs, otelMessageEventSizeAttr.Int(size)) s.receiveSizes = append(s.receiveSizes, int64(size)) - s.receiveSizeMetric.Record(ctx, int64(size), metric.WithAttributes(attrs...)) + s.receiveSizeMetric.Record(ctx, int64(size), metric.WithAttributes(s.attributes...)) } - - span := trace.SpanFromContext(ctx) - span.AddEvent(otelMessageEventName, trace.WithAttributes(attrs...)) return err // nolint:wrapcheck } @@ -402,19 +320,11 @@ func (s *streamingState) send(ctx context.Context, msg any, conn streamingSender s.error = err s.attributes = append(s.attributes, statusCodeAttribute(s.protocol, err)) } - attrs := append(s.attributes, []attribute.KeyValue{ // nolint:gocritic - otelMessageEventTypeAttr.String(otelMessageEventTypeSent), - otelMessageEventIDAttr.Int64(s.sentCounter), - }...) if protomsg, ok := msg.(proto.Message); ok { size := proto.Size(protomsg) - attrs = append(attrs, otelMessageEventSizeAttr.Int(size)) s.sendSizes = append(s.sendSizes, int64(size)) - s.sendSizeMetric.Record(ctx, int64(size), metric.WithAttributes(attrs...)) + s.sendSizeMetric.Record(ctx, int64(size), metric.WithAttributes(s.attributes...)) } - - span := trace.SpanFromContext(ctx) - span.AddEvent(otelMessageEventName, trace.WithAttributes(attrs...)) return err // nolint:wrapcheck }