Skip to content

Commit

Permalink
Merge branch 'main' into es-kind
Browse files Browse the repository at this point in the history
  • Loading branch information
Manik2708 authored Dec 26, 2024
2 parents 02bff1c + b02900c commit b0da654
Show file tree
Hide file tree
Showing 35 changed files with 716 additions and 209 deletions.
11 changes: 6 additions & 5 deletions .github/workflows/ci-e2e-kafka.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ jobs:
strategy:
fail-fast: false
matrix:
jaeger-version: [v1, v2] # Adjust if there are specific versions of Jaeger
name: kafka ${{ matrix.jaeger-version }}
jaeger-version: [v1, v2]
kafka-version: ["3.x", "2.x"]
name: kafka ${{matrix.kafka-version }} ${{ matrix.jaeger-version }}
steps:
- name: Harden Runner
uses: step-security/harden-runner@91182cccc01eb5e619899d80e4e971d6181294a7 # v2.10.1
Expand All @@ -31,12 +32,12 @@ jobs:
with:
go-version: 1.23.x

- name: Run Kafka integration tests
- name: Run kafka integration tests
id: test-execution
run: bash scripts/kafka-integration-test.sh -j ${{ matrix.jaeger-version }}
run: bash scripts/kafka-integration-test.sh -j ${{ matrix.jaeger-version }} -v ${{ matrix.kafka-version }}

- name: Upload coverage to codecov
uses: ./.github/actions/upload-codecov
with:
files: cover.out
flags: kafka-${{ matrix.jaeger-version }}
flags: kafka-${{ matrix.kafka-version }}-${{ matrix.jaeger-version }}
12 changes: 10 additions & 2 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,17 @@ linters-settings:
disallow-otel-contrib-translator:
deny:
- pkg: github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger
desc: "Use jptrace package instead of opentelemetry-collector-contrib/pkg/translator/jaeger"
desc: "Use v1adapter package instead of opentelemetry-collector-contrib/pkg/translator/jaeger"
files:
- "!**/jptrace/**"
- "!**/v1adapter/**"

# TODO: remove once we have upgraded to Go 1.23
disallow-iter:
deny:
- pkg: iter
desc: "Use github.com/jaegertracing/jaeger/pkg/iter"
files:
- "**"

