Skip to content

Commit

Permalink
fix: use connectrpc otel interceptor along with ours (#2208)
Browse files Browse the repository at this point in the history
  • Loading branch information
safeer authored Jul 30, 2024
1 parent eaa09ea commit f3441cc
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 112 deletions.
1 change: 1 addition & 0 deletions examples/go/echo/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require github.com/TBD54566975/ftl v0.248.0
require (
connectrpc.com/connect v1.16.2 // indirect
connectrpc.com/grpcreflect v1.2.0 // indirect
connectrpc.com/otelconnect v0.7.1 // indirect
github.com/alecthomas/atomic v0.1.0-alpha2 // indirect
github.com/alecthomas/concurrency v0.0.2 // indirect
github.com/alecthomas/participle/v2 v2.1.1 // indirect
Expand Down
6 changes: 6 additions & 0 deletions examples/go/echo/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.22.2
require (
connectrpc.com/connect v1.16.2
connectrpc.com/grpcreflect v1.2.0
connectrpc.com/otelconnect v0.7.1
github.com/BurntSushi/toml v1.4.0
github.com/TBD54566975/golang-tools v0.2.1
github.com/TBD54566975/scaffolder v1.0.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 21 additions & 2 deletions internal/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"runtime/debug"

"connectrpc.com/connect"
"connectrpc.com/otelconnect"
"github.com/alecthomas/types/optional"
"golang.org/x/mod/semver"

Expand Down Expand Up @@ -79,7 +80,12 @@ func WithRequestKey(ctx context.Context, key model.RequestKey) context.Context {
}

func DefaultClientOptions(level log.Level) []connect.ClientOption {
interceptors := []connect.Interceptor{PanicInterceptor(), MetadataInterceptor(log.Debug), OtelInterceptor()}
interceptors := []connect.Interceptor{
PanicInterceptor(),
MetadataInterceptor(log.Debug),
connectOtelInterceptor(),
CustomOtelInterceptor(),
}
if ftl.Version != "dev" {
interceptors = append(interceptors, versionInterceptor{})
}
Expand All @@ -90,13 +96,26 @@ func DefaultClientOptions(level log.Level) []connect.ClientOption {
}

// DefaultHandlerOptions returns the connect handler options used by default.
//
// The result is a single connect.WithInterceptors option wrapping the
// interceptor chain built below. A versionInterceptor is appended to the
// chain unless ftl.Version is "dev".
func DefaultHandlerOptions() []connect.HandlerOption {
interceptors := []connect.Interceptor{PanicInterceptor(), MetadataInterceptor(log.Debug), OtelInterceptor()}
interceptors := []connect.Interceptor{
PanicInterceptor(),
MetadataInterceptor(log.Debug),
connectOtelInterceptor(),
CustomOtelInterceptor(),
}
// Development builds skip the version interceptor.
if ftl.Version != "dev" {
interceptors = append(interceptors, versionInterceptor{})
}
return []connect.HandlerOption{connect.WithInterceptors(interceptors...)}
}

// connectOtelInterceptor constructs the otelconnect interceptor, configured
// with the WithTrustRemote and WithoutServerPeerAttributes options.
//
// It panics if otelconnect.NewInterceptor returns an error.
func connectOtelInterceptor() connect.Interceptor {
	interceptor, err := otelconnect.NewInterceptor(
		otelconnect.WithTrustRemote(),
		otelconnect.WithoutServerPeerAttributes(),
	)
	if err != nil {
		panic(err)
	}
	return interceptor
}

// PanicInterceptor intercepts panics and logs them.
func PanicInterceptor() connect.Interceptor {
return &panicInterceptor{}
Expand Down
130 changes: 20 additions & 110 deletions internal/rpc/otel_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"errors"
"fmt"
"io"
"net/http"
"strings"
"sync"
"time"

Expand All @@ -15,8 +13,6 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"go.opentelemetry.io/otel/trace"
"google.golang.org/protobuf/proto"

Expand All @@ -28,22 +24,16 @@ const (
otelFtlVerbChainAttr = attribute.Key("ftl.verb_chain")
otelFtlVerbRefAttr = attribute.Key("ftl.verb.ref")
otelFtlVerbModuleAttr = attribute.Key("ftl.verb.module")
otelMessageEventName = "message"
otelMessageEventIDAttr = attribute.Key("message.id")
otelMessageEventSizeAttr = attribute.Key("message.uncompressed_size")
otelMessageEventTypeAttr = attribute.Key("message.type")
otelMessageEventTypeSent = "SENT"
otelMessageEventTypeReceived = "RECEIVED"
otelMessageSentSizesAttr = attribute.Key("rpc.message.sent.sizes_bytes")
otelMessageReceivedSizesAttr = attribute.Key("rpc.message.received.sizes_bytes")
otelRPCDurationMetricName = "rpc.duration_ms"
otelRPCRequestSizeMetricName = "rpc.request.size_bytes"
otelRPCRequestsPerRPCMetricName = "rpc.request.count_per_rpc"
otelRPCResponseSizeMetricName = "rpc.response.size_bytes"
otelRPCResponsesPerRPCMetricName = "rpc.response.count_per_rpc"
otelMessageSentSizesAttr = attribute.Key("ftl.rpc.message.sent.sizes_bytes")
otelMessageReceivedSizesAttr = attribute.Key("ftl.rpc.message.received.sizes_bytes")
otelRPCDurationMetricName = "ftl.rpc.duration_ms"
otelRPCRequestSizeMetricName = "ftl.rpc.request.size_bytes"
otelRPCRequestsPerRPCMetricName = "ftl.rpc.request.count_per_rpc"
otelRPCResponseSizeMetricName = "ftl.rpc.response.size_bytes"
otelRPCResponsesPerRPCMetricName = "ftl.rpc.response.count_per_rpc"
)

func OtelInterceptor() connect.Interceptor {
// CustomOtelInterceptor returns a new otelInterceptor as a connect.Interceptor.
//
// In the default client and handler option chains it is installed directly
// after connectOtelInterceptor.
func CustomOtelInterceptor() connect.Interceptor {
	interceptor := &otelInterceptor{}
	return interceptor
}

Expand All @@ -64,11 +54,9 @@ func getInstruments(isClient bool) instrumentation {
return serverInstruments
}

func getAttributes(ctx context.Context, rpcSystemKey string) []attribute.KeyValue {
func getAttributes(ctx context.Context) []attribute.KeyValue {
logger := log.FromContext(ctx)
attributes := []attribute.KeyValue{
semconv.RPCSystemKey.String(rpcSystemKey),
}
attributes := []attribute.KeyValue{}
requestKey, err := RequestKeyFromContext(ctx)
if err != nil {
logger.Warnf("failed to get request key: %s", err)
Expand All @@ -92,63 +80,33 @@ func getAttributes(ctx context.Context, rpcSystemKey string) []attribute.KeyValu

func (i *otelInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc {
return func(ctx context.Context, request connect.AnyRequest) (connect.AnyResponse, error) {
ctx = propagateOtelHeaders(ctx, request.Spec().IsClient, request.Header())
requestStartTime := time.Now()
isClient := request.Spec().IsClient
name := strings.TrimLeft(request.Spec().Procedure, "/")

spanKind := trace.SpanKindClient
requestSpan := otelMessageEventTypeAttr.String(otelMessageEventTypeSent)
responseSpan := otelMessageEventTypeAttr.String(otelMessageEventTypeReceived)
requestSizesAttr := otelMessageSentSizesAttr
responseSizesAttr := otelMessageReceivedSizesAttr
if !isClient {
spanKind = trace.SpanKindServer
requestSpan = otelMessageEventTypeAttr.String(otelMessageEventTypeReceived)
responseSpan = otelMessageEventTypeAttr.String(otelMessageEventTypeSent)
requestSizesAttr = otelMessageReceivedSizesAttr
responseSizesAttr = otelMessageSentSizesAttr
}

attributes := getAttributes(ctx, request.Peer().Protocol)
traceOpts := []trace.SpanStartOption{
trace.WithAttributes(attributes...),
trace.WithSpanKind(spanKind),
}
tracer := otel.GetTracerProvider().Tracer(request.Spec().Procedure)
ctx, span := tracer.Start(ctx, name, traceOpts...)
defer span.End()

attributes := getAttributes(ctx)
requestSize := 0
if request != nil {
if msg, ok := request.Any().(proto.Message); ok {
requestSize = proto.Size(msg)
}
}

span.AddEvent(otelMessageEventName,
trace.WithAttributes(
requestSpan,
otelMessageEventIDAttr.Int(1),
otelMessageEventSizeAttr.Int(requestSize),
),
)

response, err := next(ctx, request)
attributes = append(attributes, statusCodeAttribute(request.Peer().Protocol, err))
responseSize := 0
if err == nil {
if msg, ok := response.Any().(proto.Message); ok {
responseSize = proto.Size(msg)
}
}
span.AddEvent(otelMessageEventName,
trace.WithAttributes(
responseSpan,
otelMessageEventIDAttr.Int(1),
otelMessageEventSizeAttr.Int(responseSize),
),
)

span := trace.SpanFromContext(ctx)
duration := time.Since(requestStartTime).Milliseconds()
span.SetAttributes(append(attributes,
attribute.Int64(otelRPCRequestsPerRPCMetricName, 1),
Expand All @@ -175,15 +133,7 @@ func (i *otelInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc {
func (i *otelInterceptor) WrapStreamingClient(next connect.StreamingClientFunc) connect.StreamingClientFunc {
return func(ctx context.Context, spec connect.Spec) connect.StreamingClientConn {
requestStartTime := time.Now()
name := strings.TrimLeft(spec.Procedure, "/")

attributes := getAttributes(ctx, "grpc")
traceOpts := []trace.SpanStartOption{
trace.WithAttributes(attributes...),
trace.WithSpanKind(trace.SpanKindClient),
}
tracer := otel.GetTracerProvider().Tracer(spec.Procedure)
ctx, span := tracer.Start(ctx, name, traceOpts...) // nolint:spancheck
attributes := getAttributes(ctx)
conn := next(ctx, spec)

instruments := getInstruments(spec.IsClient)
Expand All @@ -197,6 +147,7 @@ func (i *otelInterceptor) WrapStreamingClient(next connect.StreamingClientFunc)
sendSizes: []int64{},
}

span := trace.SpanFromContext(ctx)
return &streamingClientInterceptor{ // nolint:spancheck
StreamingClientConn: conn,
receive: func(msg any, conn connect.StreamingClientConn) error {
Expand All @@ -206,9 +157,6 @@ func (i *otelInterceptor) WrapStreamingClient(next connect.StreamingClientFunc)
return state.send(ctx, msg, conn)
},
onClose: func() {
state.attributes = append(
state.attributes,
statusCodeAttribute(conn.Peer().Protocol, state.error))
duration := time.Since(requestStartTime).Milliseconds()
span.SetAttributes(append(state.attributes,
attribute.Int64(otelRPCRequestsPerRPCMetricName, state.sentCounter),
Expand All @@ -231,19 +179,8 @@ func (i *otelInterceptor) WrapStreamingClient(next connect.StreamingClientFunc)

func (i *otelInterceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc) connect.StreamingHandlerFunc {
return func(ctx context.Context, conn connect.StreamingHandlerConn) error {
ctx = propagateOtelHeaders(ctx, conn.Spec().IsClient, conn.RequestHeader())
requestStartTime := time.Now()
name := strings.TrimLeft(conn.Spec().Procedure, "/")

attributes := getAttributes(ctx, "grpc")
traceOpts := []trace.SpanStartOption{
trace.WithAttributes(attributes...),
trace.WithSpanKind(trace.SpanKindServer),
}
tracer := otel.GetTracerProvider().Tracer(conn.Spec().Procedure)
ctx, span := tracer.Start(ctx, name, traceOpts...)
defer span.End()

attributes := getAttributes(ctx)
instruments := getInstruments(conn.Spec().IsClient)
state := &streamingState{
spec: conn.Spec(),
Expand All @@ -267,10 +204,8 @@ func (i *otelInterceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc
state.attributes = append(
state.attributes,
statusCodeAttribute(conn.Peer().Protocol, err))
if err != nil {
span.SetStatus(codes.Error, err.Error())
}
duration := time.Since(requestStartTime).Milliseconds()
span := trace.SpanFromContext(ctx)
span.SetAttributes(append(state.attributes,
attribute.Int64(otelRPCRequestsPerRPCMetricName, state.receivedCounter),
attribute.Int64(otelRPCResponsesPerRPCMetricName, state.sentCounter),
Expand All @@ -285,15 +220,6 @@ func (i *otelInterceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc
}
}

func propagateOtelHeaders(ctx context.Context, isClient bool, header http.Header) context.Context {
if isClient {
otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(header))
} else {
ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.HeaderCarrier(header))
}
return ctx
}

type instrumentation struct {
duration metric.Int64Histogram
requestSize metric.Int64Histogram
Expand Down Expand Up @@ -333,7 +259,7 @@ func createInstruments(meter metric.Meter) instrumentation {
}

func statusCodeAttribute(protocol string, err error) attribute.KeyValue {
statusCodeKey := fmt.Sprintf("rpc.%s.status_code", protocol)
statusCodeKey := fmt.Sprintf("ftl.rpc.%s.status_code", protocol)
statusCode := attribute.Int64(statusCodeKey, 0)
if err != nil {
statusCode = attribute.Int64(statusCodeKey, int64(connect.CodeOf(err)))
Expand Down Expand Up @@ -374,19 +300,11 @@ func (s *streamingState) receive(ctx context.Context, msg any, conn streamingSen
s.attributes = append(s.attributes, statusCodeAttribute(s.protocol, err))
}
s.receivedCounter++
attrs := append(s.attributes, []attribute.KeyValue{ // nolint:gocritic
otelMessageEventTypeAttr.String(otelMessageEventTypeReceived),
otelMessageEventIDAttr.Int64(s.receivedCounter),
}...)
if protomsg, ok := msg.(proto.Message); ok {
size := proto.Size(protomsg)
attrs = append(attrs, otelMessageEventSizeAttr.Int(size))
s.receiveSizes = append(s.receiveSizes, int64(size))
s.receiveSizeMetric.Record(ctx, int64(size), metric.WithAttributes(attrs...))
s.receiveSizeMetric.Record(ctx, int64(size), metric.WithAttributes(s.attributes...))
}

span := trace.SpanFromContext(ctx)
span.AddEvent(otelMessageEventName, trace.WithAttributes(attrs...))
return err // nolint:wrapcheck
}

Expand All @@ -402,19 +320,11 @@ func (s *streamingState) send(ctx context.Context, msg any, conn streamingSender
s.error = err
s.attributes = append(s.attributes, statusCodeAttribute(s.protocol, err))
}
attrs := append(s.attributes, []attribute.KeyValue{ // nolint:gocritic
otelMessageEventTypeAttr.String(otelMessageEventTypeSent),
otelMessageEventIDAttr.Int64(s.sentCounter),
}...)
if protomsg, ok := msg.(proto.Message); ok {
size := proto.Size(protomsg)
attrs = append(attrs, otelMessageEventSizeAttr.Int(size))
s.sendSizes = append(s.sendSizes, int64(size))
s.sendSizeMetric.Record(ctx, int64(size), metric.WithAttributes(attrs...))
s.sendSizeMetric.Record(ctx, int64(size), metric.WithAttributes(s.attributes...))
}

span := trace.SpanFromContext(ctx)
span.AddEvent(otelMessageEventName, trace.WithAttributes(attrs...))
return err // nolint:wrapcheck
}

Expand Down

0 comments on commit f3441cc

Please sign in to comment.