Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v2][adjuster] Implement model to otlp translator with post processing #6397

Merged
merged 8 commits into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,12 @@ linters-settings:
files:
- "**_test.go"

# TODO: enable after import is not used anywhere
# disallow-otel-contrib-translator:
# deny:
# - pkg: github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger
# desc: "Use jptrace package instead of opentelemetry-collector-contrib/pkg/translator/jaeger"
# files:
# - "!**/jptrace/**"
disallow-otel-contrib-translator:
deny:
- pkg: github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger
desc: "Use jptrace package instead of opentelemetry-collector-contrib/pkg/translator/jaeger"
files:
- "!**/jptrace/**"

goimports:
local-prefixes: github.com/jaegertracing/jaeger
Expand Down
7 changes: 2 additions & 5 deletions cmd/jaeger/internal/integration/span_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ import (
"io"
"time"

jaeger2otlp "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/exporter/otlpexporter"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/internal/jptrace"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/storage/spanstore"
)
Expand Down Expand Up @@ -68,15 +68,12 @@ func (w *spanWriter) Close() error {
}

func (w *spanWriter) WriteSpan(ctx context.Context, span *model.Span) error {
td, err := jaeger2otlp.ProtoToTraces([]*model.Batch{
td := jptrace.ProtoToTraces([]*model.Batch{
{
Spans: []*model.Span{span},
Process: span.Process,
},
})
if err != nil {
return err
}

return w.exporter.ConsumeTraces(ctx, td)
}
5 changes: 2 additions & 3 deletions cmd/query/app/apiv3/otlp_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@
package apiv3

import (
model2otel "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/internal/jptrace"
"github.com/jaegertracing/jaeger/model"
)

func modelToOTLP(spans []*model.Span) ptrace.Traces {
batch := &model.Batch{Spans: spans}
// there is never an error returned from ProtoToTraces
tr, _ := model2otel.ProtoToTraces([]*model.Batch{batch})
tr := jptrace.ProtoToTraces([]*model.Batch{batch})
return tr
}
10 changes: 5 additions & 5 deletions cmd/query/app/querysvc/adjuster/clockskew.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (a *clockSkewAdjuster) buildNodesMap() {
for k := 0; k < spans.Len(); k++ {
span := spans.At(k)
if _, exists := a.spans[span.SpanID()]; exists {
jptrace.AddWarning(span, warningDuplicateSpanID)
jptrace.AddWarnings(span, warningDuplicateSpanID)
} else {
a.spans[span.SpanID()] = &node{
span: span,
Expand All @@ -129,7 +129,7 @@ func (a *clockSkewAdjuster) buildSubGraphs() {
p.children = append(p.children, n)
} else {
warning := fmt.Sprintf(warningMissingParentSpanID, n.span.ParentSpanID())
jptrace.AddWarning(n.span, warning)
jptrace.AddWarnings(n.span, warning)
// treat spans with invalid parent ID as root spans
a.roots[n.span.SpanID()] = n
}
Expand Down Expand Up @@ -185,14 +185,14 @@ func (a *clockSkewAdjuster) adjustTimestamps(n *node, skew clockSkew) {
}
if absDuration(skew.delta) > a.maxDelta {
if a.maxDelta == 0 {
jptrace.AddWarning(n.span, fmt.Sprintf(warningSkewAdjustDisabled, skew.delta))
jptrace.AddWarnings(n.span, fmt.Sprintf(warningSkewAdjustDisabled, skew.delta))
return
}
jptrace.AddWarning(n.span, fmt.Sprintf(warningMaxDeltaExceeded, a.maxDelta, skew.delta))
jptrace.AddWarnings(n.span, fmt.Sprintf(warningMaxDeltaExceeded, a.maxDelta, skew.delta))
return
}
n.span.SetStartTimestamp(pcommon.NewTimestampFromTime(n.span.StartTimestamp().AsTime().Add(skew.delta)))
jptrace.AddWarning(n.span, fmt.Sprintf("This span's timestamps were adjusted by %v", skew.delta))
jptrace.AddWarnings(n.span, fmt.Sprintf("This span's timestamps were adjusted by %v", skew.delta))
for i := 0; i < n.span.Events().Len(); i++ {
event := n.span.Events().At(i)
event.SetTimestamp(pcommon.NewTimestampFromTime(event.Timestamp().AsTime().Add(skew.delta)))
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/querysvc/adjuster/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (s *SpanHashDeduper) Adjust(traces ptrace.Traces) {
hashTrace,
)
if err != nil {
jptrace.AddWarning(span, fmt.Sprintf("failed to compute hash code: %v", err))
jptrace.AddWarnings(span, fmt.Sprintf("failed to compute hash code: %v", err))
span.CopyTo(dedupedSpans.AppendEmpty())
continue
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/querysvc/adjuster/libraryattributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (ResourceAttributesAdjuster) moveAttributes(span ptrace.Span, resource pcom
for k, v := range replace {
existing, ok := resource.Attributes().Get(k)
if ok && existing.AsRaw() != v.AsRaw() {
jptrace.AddWarning(span, "conflicting values between Span and Resource for attribute "+k)
jptrace.AddWarnings(span, "conflicting values between Span and Resource for attribute "+k)
continue
}
v.CopyTo(resource.Attributes().PutEmpty(k))
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/querysvc/adjuster/spaniduniquifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (d *spanIDDeduper) uniquifyServerSpanIDs(traces ptrace.Traces) {
if span.Kind() == ptrace.SpanKindServer && d.isSharedWithClientSpan(span.SpanID()) {
newID, err := d.makeUniqueSpanID()
if err != nil {
jptrace.AddWarning(span, err.Error())
jptrace.AddWarnings(span, err.Error())
continue
}
oldToNewSpanIDs[span.SpanID()] = newID
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/querysvc/adjuster/spanlinks.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (la LinksAdjuster) adjust(span ptrace.Span) {
newLink := validLinks.AppendEmpty()
link.CopyTo(newLink)
} else {
jptrace.AddWarning(span, invalidSpanLinkWarning)
jptrace.AddWarnings(span, invalidSpanLinkWarning)
}
}
validLinks.CopyTo(span.Links())
Expand Down
55 changes: 49 additions & 6 deletions internal/jptrace/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package jptrace

import (
jaegerTranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/model"
Expand All @@ -15,10 +16,19 @@ import (
func ProtoFromTraces(traces ptrace.Traces) []*model.Batch {
batches := jaegerTranslator.ProtoFromTraces(traces)
spanMap := createSpanMapFromBatches(batches)
transferWarnings(traces, spanMap)
transferWarningsToModelSpans(traces, spanMap)
return batches
}

// ProtoToTraces converts Jaeger model batches ([]*model.Batch)
// to OpenTelemetry traces (ptrace.Traces).
func ProtoToTraces(batches []*model.Batch) ptrace.Traces {
traces, _ := jaegerTranslator.ProtoToTraces(batches) // never returns an error
spanMap := createSpanMapFromTraces(traces)
transferWarningsToOTLPSpans(batches, spanMap)
return traces
}

func createSpanMapFromBatches(batches []*model.Batch) map[model.SpanID]*model.Span {
spanMap := make(map[model.SpanID]*model.Span)
for _, batch := range batches {
Expand All @@ -29,7 +39,23 @@ func createSpanMapFromBatches(batches []*model.Batch) map[model.SpanID]*model.Sp
return spanMap
}

func transferWarnings(traces ptrace.Traces, spanMap map[model.SpanID]*model.Span) {
func createSpanMapFromTraces(traces ptrace.Traces) map[pcommon.SpanID]ptrace.Span {
spanMap := make(map[pcommon.SpanID]ptrace.Span)
resources := traces.ResourceSpans()
for i := 0; i < resources.Len(); i++ {
scopes := resources.At(i).ScopeSpans()
for j := 0; j < scopes.Len(); j++ {
spans := scopes.At(j).Spans()
for k := 0; k < spans.Len(); k++ {
span := spans.At(k)
spanMap[span.SpanID()] = span
}
}
}
return spanMap
}

func transferWarningsToModelSpans(traces ptrace.Traces, spanMap map[model.SpanID]*model.Span) {
resources := traces.ResourceSpans()
for i := 0; i < resources.Len(); i++ {
scopes := resources.At(i).ScopeSpans()
Expand All @@ -38,10 +64,27 @@ func transferWarnings(traces ptrace.Traces, spanMap map[model.SpanID]*model.Span
for k := 0; k < spans.Len(); k++ {
otelSpan := spans.At(k)
warnings := GetWarnings(otelSpan)
span := spanMap[model.SpanIDFromOTEL(otelSpan.SpanID())]
span.Warnings = append(span.Warnings, warnings...)
// filter out the warning tag
span.Tags = filterTags(span.Tags, warningsAttribute)
if len(warnings) == 0 {
continue
}
if span, ok := spanMap[model.SpanIDFromOTEL(otelSpan.SpanID())]; ok {
span.Warnings = append(span.Warnings, warnings...)
// filter out the warning tag
span.Tags = filterTags(span.Tags, warningsAttribute)
}
}
}
}
}

func transferWarningsToOTLPSpans(batches []*model.Batch, spanMap map[pcommon.SpanID]ptrace.Span) {
for _, batch := range batches {
for _, span := range batch.Spans {
if len(span.Warnings) == 0 {
continue
}
if otelSpan, ok := spanMap[span.SpanID.ToOTELSpanID()]; ok {
AddWarnings(otelSpan, span.Warnings...)
}
}
}
Expand Down
66 changes: 63 additions & 3 deletions internal/jptrace/translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ func TestProtoFromTraces_AddsWarnings(t *testing.T) {
span1 := ss1.Spans().AppendEmpty()
span1.SetName("test-span-1")
span1.SetSpanID(pcommon.SpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}))
AddWarning(span1, "test-warning-1")
AddWarning(span1, "test-warning-2")
AddWarnings(span1, "test-warning-1")
AddWarnings(span1, "test-warning-2")
span1.Attributes().PutStr("key", "value")

ss2 := rs1.ScopeSpans().AppendEmpty()
Expand All @@ -34,7 +34,7 @@ func TestProtoFromTraces_AddsWarnings(t *testing.T) {
span3 := ss3.Spans().AppendEmpty()
span3.SetName("test-span-3")
span3.SetSpanID(pcommon.SpanID([8]byte{17, 18, 19, 20, 21, 22, 23, 24}))
AddWarning(span3, "test-warning-3")
AddWarnings(span3, "test-warning-3")

batches := ProtoFromTraces(traces)

Expand All @@ -53,3 +53,63 @@ func TestProtoFromTraces_AddsWarnings(t *testing.T) {
assert.Equal(t, []string{"test-warning-3"}, batches[1].Spans[0].Warnings)
assert.Empty(t, batches[1].Spans[0].Tags)
}

func TestProtoToTraces_AddsWarnings(t *testing.T) {
batch1 := &model.Batch{
Process: &model.Process{
ServiceName: "batch-1",
},
Spans: []*model.Span{
{
OperationName: "test-span-1",
SpanID: model.NewSpanID(1),
Warnings: []string{"test-warning-1", "test-warning-2"},
},
{
OperationName: "test-span-2",
SpanID: model.NewSpanID(2),
},
},
}
batch2 := &model.Batch{
Process: &model.Process{
ServiceName: "batch-2",
},
Spans: []*model.Span{
{
OperationName: "test-span-3",
SpanID: model.NewSpanID(3),
Warnings: []string{"test-warning-3"},
},
},
}
batches := []*model.Batch{batch1, batch2}
traces := ProtoToTraces(batches)

assert.Equal(t, 2, traces.ResourceSpans().Len())

rs1 := traces.ResourceSpans().At(0)
assert.Equal(t, 1, rs1.ScopeSpans().Len())
ss1 := rs1.ScopeSpans().At(0)
assert.Equal(t, 2, ss1.Spans().Len())

span1 := ss1.Spans().At(0)
assert.Equal(t, "test-span-1", span1.Name())
assert.Equal(t, pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 1}), span1.SpanID())
assert.Equal(t, []string{"test-warning-1", "test-warning-2"}, GetWarnings(span1))

span2 := ss1.Spans().At(1)
assert.Equal(t, "test-span-2", span2.Name())
assert.Equal(t, pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 2}), span2.SpanID())
assert.Empty(t, GetWarnings(span2))

rs2 := traces.ResourceSpans().At(1)
assert.Equal(t, 1, rs2.ScopeSpans().Len())
ss3 := rs2.ScopeSpans().At(0)
assert.Equal(t, 1, ss3.Spans().Len())

span3 := ss3.Spans().At(0)
assert.Equal(t, "test-span-3", span3.Name())
assert.Equal(t, pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 3}), span3.SpanID())
assert.Equal(t, []string{"test-warning-3"}, GetWarnings(span3))
}
12 changes: 7 additions & 5 deletions internal/jptrace/warning.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@ const (
warningsAttribute = "jaeger.internal.warnings"
)

func AddWarning(span ptrace.Span, warning string) {
var warnings pcommon.Slice
func AddWarnings(span ptrace.Span, warnings ...string) {
var w pcommon.Slice
if currWarnings, ok := span.Attributes().Get(warningsAttribute); ok {
warnings = currWarnings.Slice()
w = currWarnings.Slice()
} else {
warnings = span.Attributes().PutEmptySlice(warningsAttribute)
w = span.Attributes().PutEmptySlice(warningsAttribute)
}
for _, warning := range warnings {
w.AppendEmpty().SetStr(warning)
}
warnings.AppendEmpty().SetStr(warning)
}

func GetWarnings(span ptrace.Span) []string {
Expand Down
12 changes: 11 additions & 1 deletion internal/jptrace/warning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/ptrace"
)

Expand Down Expand Up @@ -46,7 +47,7 @@ func TestAddWarning(t *testing.T) {
warnings.AppendEmpty().SetStr(warn)
}
}
AddWarning(span, test.newWarn)
AddWarnings(span, test.newWarn)
warnings, ok := attrs.Get("jaeger.internal.warnings")
assert.True(t, ok)
assert.Equal(t, len(test.expected), warnings.Slice().Len())
Expand All @@ -57,6 +58,15 @@ func TestAddWarning(t *testing.T) {
}
}

func TestAddWarning_MultipleWarnings(t *testing.T) {
span := ptrace.NewSpan()
AddWarnings(span, "warning-1", "warning-2")
warnings, ok := span.Attributes().Get("jaeger.internal.warnings")
require.True(t, ok)
require.Equal(t, "warning-1", warnings.Slice().At(0).Str())
require.Equal(t, "warning-2", warnings.Slice().At(1).Str())
}

func TestGetWarnings(t *testing.T) {
tests := []struct {
name string
Expand Down
8 changes: 4 additions & 4 deletions storage_v2/v1adapter/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (
"context"
"errors"

model2otel "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/internal/jptrace"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/iter"
"github.com/jaegertracing/jaeger/storage/dependencystore"
Expand Down Expand Up @@ -60,8 +60,8 @@ func (tr *TraceReader) GetTraces(
return
}
batch := &model.Batch{Spans: t.GetSpans()}
tr, err := model2otel.ProtoToTraces([]*model.Batch{batch})
if !yield([]ptrace.Traces{tr}, err) || err != nil {
tr := jptrace.ProtoToTraces([]*model.Batch{batch})
if !yield([]ptrace.Traces{tr}, nil) {
return
}
}
Expand Down Expand Up @@ -105,7 +105,7 @@ func (tr *TraceReader) FindTraces(
}
for _, trace := range traces {
batch := &model.Batch{Spans: trace.GetSpans()}
otelTrace, _ := model2otel.ProtoToTraces([]*model.Batch{batch})
otelTrace := jptrace.ProtoToTraces([]*model.Batch{batch})
if !yield([]ptrace.Traces{otelTrace}, nil) {
return
}
Expand Down
Loading