Skip to content

Commit

Permalink
fix: kafka container config
Browse files Browse the repository at this point in the history
  • Loading branch information
yurimssilva committed Sep 8, 2023
1 parent a146759 commit 11ff02a
Showing 1 changed file with 10 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,18 @@
@EndToEndTest
public class Streaming02KafkaToHttpTest {

private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka";
private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:7.4.0";
private static final String TOPIC = "kafka-stream-topic";
private static final String MAX_DURATION = "PT30S";
private static final String SAMPLE_FOLDER = "transfer/streaming/streaming-02-kafka-to-http";
private static final Duration TIMEOUT = Duration.ofSeconds(30);
@Container
public static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName
.parse(KAFKA_IMAGE_NAME)).withEnv("KAFKA_CREATE_TOPICS", TOPIC.concat(":1:1"));

private static final Participant PROVIDER = Participant.Builder.newInstance()
.name("provider")
.id("provider")
.managementEndpoint(new Participant.Endpoint(URI.create("http://localhost:18181/management")))
.protocolEndpoint(new Participant.Endpoint(URI.create("http://localhost:18182/protocol")))
.controlEndpoint(new Participant.Endpoint(URI.create("http://localhost:18183/control")))
.build();

private static final Participant CONSUMER = Participant.Builder.newInstance()
.name("consumer")
.id("consumer")
Expand All @@ -74,6 +69,12 @@ public class Streaming02KafkaToHttpTest {
.controlEndpoint(new Participant.Endpoint(URI.create("http://localhost:28183/control")))
.build();

@Container
static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME))
.withKraft()
.withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true")
.withEnv("KAFKA_CREATE_TOPICS", TOPIC.concat(":1:1"));

@RegisterExtension
static EdcRuntimeExtension providerConnector = new EdcRuntimeExtension(
":transfer:streaming:streaming-02-kafka-to-http:streaming-02-runtime",
Expand All @@ -86,7 +87,7 @@ public class Streaming02KafkaToHttpTest {
@RegisterExtension
static EdcRuntimeExtension consumerConnector = new EdcRuntimeExtension(
":transfer:streaming:streaming-02-kafka-to-http:streaming-02-runtime",
"provider",
"consumer",
Map.of(
"edc.fs.config", getFileFromRelativePath(SAMPLE_FOLDER + "/streaming-02-runtime/consumer.properties").getAbsolutePath()
)
Expand All @@ -96,7 +97,6 @@ public class Streaming02KafkaToHttpTest {

@BeforeEach
void setUp() throws IOException {
KAFKA_CONTAINER.start();
consumerReceiverServer.start(httpReceiverPort);
}

Expand All @@ -106,7 +106,7 @@ void streamData() {
PROVIDER.registerDataPlane(List.of("Kafka"), List.of("HttpData"));

PROVIDER.createAsset(getFileContentFromRelativePath(SAMPLE_FOLDER + "/1-asset.json")
.replace("{{bootstrap.servers}}", KAFKA_CONTAINER.getBootstrapServers())
.replace("{{bootstrap.servers}}", kafkaContainer.getBootstrapServers())
.replace("{{max.duration}}", MAX_DURATION)
.replace("{{topic}}", TOPIC));
PROVIDER.createPolicyDefinition(getFileContentFromRelativePath(SAMPLE_FOLDER + "/2-policy-definition.json"));
Expand Down Expand Up @@ -140,7 +140,7 @@ void streamData() {

private Producer<String, String> createKafkaProducer() {
var props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return new KafkaProducer<>(props);
Expand Down

0 comments on commit 11ff02a

Please sign in to comment.