goimports:
local-prefixes: github.com/jaegertracing/jaeger
Expand Down
4 changes: 2 additions & 2 deletions cmd/collector/app/handler/otlp_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (

"github.com/jaegertracing/jaeger/cmd/collector/app/flags"
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/internal/jptrace"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

var _ component.Host = (*otelHost)(nil) // API check
Expand Down Expand Up @@ -108,7 +108,7 @@ type consumerDelegate struct {
}

func (c *consumerDelegate) consume(ctx context.Context, td ptrace.Traces) error {
batches := jptrace.ProtoFromTraces(td)
batches := v1adapter.ProtoFromTraces(td)
for _, batch := range batches {
err := c.batchConsumer.consume(ctx, batch)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions cmd/jaeger/internal/all-in-one.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ receivers:
endpoint: "${env:JAEGER_LISTEN_HOST:-localhost}:14250"
thrift_http:
endpoint: "${env:JAEGER_LISTEN_HOST:-localhost}:14268"
thrift_binary:
endpoint: "${env:JAEGER_LISTEN_HOST:-localhost}:6832"
thrift_compact:
endpoint: "${env:JAEGER_LISTEN_HOST:-localhost}:6831"

zipkin:
endpoint: "${env:JAEGER_LISTEN_HOST:-localhost}:9411"
Expand Down
8 changes: 6 additions & 2 deletions cmd/jaeger/internal/integration/e2e_integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/integration/storagecleaner"
"github.com/jaegertracing/jaeger/plugin/storage/integration"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

const otlpPort = 4317
Expand Down Expand Up @@ -149,8 +150,9 @@ func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) {

s.SpanWriter, err = createSpanWriter(logger, otlpPort)
require.NoError(t, err)
s.SpanReader, err = createSpanReader(logger, ports.QueryGRPC)
spanReader, err := createSpanReader(logger, ports.QueryGRPC)
require.NoError(t, err)
s.TraceReader = v1adapter.NewTraceReader(spanReader)

t.Cleanup(func() {
// Call e2eCleanUp to close the SpanReader and SpanWriter gRPC connection.
Expand Down Expand Up @@ -207,7 +209,9 @@ func (s *E2EStorageIntegration) doHealthCheck(t *testing.T) bool {
// e2eCleanUp closes the SpanReader and SpanWriter gRPC connection.
// This function should be called after all the tests are finished.
func (s *E2EStorageIntegration) e2eCleanUp(t *testing.T) {
require.NoError(t, s.SpanReader.(io.Closer).Close())
spanReader, err := v1adapter.GetV1Reader(s.TraceReader)
require.NoError(t, err)
require.NoError(t, spanReader.(io.Closer).Close())
require.NoError(t, s.SpanWriter.(io.Closer).Close())
}

Expand Down
4 changes: 2 additions & 2 deletions cmd/jaeger/internal/integration/span_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ import (
"go.opentelemetry.io/collector/exporter/otlpexporter"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/internal/jptrace"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

var (
Expand Down Expand Up @@ -68,7 +68,7 @@ func (w *spanWriter) Close() error {
}

func (w *spanWriter) WriteSpan(ctx context.Context, span *model.Span) error {
td := jptrace.ProtoToTraces([]*model.Batch{
td := v1adapter.V1BatchesToTraces([]*model.Batch{
{
Spans: []*model.Span{span},
Process: span.Process,
Expand Down
2 changes: 1 addition & 1 deletion cmd/jaeger/internal/integration/tailsampling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (ts *TailSamplingIntegration) testTailSamplingProccessor(t *testing.T) {
var actual []string
assert.Eventually(t, func() bool {
var err error
actual, err = ts.SpanReader.GetServices(context.Background())
actual, err = ts.TraceReader.GetServices(context.Background())
require.NoError(t, err)
sort.Strings(actual)
return assert.ObjectsAreEqualValues(ts.expectedServices, actual)
Expand Down
4 changes: 2 additions & 2 deletions cmd/jaeger/internal/processors/adaptivesampling/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import (

"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/samplingstrategy"
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/remotesampling"
"github.com/jaegertracing/jaeger/internal/jptrace"
"github.com/jaegertracing/jaeger/internal/metrics/otelmetrics"
"github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

type traceProcessor struct {
Expand Down Expand Up @@ -65,7 +65,7 @@ func (tp *traceProcessor) close(context.Context) error {
}

func (tp *traceProcessor) processTraces(_ context.Context, td ptrace.Traces) (ptrace.Traces, error) {
batches := jptrace.ProtoFromTraces(td)
batches := v1adapter.ProtoFromTraces(td)
for _, batch := range batches {
for _, span := range batch.Spans {
if span.Process == nil {
Expand Down
4 changes: 2 additions & 2 deletions cmd/query/app/apiv3/otlp_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ package apiv3
import (
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/internal/jptrace"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

func modelToOTLP(spans []*model.Span) ptrace.Traces {
batch := &model.Batch{Spans: spans}
tr := jptrace.ProtoToTraces([]*model.Batch{batch})
tr := v1adapter.V1BatchesToTraces([]*model.Batch{batch})
return tr
}
4 changes: 2 additions & 2 deletions cmd/query/app/otlp_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (

"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/internal/jptrace"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

func otlp2traces(otlpSpans []byte) ([]*model.Trace, error) {
Expand All @@ -18,7 +18,7 @@ func otlp2traces(otlpSpans []byte) ([]*model.Trace, error) {
if err != nil {
return nil, fmt.Errorf("cannot unmarshal OTLP : %w", err)
}
jaegerBatches := jptrace.ProtoFromTraces(otlpTraces)
jaegerBatches := v1adapter.ProtoFromTraces(otlpTraces)
var traces []*model.Trace
traceMap := make(map[model.TraceID]*model.Trace)
for _, batch := range jaegerBatches {
Expand Down
2 changes: 1 addition & 1 deletion docker-compose/kafka/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,4 @@ services:
jaeger-remote-storage:
condition: service_healthy
links:
- jaeger-remote-storage
- jaeger-remote-storage
21 changes: 21 additions & 0 deletions docker-compose/kafka/v2/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
services:
zookeeper:
image: bitnami/zookeeper:3.9.3
ports:
- "2181:2181"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes

kafka:
image: bitnami/kafka:2.8.0
ports:
- "9092:9092"
environment:
- KAFKA_BROKER_ID=1
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_LISTENERS=PLAINTEXT://:9092
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
- ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
- zookeeper
File renamed without changes.
55 changes: 55 additions & 0 deletions internal/jptrace/aggregator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package jptrace

import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/pkg/iter"
)

// AggregateTraces aggregates a sequence of trace batches into individual traces.
//
// The `tracesSeq` input must adhere to the chunking requirements of tracestore.Reader.GetTraces.
func AggregateTraces(tracesSeq iter.Seq2[[]ptrace.Traces, error]) iter.Seq2[ptrace.Traces, error] {
return func(yield func(trace ptrace.Traces, err error) bool) {
currentTrace := ptrace.NewTraces()
currentTraceID := pcommon.NewTraceIDEmpty()

tracesSeq(func(traces []ptrace.Traces, err error) bool {
if err != nil {
yield(ptrace.NewTraces(), err)
return false
}
for _, trace := range traces {
resources := trace.ResourceSpans()
traceID := resources.At(0).ScopeSpans().At(0).Spans().At(0).TraceID()
if currentTraceID == traceID {
mergeTraces(trace, currentTrace)
} else {
if currentTrace.ResourceSpans().Len() > 0 {
if !yield(currentTrace, nil) {
return false
}
}
currentTrace = trace
currentTraceID = traceID
}
}
return true
})
if currentTrace.ResourceSpans().Len() > 0 {
yield(currentTrace, nil)
}
}
}

func mergeTraces(src, dest ptrace.Traces) {
resources := src.ResourceSpans()
for i := 0; i < resources.Len(); i++ {
resource := resources.At(i)
resource.CopyTo(dest.ResourceSpans().AppendEmpty())
}
}
Loading

0 comments on commit b0da654

Please sign in to comment.