Skip to content

Commit

Permalink
Run v2 e2e integration tests for Kafka (jaegertracing#5782)
Browse files Browse the repository at this point in the history
## Description of the changes
- Update v2 e2e integration tests scripts for kafka
- Update v2 gh workflows for kafka

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `yarn lint` and `yarn test`

---------

Signed-off-by: joeyyy09 <[email protected]>
Signed-off-by: Yuri Shkuro <[email protected]>
Co-authored-by: Yuri Shkuro <[email protected]>
  • Loading branch information
joeyyy09 and yurishkuro authored Jul 28, 2024
1 parent a8e98d9 commit a1fe858
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 42 deletions.
10 changes: 7 additions & 3 deletions .github/workflows/ci-e2e-kafka.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ permissions: # added using https://github.com/step-security/secure-workflows
jobs:
kafka:
runs-on: ubuntu-latest
strategy:
matrix:
jaeger-version: [v1, v2] # Adjust if there are specific versions of Jaeger
name: Kafka Integration Tests ${{ matrix.jaeger-version }}
steps:
- name: Harden Runner
uses: step-security/harden-runner@17d0e2bd7d51742c71671bd19fa12bdc9d40a3d6 # v2.8.1
Expand All @@ -30,9 +34,9 @@ jobs:
with:
go-version: 1.22.x

- name: Run kafka integration tests
- name: Run Kafka integration tests
id: test-execution
run: bash scripts/kafka-integration-test.sh -k
run: bash scripts/kafka-integration-test.sh -j ${{ matrix.jaeger-version }}

- name: Output Kafka logs on failure
run: docker compose -f ${{ steps.test-execution.outputs.docker_compose_file }} logs
Expand All @@ -42,4 +46,4 @@ jobs:
uses: ./.github/actions/upload-codecov
with:
files: cover.out
flags: kafka
flags: kafka-${{ matrix.jaeger-version }}
7 changes: 6 additions & 1 deletion cmd/jaeger/collector-with-kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@ service:
receivers: [otlp, jaeger]
processors: [batch]
exporters: [kafka]

telemetry:
resource:
service.name: jaeger_collector
metrics:
level: detailed

receivers:
otlp:
protocols:
Expand Down
32 changes: 24 additions & 8 deletions cmd/jaeger/internal/integration/e2e_integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@ const otlpPort = 4317
// - At last, clean up anything declared in its own test functions.
// (e.g. close remote-storage)
type E2EStorageIntegration struct {
SkipStorageCleaner bool
integration.StorageIntegration
ConfigFile string

SkipStorageCleaner bool
ConfigFile string
HealthCheckEndpoint string
}

// e2eInitialize starts the Jaeger-v2 collector with the provided config file,
Expand All @@ -51,6 +53,11 @@ func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) {
if !s.SkipStorageCleaner {
configFile = createStorageCleanerConfig(t, s.ConfigFile, storage)
}

configFile, err := filepath.Abs(configFile)
require.NoError(t, err, "Failed to get absolute path of the config file")
require.FileExists(t, configFile, "Config file does not exist at the resolved path")

t.Logf("Starting Jaeger-v2 in the background with config file %s", configFile)

outFile, err := os.OpenFile(
Expand Down Expand Up @@ -79,22 +86,31 @@ func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) {
Stdout: outFile,
Stderr: errFile,
}
t.Logf("Running command: %v", cmd.Args)
require.NoError(t, cmd.Start())

// Wait for the binary to start and become ready to serve requests.
healthCheckEndpoint := s.HealthCheckEndpoint
if healthCheckEndpoint == "" {
healthCheckEndpoint = fmt.Sprintf("http://localhost:%d/", ports.QueryHTTP)
}
require.Eventually(t, func() bool {
url := fmt.Sprintf("http://localhost:%d/", ports.QueryHTTP)
t.Logf("Checking if Jaeger-v2 is available on %s", url)
t.Logf("Checking if Jaeger-v2 is available on %s", healthCheckEndpoint)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
require.NoError(t, err)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, healthCheckEndpoint, nil)
if err != nil {
t.Logf("HTTP request creation failed: %v", err)
return false
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Log(err)
t.Logf("HTTP request failed: %v", err)
return false
}
defer resp.Body.Close()
return resp.StatusCode == http.StatusOK
}, 30*time.Second, 500*time.Millisecond, "Jaeger-v2 did not start")
}, 60*time.Second, 3*time.Second, "Jaeger-v2 did not start")
t.Log("Jaeger-v2 is ready")
t.Cleanup(func() {
if err := cmd.Process.Kill(); err != nil {
Expand Down
13 changes: 11 additions & 2 deletions cmd/jaeger/internal/integration/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,21 @@ import (
func TestKafkaStorage(t *testing.T) {
integration.SkipUnlessEnv(t, "kafka")

// TODO these config files use topic: "jaeger-spans",
// but for integration tests we want to use random topic in each run.
// https://github.com/jaegertracing/jaeger/blob/ed5cc2981c34158d0650cb96cb2fafcb753bea70/plugin/storage/integration/kafka_test.go#L50-L51
// Once OTEL Collector supports default values for env vars
// (https://github.com/open-telemetry/opentelemetry-collector/issues/5228)
// we can change the config to use topic: "${KAFKA_TOPIC:-jaeger-spans}"
// and export a KAFKA_TOPIC var with random topic name in the tests.

collectorConfig := "../../collector-with-kafka.yaml"
ingesterConfig := "../../ingester-remote-storage.yaml"

collector := &E2EStorageIntegration{
SkipStorageCleaner: true,
ConfigFile: collectorConfig,
SkipStorageCleaner: true,
ConfigFile: collectorConfig,
HealthCheckEndpoint: "http://localhost:8888/metrics",
}

// Initialize and start the collector
Expand Down
112 changes: 84 additions & 28 deletions scripts/kafka-integration-test.sh
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,44 +1,100 @@
#!/bin/bash

set -e
set -euf -o pipefail

export STORAGE=kafka
compose_file="docker-compose/kafka-integration-test/docker-compose.yml"
echo "docker_compose_file=${compose_file}" >> "${GITHUB_OUTPUT:-/dev/null}"

# Check if the -k parameter is provided and start Kafka if it was
if [ "$1" == "-k" ]; then
echo "Starting Kafka using Docker Compose..."
docker compose -f "${compose_file}" up -d kafka
echo "docker_compose_file=${compose_file}" >> "${GITHUB_OUTPUT:-/dev/null}"
fi
jaeger_version=""
manage_kafka="true"

print_help() {
echo "Usage: $0 [-K] -j <jaeger_version>"
echo " -K: do not start or stop Kafka container (useful for local testing)"
echo " -j: major version of Jaeger to test (v1|v2)"
exit 1
}

parse_args() {
while getopts "j:Kh" opt; do
case "${opt}" in
j)
jaeger_version=${OPTARG}
;;
K)
manage_kafka="false"
;;
*)
print_help
;;
esac
done
if [ "$jaeger_version" != "v1" ] && [ "$jaeger_version" != "v2" ]; then
echo "Error: Invalid Jaeger version. Valid options are v1 or v2"
print_help
fi
}

setup_kafka() {
echo "Starting Kafka using Docker Compose..."
docker compose -f "${compose_file}" up -d kafka
}

teardown_kafka() {
echo "Stopping Kafka..."
docker compose -f "${compose_file}" down
}

# Check if Kafka is ready by attempting to list topics
is_kafka_ready() {
docker compose -f "${compose_file}" \
exec kafka /opt/bitnami/kafka/bin/kafka-topics.sh \
--list \
--bootstrap-server localhost:9092 \
>/dev/null 2>&1
docker compose -f "${compose_file}" \
exec kafka /opt/bitnami/kafka/bin/kafka-topics.sh \
--list \
--bootstrap-server localhost:9092 \
>/dev/null 2>&1
}

# Set the timeout in seconds
timeout=180
# Set the interval between checks in seconds
interval=5
# Calculate the end time
end_time=$((SECONDS + timeout))
wait_for_kafka() {
local timeout=180
local interval=5
local end_time=$((SECONDS + timeout))

while [ $SECONDS -lt $end_time ]; do
while [ $SECONDS -lt $end_time ]; do
if is_kafka_ready; then
break
return
fi
echo "Kafka broker not ready, waiting ${interval} seconds"
sleep $interval
done
done

if ! is_kafka_ready; then
echo "Timed out waiting for Kafka to start"
exit 1
fi
echo "Timed out waiting for Kafka to start"
exit 1
}

run_integration_test() {
export STORAGE=kafka
if [ "${jaeger_version}" = "v1" ]; then
make storage-integration-test
elif [ "${jaeger_version}" = "v2" ]; then
make jaeger-v2-storage-integration-test
else
echo "Unknown Jaeger version ${jaeger_version}."
print_help
fi
}

main() {
parse_args "$@"

echo "Executing Kafka integration test for version $2"
set -x

if [[ "$manage_kafka" == "true" ]]; then
setup_kafka
trap 'teardown_kafka' EXIT
fi
wait_for_kafka

run_integration_test
}

make storage-integration-test
main "$@"

0 comments on commit a1fe858

Please sign in to comment.