Skip to content

Commit

Permalink
[tests] Fix @timeout annotation not working in PrimaryKeyFileStoreTab…
Browse files Browse the repository at this point in the history
…leITCase
  • Loading branch information
tsreaper committed Dec 4, 2024
1 parent 11cebd4 commit 0baa838
Showing 1 changed file with 52 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
/** Tests for changelog table with primary keys. */
public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {

private static final int TIMEOUT = 180;

// ------------------------------------------------------------------------
// Test Utilities
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -95,12 +97,38 @@ private String createCatalogSql(String catalogName, String warehouse) {
catalogName, warehouse, defaultPropertyString);
}

private CloseableIterator<Row> collect(TableResult result) {
return collect(result, TIMEOUT);
}

private CloseableIterator<Row> collect(TableResult result, int timeout) {
JobClient client = result.getJobClient().get();
Thread timeoutThread =
new Thread(
() -> {
for (int i = 0; i < timeout; i++) {
try {
Thread.sleep(1000);
if (client.getJobStatus().get().isGloballyTerminalState()) {
break;
}
} catch (Exception e) {
client.cancel();
throw new RuntimeException(e);
}
}
client.cancel();
});
timeoutThread.start();
return result.collect();
}

// ------------------------------------------------------------------------
// Constructed Tests
// ------------------------------------------------------------------------

