Skip to content

Commit

Permalink
[v2][adjuster] Implement function to get standard adjusters to operat…
Browse files Browse the repository at this point in the history
…e on otlp format (#6396)

## Which problem is this PR solving?
- Towards #6344 

## Description of the changes
- Implemented a function `StandardAdjusters` that returns a list of
adjusters to be applied on ptrace.Traces
- This will be used by the v2 query service in
#6343

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `npm run lint` and `npm run test`

---------

Signed-off-by: Mahad Zaryab <[email protected]>
  • Loading branch information
mahadzaryab1 authored Dec 23, 2024
1 parent 4ecb086 commit c8a1548
Show file tree
Hide file tree
Showing 16 changed files with 78 additions and 30 deletions.
4 changes: 2 additions & 2 deletions cmd/query/app/querysvc/adjuster/clockskew.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const (
warningSkewAdjustDisabled = "clock skew adjustment disabled; not applying calculated delta of %v"
)

// ClockSkew returns an Adjuster that corrects span timestamps for clock skew.
// CorrectClockSkew returns an Adjuster that corrects span timestamps for clock skew.
//
// This adjuster modifies the start and log timestamps of child spans that are
// inconsistent with their parent spans due to clock differences between hosts.
Expand All @@ -36,7 +36,7 @@ const (
// Parameters:
// - maxDelta: The maximum allowable time adjustment. Adjustments exceeding
// this value will be ignored.
func ClockSkew(maxDelta time.Duration) Adjuster {
func CorrectClockSkew(maxDelta time.Duration) Adjuster {
return Func(func(traces ptrace.Traces) {
adjuster := &clockSkewAdjuster{
traces: traces,
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/querysvc/adjuster/clockskew_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func TestClockSkewAdjuster(t *testing.T) {
testCase := tt // capture loop var
t.Run(testCase.description, func(t *testing.T) {
trace := makeTrace(testCase.trace)
adjuster := ClockSkew(tt.maxAdjust)
adjuster := CorrectClockSkew(tt.maxAdjust)
adjuster.Adjust(trace)

var gotErr string
Expand Down
6 changes: 3 additions & 3 deletions cmd/query/app/querysvc/adjuster/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

var _ Adjuster = (*SpanHashDeduper)(nil)

// SpanHash creates an adjuster that deduplicates spans by removing all but one span
// DeduplicateSpans creates an adjuster that deduplicates spans by removing all but one span
// with the same hash code. This is particularly useful for scenarios where spans
// may be duplicated during archival, such as with ElasticSearch archival.
//
Expand All @@ -23,8 +23,8 @@ var _ Adjuster = (*SpanHashDeduper)(nil)
//
// To ensure consistent hash codes, this adjuster should be executed after
// SortAttributesAndEvents, which normalizes the order of collections within the span.
func SpanHash() SpanHashDeduper {
return SpanHashDeduper{
func DeduplicateSpans() *SpanHashDeduper {
return &SpanHashDeduper{
marshaler: &ptrace.ProtoMarshaler{},
}
}
Expand Down
10 changes: 5 additions & 5 deletions cmd/query/app/querysvc/adjuster/hash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ import (
)

func TestSpanHash_EmptySpans(t *testing.T) {
adjuster := SpanHash()
adjuster := DeduplicateSpans()
input := ptrace.NewTraces()
expected := ptrace.NewTraces()
adjuster.Adjust(input)
assert.Equal(t, expected, input)
}

func TestSpanHash_RemovesDuplicateSpans(t *testing.T) {
adjuster := SpanHash()
adjuster := DeduplicateSpans()
input := func() ptrace.Traces {
traces := ptrace.NewTraces()

Expand Down Expand Up @@ -126,7 +126,7 @@ func TestSpanHash_RemovesDuplicateSpans(t *testing.T) {
}

func TestSpanHash_NoDuplicateSpans(t *testing.T) {
adjuster := SpanHash()
adjuster := DeduplicateSpans()
input := func() ptrace.Traces {
traces := ptrace.NewTraces()
rs := traces.ResourceSpans().AppendEmpty()
Expand Down Expand Up @@ -180,7 +180,7 @@ func TestSpanHash_NoDuplicateSpans(t *testing.T) {
}

func TestSpanHash_DuplicateSpansDifferentScopeAttributes(t *testing.T) {
adjuster := SpanHash()
adjuster := DeduplicateSpans()
input := func() ptrace.Traces {
traces := ptrace.NewTraces()
rs := traces.ResourceSpans().AppendEmpty()
Expand Down Expand Up @@ -234,7 +234,7 @@ func TestSpanHash_DuplicateSpansDifferentScopeAttributes(t *testing.T) {
}

func TestSpanHash_DuplicateSpansDifferentResourceAttributes(t *testing.T) {
adjuster := SpanHash()
adjuster := DeduplicateSpans()
input := func() ptrace.Traces {
traces := ptrace.NewTraces()
rs := traces.ResourceSpans().AppendEmpty()
Expand Down
4 changes: 2 additions & 2 deletions cmd/query/app/querysvc/adjuster/ipattribute.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ var ipAttributesToCorrect = map[string]struct{}{
"peer.ipv4": {},
}

// IPAttribute returns an adjuster that replaces numeric "ip" attributes,
// NormalizeIPAttributes returns an adjuster that replaces numeric "ip" attributes,
// which usually contain IPv4 packed into uint32, with their string
// representation (e.g. "8.8.8.8"").
func IPAttribute() IPAttributeAdjuster {
func NormalizeIPAttributes() IPAttributeAdjuster {
return IPAttributeAdjuster{}
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/querysvc/adjuster/ipattribute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestIPAttributeAdjuster(t *testing.T) {
}
}

IPAttribute().Adjust(traces)
NormalizeIPAttributes().Adjust(traces)

resourceSpan := traces.ResourceSpans().At(0)
assert.Equal(t, 3, resourceSpan.Resource().Attributes().Len())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ var libraryKeys = map[string]struct{}{
string(otelsemconv.TelemetryDistroVersionKey): {},
}

// ResourceAttributes creates an adjuster that moves the OpenTelemetry library
// MoveLibraryAttributes creates an adjuster that moves the OpenTelemetry library
// attributes from spans to the parent resource so that the UI can
// display them separately under Process.
// https://github.com/jaegertracing/jaeger/issues/4534
func ResourceAttributes() ResourceAttributesAdjuster {
func MoveLibraryAttributes() ResourceAttributesAdjuster {
return ResourceAttributesAdjuster{}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestResourceAttributesAdjuster_SpanWithLibraryAttributes(t *testing.T) {
span.Attributes().PutStr(string(otelsemconv.TelemetryDistroVersionKey), "blah")
span.Attributes().PutStr("another_key", "another_value")

adjuster := ResourceAttributes()
adjuster := MoveLibraryAttributes()
adjuster.Adjust(traces)

resultSpanAttributes := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes()
Expand Down Expand Up @@ -67,7 +67,7 @@ func TestResourceAttributesAdjuster_SpanWithoutLibraryAttributes(t *testing.T) {
span := rs.ScopeSpans().AppendEmpty().Spans().AppendEmpty()
span.Attributes().PutStr("random_key", "random_value")

adjuster := ResourceAttributes()
adjuster := MoveLibraryAttributes()
adjuster.Adjust(traces)

resultSpanAttributes := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes()
Expand All @@ -85,7 +85,7 @@ func TestResourceAttributesAdjuster_SpanWithConflictingLibraryAttributes(t *test
span.Attributes().PutStr("random_key", "random_value")
span.Attributes().PutStr(string(otelsemconv.TelemetrySDKLanguageKey), "Java")

adjuster := ResourceAttributes()
adjuster := MoveLibraryAttributes()
adjuster.Adjust(traces)

resultSpan := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0)
Expand Down Expand Up @@ -119,7 +119,7 @@ func TestResourceAttributesAdjuster_SpanWithNonConflictingLibraryAttributes(t *t
span.Attributes().PutStr("random_key", "random_value")
span.Attributes().PutStr(string(otelsemconv.TelemetrySDKLanguageKey), "Go")

adjuster := ResourceAttributes()
adjuster := MoveLibraryAttributes()
adjuster.Adjust(traces)

resultSpanAttributes := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes()
Expand Down
4 changes: 2 additions & 2 deletions cmd/query/app/querysvc/adjuster/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ import (

var _ Adjuster = (*SortAttributesAndEventsAdjuster)(nil)

// SortAttributesAndEvents creates an adjuster that standardizes trace data by sorting elements:
// SortCollections creates an adjuster that standardizes trace data by sorting elements:
// - Resource attributes are sorted lexicographically by their keys.
// - Scope attributes are sorted lexicographically by their keys.
// - Span attributes are sorted lexicographically by their keys.
// - Span events are sorted lexicographically by their names.
// - Attributes within each span event are sorted lexicographically by their keys.
// - Attributes within each span link are sorted lexicographically by their keys.
func SortAttributesAndEvents() SortAttributesAndEventsAdjuster {
func SortCollections() SortAttributesAndEventsAdjuster {
return SortAttributesAndEventsAdjuster{}
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/querysvc/adjuster/sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

func TestSortAttributesAndEventsAdjuster(t *testing.T) {
adjuster := SortAttributesAndEvents()
adjuster := SortCollections()
input := func() ptrace.Traces {
traces := ptrace.NewTraces()
rs := traces.ResourceSpans().AppendEmpty()
Expand Down
4 changes: 2 additions & 2 deletions cmd/query/app/querysvc/adjuster/spaniduniquifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ import (

var errTooManySpans = errors.New("cannot assign unique span ID, too many spans in the trace")

// SpanIDUniquifier returns an adjuster that changes span ids for server
// DeduplicateClientServerSpanIDs returns an adjuster that changes span ids for server
// spans (i.e. spans with tag: span.kind == server) if there is another
// client span that shares the same span ID. This is needed to deal with
// Zipkin-style clients that reuse the same span ID for both client and server
// side of an RPC call. Jaeger UI expects all spans to have unique IDs.
//
// Any issues encountered during adjustment are recorded as warnings in the
// span.
func SpanIDUniquifier() Adjuster {
func DeduplicateClientServerSpanIDs() Adjuster {
return Func(func(traces ptrace.Traces) {
adjuster := spanIDDeduper{
spansByID: make(map[pcommon.SpanID][]ptrace.Span),
Expand Down
4 changes: 2 additions & 2 deletions cmd/query/app/querysvc/adjuster/spaniduniquifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func makeTraces() ptrace.Traces {

func TestSpanIDUniquifierTriggered(t *testing.T) {
traces := makeTraces()
deduper := SpanIDUniquifier()
deduper := DeduplicateClientServerSpanIDs()
deduper.Adjust(traces)

spans := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans()
Expand All @@ -73,7 +73,7 @@ func TestSpanIDUniquifierNotTriggered(t *testing.T) {
spans.At(2).CopyTo(newSpans.AppendEmpty())
newSpans.CopyTo(spans)

deduper := SpanIDUniquifier()
deduper := DeduplicateClientServerSpanIDs()
deduper.Adjust(traces)

gotSpans := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans()
Expand Down
4 changes: 2 additions & 2 deletions cmd/query/app/querysvc/adjuster/spanlinks.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ const (

var _ Adjuster = (*LinksAdjuster)(nil)

// SpanLinks creates an adjuster that removes span links with empty trace IDs.
func SpanLinks() LinksAdjuster {
// RemoveEmptySpanLinks creates an adjuster that removes span links with empty trace IDs.
func RemoveEmptySpanLinks() LinksAdjuster {
return LinksAdjuster{}
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/querysvc/adjuster/spanlinks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestLinksAdjuster(t *testing.T) {
spanC.Links().AppendEmpty().SetTraceID(pcommon.TraceID([]byte{0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0}))
spanC.Links().AppendEmpty().SetTraceID(pcommon.TraceID([]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}))

SpanLinks().Adjust(traces)
RemoveEmptySpanLinks().Adjust(traces)
spans := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans()

gotSpansA := spans.At(0)
Expand Down
23 changes: 23 additions & 0 deletions cmd/query/app/querysvc/adjuster/standard.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package adjuster

import (
"time"
)

// StandardAdjusters returns a list of adjusters applied by the query service
// before returning the data to the API clients.
func StandardAdjusters(maxClockSkewAdjust time.Duration) []Adjuster {
return []Adjuster{
DeduplicateClientServerSpanIDs(),
SortCollections(),
// DeduplicateSpans depends on SortCollections running first
DeduplicateSpans(),
CorrectClockSkew(maxClockSkewAdjust),
NormalizeIPAttributes(),
MoveLibraryAttributes(),
RemoveEmptySpanLinks(),
}
}
25 changes: 25 additions & 0 deletions cmd/query/app/querysvc/adjuster/standard_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package adjuster

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestStandardAdjusters(t *testing.T) {
maxClockSkewAdjust := 10 * time.Second
adjusters := StandardAdjusters(maxClockSkewAdjust)

assert.Len(t, adjusters, 7, "Expected 7 adjusters")
assert.IsType(t, DeduplicateClientServerSpanIDs(), adjusters[0])
assert.IsType(t, SortCollections(), adjusters[1])
assert.IsType(t, DeduplicateSpans(), adjusters[2])
assert.IsType(t, CorrectClockSkew(maxClockSkewAdjust), adjusters[3])
assert.IsType(t, NormalizeIPAttributes(), adjusters[4])
assert.IsType(t, MoveLibraryAttributes(), adjusters[5])
assert.IsType(t, RemoveEmptySpanLinks(), adjusters[6])
}

0 comments on commit c8a1548

Please sign in to comment.