Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add kafka Spring starter smoke test #11262

Merged
merged 13 commits into from
May 14, 2024
22 changes: 22 additions & 0 deletions .github/graal-native-docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
services:
zookeeper:
image: confluentinc/cp-zookeeper:6.2.10
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "22181:2181"

kafka:
image: confluentinc/cp-kafka:6.2.10
depends_on:
- zookeeper
ports:
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
7 changes: 7 additions & 0 deletions .github/workflows/reusable-native-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,16 @@ jobs:
java-version: "21"
components: "native-image"
- name: Running test
env:
DOCKER_COMPOSE_TEST: "true"
run: |
echo "GRAALVM_HOME: $GRAALVM_HOME"
echo "JAVA_HOME: $JAVA_HOME"
java --version
native-image --version
# Testcontainers does not work in some cases with GraalVM native images,
# therefore we're starting a Kafka container manually for the tests
docker compose -f .github/graal-native-docker-compose.yaml up -d
# don't wait for startup - gradle compile takes long enough
./gradlew nativeTest
docker compose -f .github/graal-native-docker-compose.yaml down # is this needed?
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ dependencies {
testLibrary("org.springframework.boot:spring-boot-starter-test:$springBootVersion") {
exclude("org.junit.vintage", "junit-vintage-engine")
}
testImplementation("org.testcontainers:kafka")
testImplementation("javax.servlet:javax.servlet-api:3.1.0")
testImplementation("jakarta.servlet:jakarta.servlet-api:5.0.0")

Expand Down Expand Up @@ -151,10 +150,6 @@ tasks {
options.compilerArgs.add("-parameters")
}

test {
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
}

withType<Test>().configureEach {
systemProperty("testLatestDeps", latestDepTest)

Expand Down
5 changes: 4 additions & 1 deletion smoke-tests-otel-starter/spring-boot-2/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ dependencies {
implementation("org.springframework.boot:spring-boot-starter-data-jdbc")
runtimeOnly("com.h2database:h2")
implementation("org.apache.commons:commons-dbcp2")
implementation("org.springframework.kafka:spring-kafka") // not tested here, just make sure there are no warnings when it's included
implementation("org.springframework.kafka:spring-kafka")
implementation(platform(org.springframework.boot.gradle.plugin.SpringBootPlugin.BOM_COORDINATES))

implementation(project(":smoke-tests-otel-starter:spring-boot-common"))

testImplementation("org.testcontainers:junit-jupiter")
testImplementation("org.testcontainers:kafka")
testImplementation("org.springframework.boot:spring-boot-starter-test")
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.spring.smoketest;

import org.junit.jupiter.api.condition.DisabledInNativeImage;

@DisabledInNativeImage // See GraalVmNativeKafkaSpringStarterSmokeTest for the GraalVM native test
public class KafkaSpringStarterSmokeTest extends AbstractJvmKafkaSpringStarterSmokeTest {}
5 changes: 4 additions & 1 deletion smoke-tests-otel-starter/spring-boot-3/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@ dependencies {
implementation("org.springframework.boot:spring-boot-starter-data-jdbc")
runtimeOnly("com.h2database:h2")
implementation("org.apache.commons:commons-dbcp2")
implementation("org.springframework.kafka:spring-kafka") // not tested here, just make sure there are no warnings when it's included
implementation("org.springframework.kafka:spring-kafka")
implementation(platform(org.springframework.boot.gradle.plugin.SpringBootPlugin.BOM_COORDINATES))

implementation(project(":smoke-tests-otel-starter:spring-boot-common"))

testImplementation("org.testcontainers:junit-jupiter")
testImplementation("org.testcontainers:kafka")
testImplementation("org.springframework.boot:spring-boot-starter-test")
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.spring.smoketest;

import org.junit.jupiter.api.condition.EnabledInNativeImage;
import org.springframework.boot.test.context.SpringBootTest;

/**
* GraalVM native image doesn't support Testcontainers in our case, so the docker container is
* started manually before running the tests.
*
* <p>In other cases, it does work, e.g. <a
* href="https://info.michael-simons.eu/2023/10/25/run-your-integration-tests-against-testcontainers-with-graalvm-native-image/">here</a>,
* it's not yet clear why it doesn't work in our case.
*
* <p>In CI, this is done in reusable-native-tests.yml. If you want to run the tests locally, you
* need to start the container manually: see .github/workflows/reusable-native-tests.yml for the
* command.
*/
@SpringBootTest(
classes = {
OtelSpringStarterSmokeTestApplication.class,
SpringSmokeOtelConfiguration.class,
AbstractKafkaSpringStarterSmokeTest.KafkaConfig.class
},
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@EnabledInNativeImage // see JvmMongodbSpringStarterSmokeTest for the JVM test
@RequiresDockerComposeEnvVariable
public class GraalVmNativeKafkaSpringStarterSmokeTest extends AbstractKafkaSpringStarterSmokeTest {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.spring.smoketest;

import org.junit.jupiter.api.condition.DisabledInNativeImage;

@DisabledInNativeImage // See GraalVmNativeKafkaSpringStarterSmokeTest for the GraalVM native test
public class KafkaSpringStarterSmokeTest extends AbstractJvmKafkaSpringStarterSmokeTest {}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ dependencies {
compileOnly("org.springframework.boot:spring-boot-starter-test")
compileOnly("org.springframework.boot:spring-boot-starter-data-jdbc")
compileOnly("org.apache.commons:commons-dbcp2")
compileOnly("org.springframework.kafka:spring-kafka")
compileOnly("org.testcontainers:junit-jupiter")
compileOnly("org.testcontainers:kafka")

api(project(":smoke-tests-otel-starter:spring-smoke-testing"))

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.spring.smoketest;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.spring.autoconfigure.OpenTelemetryAutoConfiguration;
import io.opentelemetry.instrumentation.spring.autoconfigure.instrumentation.kafka.KafkaInstrumentationAutoConfiguration;
import java.time.Duration;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.kafka.core.KafkaTemplate;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerImageName;

/** Spring has a test container integration, but that doesn't work for Spring Boot 2 */
public class AbstractJvmKafkaSpringStarterSmokeTest extends AbstractKafkaSpringStarterSmokeTest {
static KafkaContainer kafka;

private ApplicationContextRunner contextRunner;

@BeforeAll
static void setUpKafka() {
kafka =
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.10"))
.withEnv("KAFKA_HEAP_OPTS", "-Xmx256m")
.waitingFor(Wait.forLogMessage(".*started \\(kafka.server.KafkaServer\\).*", 1))
.withStartupTimeout(Duration.ofMinutes(1));
kafka.start();
}

@AfterAll
static void tearDownKafka() {
kafka.stop();
}

@BeforeEach
void setUpContext() {
contextRunner =
new ApplicationContextRunner()
.withAllowBeanDefinitionOverriding(true)
.withConfiguration(
AutoConfigurations.of(
OpenTelemetryAutoConfiguration.class,
SpringSmokeOtelConfiguration.class,
KafkaAutoConfiguration.class,
KafkaInstrumentationAutoConfiguration.class,
KafkaConfig.class))
.withPropertyValues(
"spring.kafka.bootstrap-servers=" + kafka.getBootstrapServers(),
"spring.kafka.consumer.auto-offset-reset=earliest",
"spring.kafka.consumer.linger-ms=10",
"spring.kafka.listener.idle-between-polls=1000",
"spring.kafka.producer.transaction-id-prefix=test-");
}

@SuppressWarnings("unchecked")
@Override
@Test
void shouldInstrumentProducerAndConsumer() {
contextRunner.run(
applicationContext -> {
testing = new SpringSmokeTestRunner(applicationContext.getBean(OpenTelemetry.class));
kafkaTemplate = applicationContext.getBean(KafkaTemplate.class);
super.shouldInstrumentProducerAndConsumer();
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,98 +3,47 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.spring.autoconfigure.instrumentation.kafka;
package io.opentelemetry.spring.smoketest;

import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes;
import java.time.Duration;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.AbstractStringAssert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerImageName;

class KafkaIntegrationTest {
abstract class AbstractKafkaSpringStarterSmokeTest extends AbstractSpringStarterSmokeTest {

@RegisterExtension
static final LibraryInstrumentationExtension testing = LibraryInstrumentationExtension.create();

static KafkaContainer kafka;

private ApplicationContextRunner contextRunner;

@BeforeAll
static void setUpKafka() {
kafka =
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.10"))
.withEnv("KAFKA_HEAP_OPTS", "-Xmx256m")
.waitingFor(Wait.forLogMessage(".*started \\(kafka.server.KafkaServer\\).*", 1))
.withStartupTimeout(Duration.ofMinutes(1));
kafka.start();
}

@AfterAll
static void tearDownKafka() {
kafka.stop();
}

@BeforeEach
void setUpContext() {
contextRunner =
new ApplicationContextRunner()
.withConfiguration(
AutoConfigurations.of(
KafkaAutoConfiguration.class,
KafkaInstrumentationAutoConfiguration.class,
TestConfig.class))
.withBean("openTelemetry", OpenTelemetry.class, testing::getOpenTelemetry)
.withPropertyValues(
"spring.kafka.bootstrap-servers=" + kafka.getBootstrapServers(),
"spring.kafka.consumer.auto-offset-reset=earliest",
"spring.kafka.consumer.linger-ms=10",
"spring.kafka.listener.idle-between-polls=1000",
"spring.kafka.producer.transaction-id-prefix=test-");
}
@Autowired protected KafkaTemplate<String, String> kafkaTemplate;

@Test
void shouldInstrumentProducerAndConsumer() {
contextRunner.run(KafkaIntegrationTest::runShouldInstrumentProducerAndConsumer);
}

// In kafka 2 ops.send is deprecated. We are using it to avoid reflection because kafka 3 also has
// ops.send, although with different return type.
@SuppressWarnings({"unchecked", "deprecation"})
private static void runShouldInstrumentProducerAndConsumer(
ConfigurableApplicationContext applicationContext) {
KafkaTemplate<String, String> kafkaTemplate = applicationContext.getBean(KafkaTemplate.class);
testing.clearAllExportedData(); // ignore data from application startup

testing.runWithSpan(
"producer",
() -> {
kafkaTemplate.executeInTransaction(
ops -> {
ops.send("testTopic", "10", "testSpan");
// return type is incompatible between Spring Boot 2 and 3
try {
ops.getClass()
.getDeclaredMethod("send", String.class, Object.class, Object.class)
.invoke(ops, "testTopic", "10", "testSpan");
} catch (Exception e) {
throw new IllegalStateException(e);
}
return 0;
});
});
Expand Down Expand Up @@ -128,7 +77,7 @@ private static void runShouldInstrumentProducerAndConsumer(
span.hasName("testTopic process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(1))
.hasAttributesSatisfyingExactly(
.hasAttributesSatisfying(
equalTo(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(
MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME,
Expand All @@ -155,7 +104,9 @@ private static void runShouldInstrumentProducerAndConsumer(
}

@Configuration
static class TestConfig {
public static class KafkaConfig {

@Autowired OpenTelemetry openTelemetry;

@Bean
public NewTopic testTopic() {
Expand All @@ -164,7 +115,12 @@ public NewTopic testTopic() {

@KafkaListener(id = "testListener", topics = "testTopic")
public void listener(ConsumerRecord<String, String> record) {
testing.runWithSpan("consumer", () -> {});
openTelemetry
.getTracer("consumer", "1.0")
.spanBuilder("consumer")
.setSpanKind(SpanKind.CONSUMER)
.startSpan()
.end();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,12 @@ otel:
resource:
attributes:
attributeFromYaml: true # boolean will be automatically converted to string by spring

spring:
kafka:
consumer:
auto-offset-reset: earliest
listener:
idle-between-polls: 1000
producer:
transaction-id-prefix: test-
Loading
Loading