diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java index 3fa95edb8682..486ea45a6260 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java @@ -59,6 +59,8 @@ /** Tests for changelog table with primary keys. */ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase { + private static final int TIMEOUT = 180; + // ------------------------------------------------------------------------ // Test Utilities // ------------------------------------------------------------------------ @@ -95,12 +97,38 @@ private String createCatalogSql(String catalogName, String warehouse) { catalogName, warehouse, defaultPropertyString); } + private CloseableIterator collect(TableResult result) { + return collect(result, TIMEOUT); + } + + private CloseableIterator collect(TableResult result, int timeout) { + JobClient client = result.getJobClient().get(); + Thread timeoutThread = + new Thread( + () -> { + for (int i = 0; i < timeout; i++) { + try { + Thread.sleep(1000); + if (client.getJobStatus().get().isGloballyTerminalState()) { + break; + } + } catch (Exception e) { + client.cancel(); + throw new RuntimeException(e); + } + } + client.cancel(); + }); + timeoutThread.start(); + return result.collect(); + } + // ------------------------------------------------------------------------ // Constructed Tests // ------------------------------------------------------------------------ @Test - @Timeout(180) + @Timeout(TIMEOUT) public void testFullCompactionTriggerInterval() throws Exception { innerTestChangelogProducing( Arrays.asList( @@ -109,7 +137,7 @@ public void testFullCompactionTriggerInterval() throws Exception { } @Test - @Timeout(180) + @Timeout(TIMEOUT) public void testFullCompactionWithLongCheckpointInterval() throws Exception { // create table TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().parallelism(1).build(); @@ -135,7 +163,7 @@ public void testFullCompactionWithLongCheckpointInterval() throws Exception { .build(); sEnv.executeSql(createCatalogSql("testCatalog", path)); sEnv.executeSql("USE CATALOG testCatalog"); - CloseableIterator it = sEnv.executeSql("SELECT * FROM T").collect(); + CloseableIterator it = collect(sEnv.executeSql("SELECT * FROM T")); // run compact job StreamExecutionEnvironment env = @@ -168,7 +196,7 @@ public void testFullCompactionWithLongCheckpointInterval() throws Exception { } @Test - @Timeout(180) + @Timeout(TIMEOUT) public void testLookupChangelog() throws Exception { innerTestChangelogProducing(Collections.singletonList("'changelog-producer' = 'lookup'")); } @@ -190,7 +218,7 @@ public void testTableReadWriteBranch() throws Exception { + "'bucket' = '2'" + ")"); - CloseableIterator it = sEnv.executeSql("SELECT * FROM T2").collect(); + CloseableIterator it = collect(sEnv.executeSql("SELECT * FROM T2")); // insert data sEnv.executeSql("INSERT INTO T2 VALUES (1, 'A')").await(); @@ -213,7 +241,7 @@ public void testTableReadWriteBranch() throws Exception { sEnv.executeSql("ALTER TABLE T2 SET ('changelog-producer'='full-compaction')"); CloseableIterator branchIt = - sEnv.executeSql("select * from T2 /*+ OPTIONS('branch' = 'branch1') */").collect(); + collect(sEnv.executeSql("select * from T2 /*+ OPTIONS('branch' = 'branch1') */")); // insert data to branch sEnv.executeSql( "INSERT INTO T2/*+ OPTIONS('branch' = 'branch1') */ VALUES (10, 'v10'),(11, 'v11'),(12, 'v12')") @@ -261,7 +289,7 @@ private void innerTestChangelogProducing(List options) throws Exception sEnv.executeSql( "INSERT INTO T SELECT SUM(i) AS k, g AS v FROM `default_catalog`.`default_database`.`S` GROUP BY g"); - CloseableIterator it = sEnv.executeSql("SELECT * FROM T").collect(); + CloseableIterator it = collect(sEnv.executeSql("SELECT * FROM T")); // write initial data sEnv.executeSql( @@ -329,7 +357,7 @@ public void testBatchJobWithConflictAndRestart() throws Exception { result1.await(); result2.await(); - try (CloseableIterator it = tEnv.executeSql("SELECT * FROM t").collect()) { + try (CloseableIterator it = collect(tEnv.executeSql("SELECT * FROM t"))) { for (int i = 0; i < 3; i++) { assertThat(it).hasNext(); Row row = it.next(); @@ -338,7 +366,7 @@ public void testBatchJobWithConflictAndRestart() throws Exception { } } - @Timeout(60) + @Timeout(TIMEOUT) @ParameterizedTest() @ValueSource(booleans = {false, true}) public void testRecreateTableWithException(boolean isReloadData) throws Exception { @@ -361,7 +389,7 @@ public void testRecreateTableWithException(boolean isReloadData) throws Exceptio .build(); sEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse")); sEnv.executeSql("USE CATALOG testCatalog"); - CloseableIterator it = sEnv.executeSql("SELECT * FROM t").collect(); + CloseableIterator it = collect(sEnv.executeSql("SELECT * FROM t")); // first write List values = new ArrayList<>(); @@ -414,7 +442,7 @@ public void testRecreateTableWithException(boolean isReloadData) throws Exceptio } @Test - @Timeout(120) + @Timeout(TIMEOUT) public void testChangelogCompactInBatchWrite() throws Exception { TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build(); String catalogDdl = @@ -504,7 +532,7 @@ public void testChangelogCompactInBatchWrite() throws Exception { } @Test - @Timeout(120) + @Timeout(TIMEOUT) public void testChangelogCompactInStreamWrite() throws Exception { TableEnvironment sEnv = tableEnvironmentBuilder() @@ -533,7 +561,7 @@ public void testChangelogCompactInStreamWrite() throws Exception { + "', 'source.monitor-interval' = '500ms' )"); sEnv.executeSql("INSERT INTO t SELECT * FROM `default_catalog`.`default_database`.`s`"); - CloseableIterator it = sEnv.executeSql("SELECT * FROM t").collect(); + CloseableIterator it = collect(sEnv.executeSql("SELECT * FROM t")); // write initial data List values = new ArrayList<>(); @@ -589,7 +617,7 @@ private List listAllFilesWithPrefix(String prefix) throws Exception { private void assertStreamingResult(TableResult result, List expected) throws Exception { List actual = new ArrayList<>(); - try (CloseableIterator it = result.collect()) { + try (CloseableIterator it = collect(result)) { while (actual.size() < expected.size() && it.hasNext()) { actual.add(it.next()); } @@ -611,14 +639,14 @@ private void assertStreamingResult(CloseableIterator it, List expected // ------------------------------------------------------------------------ @Test - @Timeout(180) + @Timeout(TIMEOUT) public void testNoChangelogProducerBatchRandom() throws Exception { TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build(); testNoChangelogProducerRandom(bEnv, 1, false); } @Test - @Timeout(180) + @Timeout(TIMEOUT) public void testNoChangelogProducerStreamingRandom() throws Exception { ThreadLocalRandom random = ThreadLocalRandom.current(); TableEnvironment sEnv = @@ -631,14 +659,14 @@ public void testNoChangelogProducerStreamingRandom() throws Exception { } @Test - @Timeout(180) + @Timeout(TIMEOUT) public void testFullCompactionChangelogProducerBatchRandom() throws Exception { TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build(); testFullCompactionChangelogProducerRandom(bEnv, 1, false); } @Test - @Timeout(180) + @Timeout(TIMEOUT) public void testFullCompactionChangelogProducerStreamingRandom() throws Exception { ThreadLocalRandom random = ThreadLocalRandom.current(); TableEnvironment sEnv = @@ -651,7 +679,7 @@ public void testFullCompactionChangelogProducerStreamingRandom() throws Exceptio } @Test - @Timeout(180) + @Timeout(TIMEOUT) public void testStandAloneFullCompactJobRandom() throws Exception { ThreadLocalRandom random = ThreadLocalRandom.current(); TableEnvironment sEnv = @@ -664,14 +692,14 @@ public void testStandAloneFullCompactJobRandom() throws Exception { } @Test - @Timeout(180) + @Timeout(TIMEOUT) public void testLookupChangelogProducerBatchRandom() throws Exception { TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build(); testLookupChangelogProducerRandom(bEnv, 1, false); } @Test - @Timeout(180) + @Timeout(TIMEOUT) public void testLookupChangelogProducerStreamingRandom() throws Exception { ThreadLocalRandom random = ThreadLocalRandom.current(); TableEnvironment sEnv = @@ -684,7 +712,7 @@ public void testLookupChangelogProducerStreamingRandom() throws Exception { } @Test - @Timeout(180) + @Timeout(TIMEOUT) public void testStandAloneLookupJobRandom() throws Exception { ThreadLocalRandom random = ThreadLocalRandom.current(); TableEnvironment sEnv = @@ -868,7 +896,7 @@ private void checkChangelogTestResult(int numProducers) throws Exception { ResultChecker checker = new ResultChecker(); int endCnt = 0; - try (CloseableIterator it = sEnv.executeSql("SELECT * FROM T").collect()) { + try (CloseableIterator it = collect(sEnv.executeSql("SELECT * FROM T"))) { while (it.hasNext()) { Row row = it.next(); checker.addChangelog(row); @@ -986,7 +1014,7 @@ private void checkBatchResult(int numProducers) throws Exception { bEnv.executeSql("USE CATALOG testCatalog"); ResultChecker checker = new ResultChecker(); - try (CloseableIterator it = bEnv.executeSql("SELECT * FROM T").collect()) { + try (CloseableIterator it = collect(bEnv.executeSql("SELECT * FROM T"))) { while (it.hasNext()) { checker.addChangelog(it.next()); }