@Test
@Timeout(180)
@Timeout(TIMEOUT)
public void testFullCompactionTriggerInterval() throws Exception {
innerTestChangelogProducing(
Arrays.asList(
Expand All @@ -109,7 +137,7 @@ public void testFullCompactionTriggerInterval() throws Exception {
}

@Test
@Timeout(180)
@Timeout(TIMEOUT)
public void testFullCompactionWithLongCheckpointInterval() throws Exception {
// create table
TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().parallelism(1).build();
Expand All @@ -135,7 +163,7 @@ public void testFullCompactionWithLongCheckpointInterval() throws Exception {
.build();
sEnv.executeSql(createCatalogSql("testCatalog", path));
sEnv.executeSql("USE CATALOG testCatalog");
CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM T").collect();
CloseableIterator<Row> it = collect(sEnv.executeSql("SELECT * FROM T"));

// run compact job
StreamExecutionEnvironment env =
Expand Down Expand Up @@ -168,7 +196,7 @@ public void testFullCompactionWithLongCheckpointInterval() throws Exception {
}

@Test
@Timeout(180)
@Timeout(TIMEOUT)
public void testLookupChangelog() throws Exception {
innerTestChangelogProducing(Collections.singletonList("'changelog-producer' = 'lookup'"));
}
Expand All @@ -190,7 +218,7 @@ public void testTableReadWriteBranch() throws Exception {
+ "'bucket' = '2'"
+ ")");

CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM T2").collect();
CloseableIterator<Row> it = collect(sEnv.executeSql("SELECT * FROM T2"));

// insert data
sEnv.executeSql("INSERT INTO T2 VALUES (1, 'A')").await();
Expand All @@ -213,7 +241,7 @@ public void testTableReadWriteBranch() throws Exception {
sEnv.executeSql("ALTER TABLE T2 SET ('changelog-producer'='full-compaction')");

CloseableIterator<Row> branchIt =
sEnv.executeSql("select * from T2 /*+ OPTIONS('branch' = 'branch1') */").collect();
collect(sEnv.executeSql("select * from T2 /*+ OPTIONS('branch' = 'branch1') */"));
// insert data to branch
sEnv.executeSql(
"INSERT INTO T2/*+ OPTIONS('branch' = 'branch1') */ VALUES (10, 'v10'),(11, 'v11'),(12, 'v12')")
Expand Down Expand Up @@ -261,7 +289,7 @@ private void innerTestChangelogProducing(List<String> options) throws Exception

sEnv.executeSql(
"INSERT INTO T SELECT SUM(i) AS k, g AS v FROM `default_catalog`.`default_database`.`S` GROUP BY g");
CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM T").collect();
CloseableIterator<Row> it = collect(sEnv.executeSql("SELECT * FROM T"));

// write initial data
sEnv.executeSql(
Expand Down Expand Up @@ -329,7 +357,7 @@ public void testBatchJobWithConflictAndRestart() throws Exception {
result1.await();
result2.await();

try (CloseableIterator<Row> it = tEnv.executeSql("SELECT * FROM t").collect()) {
try (CloseableIterator<Row> it = collect(tEnv.executeSql("SELECT * FROM t"))) {
for (int i = 0; i < 3; i++) {
assertThat(it).hasNext();
Row row = it.next();
Expand All @@ -338,7 +366,7 @@ public void testBatchJobWithConflictAndRestart() throws Exception {
}
}

@Timeout(60)
@Timeout(TIMEOUT)
@ParameterizedTest()
@ValueSource(booleans = {false, true})
public void testRecreateTableWithException(boolean isReloadData) throws Exception {
Expand All @@ -361,7 +389,7 @@ public void testRecreateTableWithException(boolean isReloadData) throws Exceptio
.build();
sEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse"));
sEnv.executeSql("USE CATALOG testCatalog");
CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM t").collect();
CloseableIterator<Row> it = collect(sEnv.executeSql("SELECT * FROM t"));

// first write
List<String> values = new ArrayList<>();
Expand Down Expand Up @@ -414,7 +442,7 @@ public void testRecreateTableWithException(boolean isReloadData) throws Exceptio
}

@Test
@Timeout(120)
@Timeout(TIMEOUT)
public void testChangelogCompactInBatchWrite() throws Exception {
TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build();
String catalogDdl =
Expand Down Expand Up @@ -504,7 +532,7 @@ public void testChangelogCompactInBatchWrite() throws Exception {
}

@Test
@Timeout(120)
@Timeout(TIMEOUT)
public void testChangelogCompactInStreamWrite() throws Exception {
TableEnvironment sEnv =
tableEnvironmentBuilder()
Expand Down Expand Up @@ -533,7 +561,7 @@ public void testChangelogCompactInStreamWrite() throws Exception {
+ "', 'source.monitor-interval' = '500ms' )");

sEnv.executeSql("INSERT INTO t SELECT * FROM `default_catalog`.`default_database`.`s`");
CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM t").collect();
CloseableIterator<Row> it = collect(sEnv.executeSql("SELECT * FROM t"));

// write initial data
List<String> values = new ArrayList<>();
Expand Down Expand Up @@ -589,7 +617,7 @@ private List<String> listAllFilesWithPrefix(String prefix) throws Exception {

private void assertStreamingResult(TableResult result, List<Row> expected) throws Exception {
List<Row> actual = new ArrayList<>();
try (CloseableIterator<Row> it = result.collect()) {
try (CloseableIterator<Row> it = collect(result)) {
while (actual.size() < expected.size() && it.hasNext()) {
actual.add(it.next());
}
Expand All @@ -611,14 +639,14 @@ private void assertStreamingResult(CloseableIterator<Row> it, List<Row> expected
// ------------------------------------------------------------------------

@Test
@Timeout(180)
@Timeout(TIMEOUT)
public void testNoChangelogProducerBatchRandom() throws Exception {
TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build();
testNoChangelogProducerRandom(bEnv, 1, false);
}

@Test
@Timeout(180)
@Timeout(TIMEOUT)
public void testNoChangelogProducerStreamingRandom() throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
TableEnvironment sEnv =
Expand All @@ -631,14 +659,14 @@ public void testNoChangelogProducerStreamingRandom() throws Exception {
}

@Test
@Timeout(180)
@Timeout(TIMEOUT)
public void testFullCompactionChangelogProducerBatchRandom() throws Exception {
TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build();
testFullCompactionChangelogProducerRandom(bEnv, 1, false);
}

@Test
@Timeout(180)
@Timeout(TIMEOUT)
public void testFullCompactionChangelogProducerStreamingRandom() throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
TableEnvironment sEnv =
Expand All @@ -651,7 +679,7 @@ public void testFullCompactionChangelogProducerStreamingRandom() throws Exceptio
}

@Test
@Timeout(180)
@Timeout(TIMEOUT)
public void testStandAloneFullCompactJobRandom() throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
TableEnvironment sEnv =
Expand All @@ -664,14 +692,14 @@ public void testStandAloneFullCompactJobRandom() throws Exception {
}

@Test
@Timeout(180)
@Timeout(TIMEOUT)
public void testLookupChangelogProducerBatchRandom() throws Exception {
TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build();
testLookupChangelogProducerRandom(bEnv, 1, false);
}

@Test
@Timeout(180)
@Timeout(TIMEOUT)
public void testLookupChangelogProducerStreamingRandom() throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
TableEnvironment sEnv =
Expand All @@ -684,7 +712,7 @@ public void testLookupChangelogProducerStreamingRandom() throws Exception {
}

@Test
@Timeout(180)
@Timeout(TIMEOUT)
public void testStandAloneLookupJobRandom() throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
TableEnvironment sEnv =
Expand Down Expand Up @@ -868,7 +896,7 @@ private void checkChangelogTestResult(int numProducers) throws Exception {

ResultChecker checker = new ResultChecker();
int endCnt = 0;
try (CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM T").collect()) {
try (CloseableIterator<Row> it = collect(sEnv.executeSql("SELECT * FROM T"))) {
while (it.hasNext()) {
Row row = it.next();
checker.addChangelog(row);
Expand Down Expand Up @@ -986,7 +1014,7 @@ private void checkBatchResult(int numProducers) throws Exception {
bEnv.executeSql("USE CATALOG testCatalog");

ResultChecker checker = new ResultChecker();
try (CloseableIterator<Row> it = bEnv.executeSql("SELECT * FROM T").collect()) {
try (CloseableIterator<Row> it = collect(bEnv.executeSql("SELECT * FROM T"))) {
while (it.hasNext()) {
checker.addChangelog(it.next());
}
Expand Down

0 comments on commit 0baa838

Please sign in to comment.