Skip to content

Commit

Permalink
[v2] Add legacy formats into e2e kafka integration tests (#5824)
Browse files Browse the repository at this point in the history
## Description of the changes
- Add Kafka tests for legacy Jaeger formats, because people migrating
from v1 are not able to start using otlp in v1

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `yarn lint` and `yarn test`

---------

Signed-off-by: joeyyy09 <[email protected]>
Signed-off-by: Yuri Shkuro <[email protected]>
Co-authored-by: Yuri Shkuro <[email protected]>
Co-authored-by: Yuri Shkuro <[email protected]>
  • Loading branch information
3 people authored Aug 18, 2024
1 parent 4e6c510 commit 34a8aa5
Showing 1 changed file with 100 additions and 29 deletions.
129 changes: 100 additions & 29 deletions cmd/jaeger/internal/integration/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,46 +4,117 @@
package integration

import (
"fmt"
"os"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"

"github.com/jaegertracing/jaeger/plugin/storage/integration"
)

// createConfigWithEncoding rewrites the base configuration files to use the given encoding
// and Kafka topic which are varied between the test runs.
//
// Once OTEL Collector supports default values for env vars
// (https://github.com/open-telemetry/opentelemetry-collector/issues/5228)
// we can change the config to use topic: "${KAFKA_TOPIC:-jaeger-spans}"
// and export a KAFKA_TOPIC var with random topic name in the tests.
func createConfigWithEncoding(t *testing.T, configFile string, targetEncoding string, uniqueTopic string) string {
data, err := os.ReadFile(configFile)
require.NoError(t, err, "Failed to read config file: %s", configFile)

var config map[string]any
err = yaml.Unmarshal(data, &config)
require.NoError(t, err, "Failed to unmarshal YAML data from config file: %s", configFile)

// Function to recursively search and replace the encoding and topic
var replaceEncodingAndTopic func(m map[string]any) int
replaceEncodingAndTopic = func(m map[string]any) int {
replacements := 0
for k, v := range m {
if k == "encoding" {
oldEncoding := v.(string)
m[k] = targetEncoding
t.Logf("Replaced encoding '%s' with '%s' in key: %s", oldEncoding, targetEncoding, k)
replacements++
} else if k == "topic" {
oldTopic := v.(string)
m[k] = uniqueTopic
t.Logf("Replaced topic '%s' with '%s' in key: %s", oldTopic, uniqueTopic, k)
replacements++
} else if subMap, ok := v.(map[string]any); ok {
replacements += replaceEncodingAndTopic(subMap)
}
}
return replacements
}

totalReplacements := replaceEncodingAndTopic(config)
require.Equal(t, 2, totalReplacements, "Expected exactly 2 replacements (encoding and topic), but got %d", totalReplacements)

newData, err := yaml.Marshal(config)
require.NoError(t, err, "Failed to marshal YAML data after encoding replacement")

tempFile := filepath.Join(t.TempDir(), fmt.Sprintf("config_%s.yaml", targetEncoding))
err = os.WriteFile(tempFile, newData, 0o600)
require.NoError(t, err, "Failed to write updated config file to: %s", tempFile)

t.Logf("Configuration file with encoding '%s' and topic '%s' written to: %s", targetEncoding, uniqueTopic, tempFile)

return tempFile
}

func TestKafkaStorage(t *testing.T) {
integration.SkipUnlessEnv(t, "kafka")

// TODO these config files use topic: "jaeger-spans",
// but for integration tests we want to use random topic in each run.
// https://github.com/jaegertracing/jaeger/blob/ed5cc2981c34158d0650cb96cb2fafcb753bea70/plugin/storage/integration/kafka_test.go#L50-L51
// Once OTEL Collector supports default values for env vars
// (https://github.com/open-telemetry/opentelemetry-collector/issues/5228)
// we can change the config to use topic: "${KAFKA_TOPIC:-jaeger-spans}"
// and export a KAFKA_TOPIC var with random topic name in the tests.

collectorConfig := "../../collector-with-kafka.yaml"
ingesterConfig := "../../ingester-remote-storage.yaml"

collector := &E2EStorageIntegration{
SkipStorageCleaner: true,
ConfigFile: collectorConfig,
HealthCheckEndpoint: "http://localhost:8888/metrics",
tests := []struct {
encoding string
skip string
}{
{encoding: "otlp_proto"},
{encoding: "otlp_json", skip: "not supported: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/33627"},
{encoding: "jaeger_proto"},
{encoding: "jaeger_json"},
}

// Initialize and start the collector
collector.e2eInitialize(t, "kafka")
for _, test := range tests {
t.Run(test.encoding, func(t *testing.T) {
if test.skip != "" {
t.Skip(test.skip)
}
uniqueTopic := fmt.Sprintf("jaeger-spans-%d", time.Now().UnixNano())
t.Logf("Using unique Kafka topic: %s", uniqueTopic)

ingester := &E2EStorageIntegration{
ConfigFile: ingesterConfig,
StorageIntegration: integration.StorageIntegration{
CleanUp: purge,
GetDependenciesReturnsSource: true,
SkipArchiveTest: true,
},
}
// Unlike the other storage tests where "collector" has access to the storage,
// here we have two distinct binaries, collector and ingester, and only the ingester
// has access to the storage and allows the test to query it.
// We reuse E2EStorageIntegration struct to manage lifecycle of the collector,
// but the tests are run against the ingester.

collector := &E2EStorageIntegration{
SkipStorageCleaner: true,
ConfigFile: createConfigWithEncoding(t, "../../collector-with-kafka.yaml", test.encoding, uniqueTopic),
HealthCheckEndpoint: "http://localhost:8888/metrics",
}
collector.e2eInitialize(t, "kafka")
t.Log("Collector initialized")

// Initialize and start the ingester
ingester.e2eInitialize(t, "kafka")
ingester := &E2EStorageIntegration{
ConfigFile: createConfigWithEncoding(t, "../../ingester-remote-storage.yaml", test.encoding, uniqueTopic),
StorageIntegration: integration.StorageIntegration{
CleanUp: purge,
GetDependenciesReturnsSource: true,
SkipArchiveTest: true,
},
}
ingester.e2eInitialize(t, "kafka")
t.Log("Ingester initialized")

// Run the span store tests
ingester.RunSpanStoreTests(t)
ingester.RunSpanStoreTests(t)
})
}
}

0 comments on commit 34a8aa5

Please sign in to comment.