diff --git a/cmd/query/app/querysvc/adjuster/ipattribute.go b/cmd/query/app/querysvc/adjuster/ipattribute.go new file mode 100644 index 00000000000..6b14769e305 --- /dev/null +++ b/cmd/query/app/querysvc/adjuster/ipattribute.go @@ -0,0 +1,72 @@ +package adjuster + +import ( + "bytes" + "encoding/binary" + "strconv" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +var ipAttributesToCorrect = map[string]struct{}{ + "ip": {}, + "peer.ipv4": {}, +} + +// IPAttribute returns an adjuster that replaces numeric "ip" attributes, +// which usually contain IPv4 packed into uint32, with their string +// representation (e.g. "8.8.8.8""). +func IPAttribute() Adjuster { + adjustAttributes := func(attributes pcommon.Map) { + adjusted := make(map[string]string) + + attributes.Range(func(k string, v pcommon.Value) bool { + if _, ok := ipAttributesToCorrect[k]; !ok { + return true + } + var value uint32 + switch v.Type() { + case pcommon.ValueTypeInt: + //nolint: gosec // G115 + value = uint32(v.Int()) + case pcommon.ValueTypeDouble: + value = uint32(v.Double()) + default: + return true + } + var buf [4]byte + binary.BigEndian.PutUint32(buf[:], value) + var sBuf bytes.Buffer + for i, b := range buf { + if i > 0 { + sBuf.WriteRune('.') + } + sBuf.WriteString(strconv.FormatUint(uint64(b), 10)) + } + adjusted[k] = sBuf.String() + return true + }) + + for k, v := range adjusted { + attributes.PutStr(k, v) + } + } + + return Func(func(traces ptrace.Traces) (ptrace.Traces, error) { + resourceSpans := traces.ResourceSpans() + for i := 0; i < resourceSpans.Len(); i++ { + rs := resourceSpans.At(i) + scopeSpans := rs.ScopeSpans() + for j := 0; j < scopeSpans.Len(); j++ { + ss := scopeSpans.At(j) + spans := ss.Spans() + for k := 0; k < spans.Len(); k++ { + span := spans.At(k) + adjustAttributes(span.Attributes()) + } + } + } + return traces, nil + }) +} diff --git a/cmd/query/app/querysvc/adjuster/ipattribute_test.go b/cmd/query/app/querysvc/adjuster/ipattribute_test.go new file mode 100644 index 00000000000..355e0b4c3fc --- /dev/null +++ b/cmd/query/app/querysvc/adjuster/ipattribute_test.go @@ -0,0 +1,47 @@ +package adjuster + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +func TestIPAttributeAdjuster(t *testing.T) { + traces := ptrace.NewTraces() + spans := traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans() + + spanA := spans.AppendEmpty() + spanA.Attributes().PutInt("a", 42) + spanA.Attributes().PutStr("ip", "not integer") + spanA.Attributes().PutInt("ip", 1<<24|2<<16|3<<8|4) + spanA.Attributes().PutStr("peer.ipv4", "something") + + spanB := spans.AppendEmpty() + spanB.Attributes().PutDouble("ip", 1<<25|2<<16|3<<8|4) + + trace, err := IPAttribute().Adjust(traces) + require.NoError(t, err) + + span := trace.ResourceSpans().At(0).ScopeSpans().At(0).Spans() + attributesA := span.At(0).Attributes() + + val, ok := attributesA.Get("a") + require.True(t, ok) + require.EqualValues(t, 42, val.Int()) + + val, ok = attributesA.Get("ip") + require.True(t, ok) + require.EqualValues(t, "1.2.3.4", val.Str()) + + val, ok = attributesA.Get("peer.ipv4") + require.True(t, ok) + require.EqualValues(t, "something", val.Str()) + + attributesB := span.At(1).Attributes() + + val, ok = attributesB.Get("ip") + require.True(t, ok) + require.EqualValues(t, "2.2.3.4", val.Str()) + +}