diff --git a/internal/rpc/otel_interceptor.go b/internal/rpc/otel_interceptor.go index 33352ed46a..9bccf79d7f 100644 --- a/internal/rpc/otel_interceptor.go +++ b/internal/rpc/otel_interceptor.go @@ -24,15 +24,23 @@ import ( ) const ( - otelFtlRequestKeyAttr = attribute.Key("ftl.requestKey") - otelFtlVerbRefAttr = attribute.Key("ftl.verb.ref") - otelFtlVerbModuleAttr = attribute.Key("ftl.verb.module") - otelMessageEvent = "message" - otelMessageIDAttr = attribute.Key("message.id") - otelMessageSizeAttr = attribute.Key("message.uncompressed_size") - otelMessageTypeAttr = attribute.Key("message.type") - otelMessageTypeSent = "SENT" - otelMessageTypeReceived = "RECEIVED" + otelFtlRequestKeyAttr = attribute.Key("ftl.request_key") + otelFtlVerbChainAttr = attribute.Key("ftl.verb_chain") + otelFtlVerbRefAttr = attribute.Key("ftl.verb.ref") + otelFtlVerbModuleAttr = attribute.Key("ftl.verb.module") + otelMessageEventName = "message" + otelMessageEventIDAttr = attribute.Key("message.id") + otelMessageEventSizeAttr = attribute.Key("message.uncompressed_size") + otelMessageEventTypeAttr = attribute.Key("message.type") + otelMessageEventTypeSent = "SENT" + otelMessageEventTypeReceived = "RECEIVED" + otelMessageSentSizesAttr = attribute.Key("rpc.message.sent.sizes_bytes") + otelMessageReceivedSizesAttr = attribute.Key("rpc.message.received.sizes_bytes") + otelRPCDurationMetricName = "rpc.duration_ms" + otelRPCRequestSizeMetricName = "rpc.request.size_bytes" + otelRPCRequestsPerRPCMetricName = "rpc.request.count_per_rpc" + otelRPCResponseSizeMetricName = "rpc.response.size_bytes" + otelRPCResponsesPerRPCMetricName = "rpc.response.count_per_rpc" ) func OtelInterceptor() connect.Interceptor { @@ -72,6 +80,13 @@ func getAttributes(ctx context.Context, rpcSystemKey string) []attribute.KeyValu attributes = append(attributes, otelFtlVerbRefAttr.String(verb.String())) attributes = append(attributes, otelFtlVerbModuleAttr.String(verb.Module)) } + if verbs, ok := VerbsFromContext(ctx); ok { + verbStrings := make([]string, len(verbs)) + for i, v := range verbs { + verbStrings[i] = v.String() + } + attributes = append(attributes, otelFtlVerbChainAttr.StringSlice(verbStrings)) + } return attributes } @@ -83,12 +98,16 @@ func (i *otelInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc { name := strings.TrimLeft(request.Spec().Procedure, "/") spanKind := trace.SpanKindClient - requestSpan := otelMessageTypeAttr.String(otelMessageTypeSent) - responseSpan := otelMessageTypeAttr.String(otelMessageTypeReceived) + requestSpan := otelMessageEventTypeAttr.String(otelMessageEventTypeSent) + responseSpan := otelMessageEventTypeAttr.String(otelMessageEventTypeReceived) + requestSizesAttr := otelMessageSentSizesAttr + responseSizesAttr := otelMessageReceivedSizesAttr if !isClient { spanKind = trace.SpanKindServer - requestSpan = otelMessageTypeAttr.String(otelMessageTypeReceived) - responseSpan = otelMessageTypeAttr.String(otelMessageTypeSent) + requestSpan = otelMessageEventTypeAttr.String(otelMessageEventTypeReceived) + responseSpan = otelMessageEventTypeAttr.String(otelMessageEventTypeSent) + requestSizesAttr = otelMessageReceivedSizesAttr + responseSizesAttr = otelMessageSentSizesAttr } attributes := getAttributes(ctx, request.Peer().Protocol) @@ -107,30 +126,39 @@ func (i *otelInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc { } } - span.AddEvent(otelMessageEvent, + span.AddEvent(otelMessageEventName, trace.WithAttributes( requestSpan, - otelMessageIDAttr.Int(1), - otelMessageSizeAttr.Int(requestSize), + otelMessageEventIDAttr.Int(1), + otelMessageEventSizeAttr.Int(requestSize), ), ) response, err := next(ctx, request) attributes = append(attributes, statusCodeAttribute(request.Peer().Protocol, err)) - var responseSize int + responseSize := 0 if err == nil { if msg, ok := response.Any().(proto.Message); ok { responseSize = proto.Size(msg) } } - span.AddEvent(otelMessageEvent, + span.AddEvent(otelMessageEventName, trace.WithAttributes( responseSpan, - otelMessageIDAttr.Int(1), - otelMessageSizeAttr.Int(responseSize), + otelMessageEventIDAttr.Int(1), + otelMessageEventSizeAttr.Int(responseSize), ), ) - span.SetAttributes(attributes...) + duration := time.Since(requestStartTime).Milliseconds() + span.SetAttributes(append(attributes, + attribute.Int64(otelRPCRequestsPerRPCMetricName, 1), + attribute.Int64(otelRPCRequestSizeMetricName, int64(requestSize)), + attribute.Int64(otelRPCResponsesPerRPCMetricName, 1), + attribute.Int64(otelRPCResponseSizeMetricName, int64(responseSize)), + attribute.Int64(otelRPCDurationMetricName, duration), + requestSizesAttr.Int64Slice([]int64{int64(requestSize)}), + responseSizesAttr.Int64Slice([]int64{int64(responseSize)}), + )...) instruments := getInstruments(isClient) instruments.duration.Record( ctx, @@ -160,11 +188,13 @@ func (i *otelInterceptor) WrapStreamingClient(next connect.StreamingClientFunc) instruments := getInstruments(spec.IsClient) state := &streamingState{ - spec: spec, - protocol: conn.Peer().Protocol, - attributes: attributes, - receiveSize: instruments.responseSize, - sendSize: instruments.requestSize, + spec: spec, + protocol: conn.Peer().Protocol, + attributes: attributes, + receiveSizeMetric: instruments.responseSize, + sendSizeMetric: instruments.requestSize, + receiveSizes: []int64{}, + sendSizes: []int64{}, } return &streamingClientInterceptor{ // nolint:spancheck @@ -179,22 +209,21 @@ func (i *otelInterceptor) WrapStreamingClient(next connect.StreamingClientFunc) state.attributes = append( state.attributes, statusCodeAttribute(conn.Peer().Protocol, state.error)) - span.SetAttributes(state.attributes...) + duration := time.Since(requestStartTime).Milliseconds() + span.SetAttributes(append(state.attributes, + attribute.Int64(otelRPCRequestsPerRPCMetricName, state.sentCounter), + attribute.Int64(otelRPCResponsesPerRPCMetricName, state.receivedCounter), + attribute.Int64(otelRPCDurationMetricName, duration), + otelMessageSentSizesAttr.Int64Slice(state.sendSizes), + otelMessageReceivedSizesAttr.Int64Slice(state.receiveSizes), + )...) if state.error != nil { span.SetStatus(codes.Error, state.error.Error()) } span.End() - instruments.requestsPerRPC.Record( - ctx, - state.sentCounter, - metric.WithAttributes(state.attributes...)) - instruments.responsesPerRPC.Record( - ctx, - state.receivedCounter, - metric.WithAttributes(state.attributes...)) - instruments.duration.Record(ctx, - time.Since(requestStartTime).Milliseconds(), - metric.WithAttributes(state.attributes...)) + instruments.requestsPerRPC.Record(ctx, state.sentCounter, metric.WithAttributes(state.attributes...)) + instruments.responsesPerRPC.Record(ctx, state.receivedCounter, metric.WithAttributes(state.attributes...)) + instruments.duration.Record(ctx, duration, metric.WithAttributes(state.attributes...)) }, } } @@ -217,11 +246,13 @@ func (i *otelInterceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc instruments := getInstruments(conn.Spec().IsClient) state := &streamingState{ - spec: conn.Spec(), - protocol: conn.Peer().Protocol, - attributes: attributes, - receiveSize: instruments.responseSize, - sendSize: instruments.requestSize, + spec: conn.Spec(), + protocol: conn.Peer().Protocol, + attributes: attributes, + receiveSizeMetric: instruments.requestSize, + sendSizeMetric: instruments.responseSize, + receiveSizes: []int64{}, + sendSizes: []int64{}, } streamingHandler := &streamingHandlerInterceptor{ StreamingHandlerConn: conn, @@ -239,18 +270,17 @@ func (i *otelInterceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc if err != nil { span.SetStatus(codes.Error, err.Error()) } - span.SetAttributes(state.attributes...) - instruments.requestsPerRPC.Record( - ctx, - state.receivedCounter, - metric.WithAttributes(state.attributes...)) - instruments.responsesPerRPC.Record( - ctx, - state.sentCounter, - metric.WithAttributes(state.attributes...)) - instruments.duration.Record(ctx, - time.Since(requestStartTime).Milliseconds(), - metric.WithAttributes(state.attributes...)) + duration := time.Since(requestStartTime).Milliseconds() + span.SetAttributes(append(state.attributes, + attribute.Int64(otelRPCRequestsPerRPCMetricName, state.receivedCounter), + attribute.Int64(otelRPCResponsesPerRPCMetricName, state.sentCounter), + attribute.Int64(otelRPCDurationMetricName, duration), + otelMessageSentSizesAttr.Int64Slice(state.sendSizes), + otelMessageReceivedSizesAttr.Int64Slice(state.receiveSizes), + )...) + instruments.requestsPerRPC.Record(ctx, state.receivedCounter, metric.WithAttributes(state.attributes...)) + instruments.responsesPerRPC.Record(ctx, state.sentCounter, metric.WithAttributes(state.attributes...)) + instruments.duration.Record(ctx, duration, metric.WithAttributes(state.attributes...)) return err } } @@ -273,23 +303,23 @@ type instrumentation struct { } func createInstruments(meter metric.Meter) instrumentation { - duration, err := meter.Int64Histogram("duration", metric.WithUnit("ms")) + duration, err := meter.Int64Histogram(otelRPCDurationMetricName, metric.WithUnit("ms"), metric.WithDescription("Duration of the RPC call")) if err != nil { panic(fmt.Errorf("failed to create duration metric: %w", err)) } - requestSize, err := meter.Int64Histogram("request.size", metric.WithUnit("By")) + requestSize, err := meter.Int64Histogram(otelRPCRequestSizeMetricName, metric.WithUnit("By"), metric.WithDescription("Size of the request payload")) if err != nil { panic(fmt.Errorf("failed to create request size metric: %w", err)) } - responseSize, err := meter.Int64Histogram("response.size", metric.WithUnit("By")) + responseSize, err := meter.Int64Histogram(otelRPCResponseSizeMetricName, metric.WithUnit("By"), metric.WithDescription("Size of the response payload")) if err != nil { panic(fmt.Errorf("failed to create response size metric: %w", err)) } - requestsPerRPC, err := meter.Int64Histogram("requests_per_rpc", metric.WithUnit("1")) + requestsPerRPC, err := meter.Int64Histogram(otelRPCRequestsPerRPCMetricName, metric.WithUnit("1"), metric.WithDescription("Number of requests made in the RPC call")) if err != nil { panic(fmt.Errorf("failed to create requests per rpc metric: %w", err)) } - responsesPerRPC, err := meter.Int64Histogram("responses_per_rpc", metric.WithUnit("1")) + responsesPerRPC, err := meter.Int64Histogram(otelRPCResponsesPerRPCMetricName, metric.WithUnit("1"), metric.WithDescription("Number of responses received in the RPC call")) if err != nil { panic(fmt.Errorf("failed to create responses per rpc metric: %w", err)) } @@ -313,15 +343,17 @@ func statusCodeAttribute(protocol string, err error) attribute.KeyValue { // streamingState stores the ongoing metrics for streaming interceptors. type streamingState struct { - mu sync.Mutex - spec connect.Spec - protocol string - attributes []attribute.KeyValue - error error - sentCounter int64 - receivedCounter int64 - receiveSize metric.Int64Histogram - sendSize metric.Int64Histogram + mu sync.Mutex + spec connect.Spec + protocol string + attributes []attribute.KeyValue + error error + sentCounter int64 + receivedCounter int64 + receiveSizeMetric metric.Int64Histogram + sendSizeMetric metric.Int64Histogram + receiveSizes []int64 + sendSizes []int64 } // streamingSenderReceiver encapsulates either a StreamingClientConn or a StreamingHandlerConn. @@ -337,23 +369,24 @@ func (s *streamingState) receive(ctx context.Context, msg any, conn streamingSen if errors.Is(err, io.EOF) { return err // nolint:wrapcheck } - s.receivedCounter++ if err != nil { s.error = err s.attributes = append(s.attributes, statusCodeAttribute(s.protocol, err)) } + s.receivedCounter++ attrs := append(s.attributes, []attribute.KeyValue{ // nolint:gocritic - otelMessageTypeAttr.String(otelMessageTypeReceived), - otelMessageIDAttr.Int64(s.receivedCounter), + otelMessageEventTypeAttr.String(otelMessageEventTypeReceived), + otelMessageEventIDAttr.Int64(s.receivedCounter), }...) if protomsg, ok := msg.(proto.Message); ok { size := proto.Size(protomsg) - attrs = append(attrs, otelMessageSizeAttr.Int(size)) - s.receiveSize.Record(ctx, int64(size), metric.WithAttributes(attrs...)) + attrs = append(attrs, otelMessageEventSizeAttr.Int(size)) + s.receiveSizes = append(s.receiveSizes, int64(size)) + s.receiveSizeMetric.Record(ctx, int64(size), metric.WithAttributes(attrs...)) } span := trace.SpanFromContext(ctx) - span.AddEvent(otelMessageEvent, trace.WithAttributes(attrs...)) + span.AddEvent(otelMessageEventName, trace.WithAttributes(attrs...)) return err // nolint:wrapcheck } @@ -370,17 +403,18 @@ func (s *streamingState) send(ctx context.Context, msg any, conn streamingSender s.attributes = append(s.attributes, statusCodeAttribute(s.protocol, err)) } attrs := append(s.attributes, []attribute.KeyValue{ // nolint:gocritic - otelMessageTypeAttr.String(otelMessageTypeSent), - otelMessageIDAttr.Int64(s.sentCounter), + otelMessageEventTypeAttr.String(otelMessageEventTypeSent), + otelMessageEventIDAttr.Int64(s.sentCounter), }...) if protomsg, ok := msg.(proto.Message); ok { size := proto.Size(protomsg) - attrs = append(attrs, otelMessageSizeAttr.Int(size)) - s.sendSize.Record(ctx, int64(size), metric.WithAttributes(attrs...)) + attrs = append(attrs, otelMessageEventSizeAttr.Int(size)) + s.sendSizes = append(s.sendSizes, int64(size)) + s.sendSizeMetric.Record(ctx, int64(size), metric.WithAttributes(attrs...)) } span := trace.SpanFromContext(ctx) - span.AddEvent(otelMessageEvent, trace.WithAttributes(attrs...)) + span.AddEvent(otelMessageEventName, trace.WithAttributes(attrs...)) return err // nolint:wrapcheck }