Skip to content

Commit

Permalink
[e2e] Fix unstable e2e tests with Kafka service (apache#2303)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin authored Nov 13, 2023
1 parent e9bfc68 commit 194bb75
Showing 1 changed file with 11 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.testcontainers.containers.output.OutputFrame;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.containers.wait.strategy.WaitStrategy;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -109,6 +110,7 @@ public void before() throws Exception {
for (String s : kafkaServices) {
environment.withLogConsumer(s + "_1", new Slf4jLogConsumer(LOG));
}
environment.waitingFor("kafka_1", buildWaitStrategy(".*Recorded new controller.*", 2));
}
if (withHive) {
List<String> hiveServices =
Expand All @@ -122,11 +124,8 @@ public void before() throws Exception {
for (String s : hiveServices) {
environment.withLogConsumer(s + "_1", new Slf4jLogConsumer(LOG));
}
// Increase timeout from 60s (default value) to 180s
environment.waitingFor(
"hive-server_1",
Wait.forLogMessage(".*Starting HiveServer2.*", 1)
.withStartupTimeout(Duration.ofSeconds(180)));
"hive-server_1", buildWaitStrategy(".*Starting HiveServer2.*", 1));
}
if (withSpark) {
List<String> sparkServices = Arrays.asList("spark-master", "spark-worker");
Expand All @@ -136,22 +135,26 @@ public void before() throws Exception {
}
environment.waitingFor(
"spark-master_1",
Wait.forLogMessage(
buildWaitStrategy(
".*Master: I have been elected leader! New state: ALIVE.*", 1));
}
environment.withServices(services.toArray(new String[0])).withLocalCompose(true);

environment.waitingFor(
"jobmanager_1", Wait.forLogMessage(".*Registering TaskManager.*", 1));
environment.waitingFor("jobmanager_1", buildWaitStrategy(".*Registering TaskManager.*", 1));
environment.waitingFor(
"taskmanager_1",
Wait.forLogMessage(".*Successful registration at resource manager.*", 1));
buildWaitStrategy(".*Successful registration at resource manager.*", 1));
environment.start();

jobManager = environment.getContainerByServiceName("jobmanager_1").get();
jobManager.execInContainer("chown", "-R", "flink:flink", TEST_DATA_DIR);
}

private WaitStrategy buildWaitStrategy(String regex, int times) {
// Increase timeout from 60s (default value) to 180s
return Wait.forLogMessage(regex, times).withStartupTimeout(Duration.ofSeconds(180));
}

@AfterEach
public void after() {
if (environment != null) {
Expand Down

0 comments on commit 194bb75

Please sign in to comment.