diff --git a/internal/rpc/otel_interceptor.go b/internal/rpc/otel_interceptor.go index b42daf8959..8ea78525c3 100644 --- a/internal/rpc/otel_interceptor.go +++ b/internal/rpc/otel_interceptor.go @@ -6,13 +6,10 @@ import ( "fmt" "io" "sync" - "time" "connectrpc.com/connect" - "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" - "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" "google.golang.org/protobuf/proto" @@ -20,17 +17,12 @@ import ( ) const ( - otelFtlRequestKeyAttr = attribute.Key("ftl.request_key") - otelFtlVerbChainAttr = attribute.Key("ftl.verb_chain") - otelFtlVerbRefAttr = attribute.Key("ftl.verb.ref") - otelFtlVerbModuleAttr = attribute.Key("ftl.verb.module") - otelMessageSentSizesAttr = attribute.Key("ftl.rpc.message.sent.sizes_bytes") - otelMessageReceivedSizesAttr = attribute.Key("ftl.rpc.message.received.sizes_bytes") - otelRPCDurationMetricName = "ftl.rpc.duration_ms" - otelRPCRequestSizeMetricName = "ftl.rpc.request.size_bytes" - otelRPCRequestsPerRPCMetricName = "ftl.rpc.request.count_per_rpc" - otelRPCResponseSizeMetricName = "ftl.rpc.response.size_bytes" - otelRPCResponsesPerRPCMetricName = "ftl.rpc.response.count_per_rpc" + otelFtlRequestKeyAttr = attribute.Key("ftl.request_key") + otelFtlVerbChainAttr = attribute.Key("ftl.verb_chain") + otelFtlVerbRefAttr = attribute.Key("ftl.verb.ref") + otelFtlVerbModuleAttr = attribute.Key("ftl.verb.module") + otelMessageSentSizesAttr = attribute.Key("ftl.rpc.message.sent.sizes_bytes") + otelMessageReceivedSizesAttr = attribute.Key("ftl.rpc.message.received.sizes_bytes") ) func CustomOtelInterceptor() connect.Interceptor { @@ -39,21 +31,6 @@ func CustomOtelInterceptor() connect.Interceptor { type otelInterceptor struct{} -var clientInstruments instrumentation -var serverInstruments instrumentation - -func init() { - clientInstruments = createInstruments(otel.GetMeterProvider().Meter("ftl.rpc.client")) - serverInstruments = createInstruments(otel.GetMeterProvider().Meter("ftl.rpc.server")) -} - -func getInstruments(isClient bool) instrumentation { - if isClient { - return clientInstruments - } - return serverInstruments -} - func getAttributes(ctx context.Context) []attribute.KeyValue { logger := log.FromContext(ctx) attributes := []attribute.KeyValue{} @@ -80,7 +57,6 @@ func getAttributes(ctx context.Context) []attribute.KeyValue { func (i *otelInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc { return func(ctx context.Context, request connect.AnyRequest) (connect.AnyResponse, error) { - requestStartTime := time.Now() isClient := request.Spec().IsClient requestSizesAttr := otelMessageSentSizesAttr @@ -107,71 +83,44 @@ func (i *otelInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc { } span := trace.SpanFromContext(ctx) - duration := time.Since(requestStartTime).Milliseconds() span.SetAttributes(append(attributes, - attribute.Int64(otelRPCRequestsPerRPCMetricName, 1), - attribute.Int64(otelRPCRequestSizeMetricName, int64(requestSize)), - attribute.Int64(otelRPCResponsesPerRPCMetricName, 1), - attribute.Int64(otelRPCResponseSizeMetricName, int64(responseSize)), - attribute.Int64(otelRPCDurationMetricName, duration), requestSizesAttr.Int64Slice([]int64{int64(requestSize)}), responseSizesAttr.Int64Slice([]int64{int64(responseSize)}), )...) - instruments := getInstruments(isClient) - instruments.duration.Record( - ctx, - time.Since(requestStartTime).Milliseconds(), - metric.WithAttributes(attributes...)) - instruments.requestSize.Record(ctx, int64(requestSize), metric.WithAttributes(attributes...)) - instruments.requestsPerRPC.Record(ctx, 1, metric.WithAttributes(attributes...)) - instruments.responseSize.Record(ctx, int64(responseSize), metric.WithAttributes(attributes...)) - instruments.responsesPerRPC.Record(ctx, 1, metric.WithAttributes(attributes...)) return response, err } } func (i *otelInterceptor) WrapStreamingClient(next connect.StreamingClientFunc) connect.StreamingClientFunc { return func(ctx context.Context, spec connect.Spec) connect.StreamingClientConn { - requestStartTime := time.Now() attributes := getAttributes(ctx) conn := next(ctx, spec) - instruments := getInstruments(spec.IsClient) state := &streamingState{ - spec: spec, - protocol: conn.Peer().Protocol, - attributes: attributes, - receiveSizeMetric: instruments.responseSize, - sendSizeMetric: instruments.requestSize, - receiveSizes: []int64{}, - sendSizes: []int64{}, + spec: spec, + protocol: conn.Peer().Protocol, + attributes: attributes, + receiveSizes: []int64{}, + sendSizes: []int64{}, } span := trace.SpanFromContext(ctx) - return &streamingClientInterceptor{ // nolint:spancheck + return &streamingClientInterceptor{ StreamingClientConn: conn, receive: func(msg any, conn connect.StreamingClientConn) error { - return state.receive(ctx, msg, conn) + return state.receive(msg, conn) }, send: func(msg any, conn connect.StreamingClientConn) error { - return state.send(ctx, msg, conn) + return state.send(msg, conn) }, onClose: func() { - duration := time.Since(requestStartTime).Milliseconds() span.SetAttributes(append(state.attributes, - attribute.Int64(otelRPCRequestsPerRPCMetricName, state.sentCounter), - attribute.Int64(otelRPCResponsesPerRPCMetricName, state.receivedCounter), - attribute.Int64(otelRPCDurationMetricName, duration), otelMessageSentSizesAttr.Int64Slice(state.sendSizes), otelMessageReceivedSizesAttr.Int64Slice(state.receiveSizes), )...) if state.error != nil { span.SetStatus(codes.Error, state.error.Error()) } - span.End() - instruments.requestsPerRPC.Record(ctx, state.sentCounter, metric.WithAttributes(state.attributes...)) - instruments.responsesPerRPC.Record(ctx, state.receivedCounter, metric.WithAttributes(state.attributes...)) - instruments.duration.Record(ctx, duration, metric.WithAttributes(state.attributes...)) }, } } @@ -179,85 +128,36 @@ func (i *otelInterceptor) WrapStreamingClient(next connect.StreamingClientFunc) func (i *otelInterceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc) connect.StreamingHandlerFunc { return func(ctx context.Context, conn connect.StreamingHandlerConn) error { - requestStartTime := time.Now() attributes := getAttributes(ctx) - instruments := getInstruments(conn.Spec().IsClient) state := &streamingState{ - spec: conn.Spec(), - protocol: conn.Peer().Protocol, - attributes: attributes, - receiveSizeMetric: instruments.requestSize, - sendSizeMetric: instruments.responseSize, - receiveSizes: []int64{}, - sendSizes: []int64{}, + spec: conn.Spec(), + protocol: conn.Peer().Protocol, + attributes: attributes, + receiveSizes: []int64{}, + sendSizes: []int64{}, } streamingHandler := &streamingHandlerInterceptor{ StreamingHandlerConn: conn, receive: func(msg any, conn connect.StreamingHandlerConn) error { - return state.receive(ctx, msg, conn) + return state.receive(msg, conn) }, send: func(msg any, conn connect.StreamingHandlerConn) error { - return state.send(ctx, msg, conn) + return state.send(msg, conn) }, } err := next(ctx, streamingHandler) state.attributes = append( state.attributes, statusCodeAttribute(conn.Peer().Protocol, err)) - duration := time.Since(requestStartTime).Milliseconds() span := trace.SpanFromContext(ctx) span.SetAttributes(append(state.attributes, - attribute.Int64(otelRPCRequestsPerRPCMetricName, state.receivedCounter), - attribute.Int64(otelRPCResponsesPerRPCMetricName, state.sentCounter), - attribute.Int64(otelRPCDurationMetricName, duration), otelMessageSentSizesAttr.Int64Slice(state.sendSizes), otelMessageReceivedSizesAttr.Int64Slice(state.receiveSizes), )...) - instruments.requestsPerRPC.Record(ctx, state.receivedCounter, metric.WithAttributes(state.attributes...)) - instruments.responsesPerRPC.Record(ctx, state.sentCounter, metric.WithAttributes(state.attributes...)) - instruments.duration.Record(ctx, duration, metric.WithAttributes(state.attributes...)) return err } } -type instrumentation struct { - duration metric.Int64Histogram - requestSize metric.Int64Histogram - responseSize metric.Int64Histogram - requestsPerRPC metric.Int64Histogram - responsesPerRPC metric.Int64Histogram -} - -func createInstruments(meter metric.Meter) instrumentation { - duration, err := meter.Int64Histogram(otelRPCDurationMetricName, metric.WithUnit("ms"), metric.WithDescription("Duration of the RPC call")) - if err != nil { - panic(fmt.Errorf("failed to create duration metric: %w", err)) - } - requestSize, err := meter.Int64Histogram(otelRPCRequestSizeMetricName, metric.WithUnit("By"), metric.WithDescription("Size of the request payload")) - if err != nil { - panic(fmt.Errorf("failed to create request size metric: %w", err)) - } - responseSize, err := meter.Int64Histogram(otelRPCResponseSizeMetricName, metric.WithUnit("By"), metric.WithDescription("Size of the response payload")) - if err != nil { - panic(fmt.Errorf("failed to create response size metric: %w", err)) - } - requestsPerRPC, err := meter.Int64Histogram(otelRPCRequestsPerRPCMetricName, metric.WithUnit("1"), metric.WithDescription("Number of requests made in the RPC call")) - if err != nil { - panic(fmt.Errorf("failed to create requests per rpc metric: %w", err)) - } - responsesPerRPC, err := meter.Int64Histogram(otelRPCResponsesPerRPCMetricName, metric.WithUnit("1"), metric.WithDescription("Number of responses received in the RPC call")) - if err != nil { - panic(fmt.Errorf("failed to create responses per rpc metric: %w", err)) - } - return instrumentation{ - duration: duration, - requestSize: requestSize, - responseSize: responseSize, - requestsPerRPC: requestsPerRPC, - responsesPerRPC: responsesPerRPC, - } -} - func statusCodeAttribute(protocol string, err error) attribute.KeyValue { statusCodeKey := fmt.Sprintf("ftl.rpc.%s.status_code", protocol) statusCode := attribute.Int64(statusCodeKey, 0) @@ -269,17 +169,13 @@ func statusCodeAttribute(protocol string, err error) attribute.KeyValue { // streamingState stores the ongoing metrics for streaming interceptors. type streamingState struct { - mu sync.Mutex - spec connect.Spec - protocol string - attributes []attribute.KeyValue - error error - sentCounter int64 - receivedCounter int64 - receiveSizeMetric metric.Int64Histogram - sendSizeMetric metric.Int64Histogram - receiveSizes []int64 - sendSizes []int64 + mu sync.Mutex + spec connect.Spec + protocol string + attributes []attribute.KeyValue + error error + receiveSizes []int64 + sendSizes []int64 } // streamingSenderReceiver encapsulates either a StreamingClientConn or a StreamingHandlerConn. @@ -288,7 +184,7 @@ type streamingSenderReceiver interface { Send(msg any) error } -func (s *streamingState) receive(ctx context.Context, msg any, conn streamingSenderReceiver) error { +func (s *streamingState) receive(msg any, conn streamingSenderReceiver) error { err := conn.Receive(msg) s.mu.Lock() defer s.mu.Unlock() @@ -299,23 +195,20 @@ func (s *streamingState) receive(ctx context.Context, msg any, conn streamingSen s.error = err s.attributes = append(s.attributes, statusCodeAttribute(s.protocol, err)) } - s.receivedCounter++ if protomsg, ok := msg.(proto.Message); ok { size := proto.Size(protomsg) s.receiveSizes = append(s.receiveSizes, int64(size)) - s.receiveSizeMetric.Record(ctx, int64(size), metric.WithAttributes(s.attributes...)) } return err // nolint:wrapcheck } -func (s *streamingState) send(ctx context.Context, msg any, conn streamingSenderReceiver) error { +func (s *streamingState) send(msg any, conn streamingSenderReceiver) error { err := conn.Send(msg) s.mu.Lock() defer s.mu.Unlock() if errors.Is(err, io.EOF) { return err // nolint:wrapcheck } - s.sentCounter++ if err != nil { s.error = err s.attributes = append(s.attributes, statusCodeAttribute(s.protocol, err)) @@ -323,7 +216,6 @@ func (s *streamingState) send(ctx context.Context, msg any, conn streamingSender if protomsg, ok := msg.(proto.Message); ok { size := proto.Size(protomsg) s.sendSizes = append(s.sendSizes, int64(size)) - s.sendSizeMetric.Record(ctx, int64(size), metric.WithAttributes(s.attributes...)) } return err // nolint:wrapcheck }