diff --git a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/E2eTestBase.java b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/E2eTestBase.java index 70eac0d35af7..d074e2dc47d9 100644 --- a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/E2eTestBase.java +++ b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/E2eTestBase.java @@ -29,6 +29,7 @@ import org.testcontainers.containers.output.OutputFrame; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.containers.wait.strategy.WaitStrategy; import java.io.File; import java.io.IOException; @@ -109,6 +110,7 @@ public void before() throws Exception { for (String s : kafkaServices) { environment.withLogConsumer(s + "_1", new Slf4jLogConsumer(LOG)); } + environment.waitingFor("kafka_1", buildWaitStrategy(".*Recorded new controller.*", 2)); } if (withHive) { List hiveServices = @@ -122,11 +124,8 @@ public void before() throws Exception { for (String s : hiveServices) { environment.withLogConsumer(s + "_1", new Slf4jLogConsumer(LOG)); } - // Increase timeout from 60s (default value) to 180s environment.waitingFor( - "hive-server_1", - Wait.forLogMessage(".*Starting HiveServer2.*", 1) - .withStartupTimeout(Duration.ofSeconds(180))); + "hive-server_1", buildWaitStrategy(".*Starting HiveServer2.*", 1)); } if (withSpark) { List sparkServices = Arrays.asList("spark-master", "spark-worker"); @@ -136,22 +135,26 @@ public void before() throws Exception { } environment.waitingFor( "spark-master_1", - Wait.forLogMessage( + buildWaitStrategy( ".*Master: I have been elected leader! New state: ALIVE.*", 1)); } environment.withServices(services.toArray(new String[0])).withLocalCompose(true); - environment.waitingFor( - "jobmanager_1", Wait.forLogMessage(".*Registering TaskManager.*", 1)); + environment.waitingFor("jobmanager_1", buildWaitStrategy(".*Registering TaskManager.*", 1)); environment.waitingFor( "taskmanager_1", - Wait.forLogMessage(".*Successful registration at resource manager.*", 1)); + buildWaitStrategy(".*Successful registration at resource manager.*", 1)); environment.start(); jobManager = environment.getContainerByServiceName("jobmanager_1").get(); jobManager.execInContainer("chown", "-R", "flink:flink", TEST_DATA_DIR); } + private WaitStrategy buildWaitStrategy(String regex, int times) { + // Increase timeout from 60s (default value) to 180s + return Wait.forLogMessage(regex, times).withStartupTimeout(Duration.ofSeconds(180)); + } + @AfterEach public void after() { if (environment != null) {