Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[tests] Fix @Timeout annotation not working in PrimaryKeyFileStoreTableITCase #4634

Merged
merged 2 commits into from
Dec 4, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
/** Tests for changelog table with primary keys. */
public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {

private static final int TIMEOUT = 180;

// ------------------------------------------------------------------------
// Test Utilities
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -95,12 +97,38 @@ private String createCatalogSql(String catalogName, String warehouse) {
catalogName, warehouse, defaultPropertyString);
}

private CloseableIterator<Row> collect(TableResult result) {
return collect(result, TIMEOUT);
}

private CloseableIterator<Row> collect(TableResult result, int timeout) {
JobClient client = result.getJobClient().get();
Thread timeoutThread =
new Thread(
() -> {
for (int i = 0; i < timeout; i++) {
try {
Thread.sleep(1000);
if (client.getJobStatus().get().isGloballyTerminalState()) {
return;
}
} catch (Exception e) {
client.cancel();
throw new RuntimeException(e);
}
}
client.cancel();
});
timeoutThread.start();
return result.collect();
}

// ------------------------------------------------------------------------
// Constructed Tests
// ------------------------------------------------------------------------

@Test
@Timeout(180)
@Timeout(TIMEOUT)
public void testFullCompactionTriggerInterval() throws Exception {
innerTestChangelogProducing(
Arrays.asList(
Expand All @@ -109,7 +137,7 @@ public void testFullCompactionTriggerInterval() throws Exception {
}

@Test
@Timeout(180)
@Timeout(TIMEOUT)
public void testFullCompactionWithLongCheckpointInterval() throws Exception {
// create table
TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().parallelism(1).build();
Expand All @@ -135,7 +163,7 @@ public void testFullCompactionWithLongCheckpointInterval() throws Exception {
.build();
sEnv.executeSql(createCatalogSql("testCatalog", path));
sEnv.executeSql("USE CATALOG testCatalog");
CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM T").collect();
CloseableIterator<Row> it = collect(sEnv.executeSql("SELECT * FROM T"));

// run compact job
StreamExecutionEnvironment env =
Expand Down Expand Up @@ -168,7 +196,7 @@ public void testFullCompactionWithLongCheckpointInterval() throws Exception {
}

@Test
@Timeout(180)
@Timeout(TIMEOUT)
public void testLookupChangelog() throws Exception {
innerTestChangelogProducing(Collections.singletonList("'changelog-producer' = 'lookup'"));
}
Expand All @@ -190,7 +218,7 @@ public void testTableReadWriteBranch() throws Exception {
+ "'bucket' = '2'"
+ ")");

CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM T2").collect();
CloseableIterator<Row> it = collect(sEnv.executeSql("SELECT * FROM T2"));

// insert data
sEnv.executeSql("INSERT INTO T2 VALUES (1, 'A')").await();
Expand All @@ -213,7 +241,7 @@ public void testTableReadWriteBranch() throws Exception {
sEnv.executeSql("ALTER TABLE T2 SET ('changelog-producer'='full-compaction')");

CloseableIterator<Row> branchIt =
sEnv.executeSql("select * from T2 /*+ OPTIONS('branch' = 'branch1') */").collect();
collect(sEnv.executeSql("select * from T2 /*+ OPTIONS('branch' = 'branch1') */"));
// insert data to branch
sEnv.executeSql(
"INSERT INTO T2/*+ OPTIONS('branch' = 'branch1') */ VALUES (10, 'v10'),(11, 'v11'),(12, 'v12')")
Expand Down Expand Up @@ -261,7 +289,7 @@ private void innerTestChangelogProducing(List<String> options) throws Exception

sEnv.executeSql(
"INSERT INTO T SELECT SUM(i) AS k, g AS v FROM `default_catalog`.`default_database`.`S` GROUP BY g");
CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM T").collect();
CloseableIterator<Row> it = collect(sEnv.executeSql("SELECT * FROM T"));

// write initial data
sEnv.executeSql(
Expand Down Expand Up @@ -329,7 +357,7 @@ public void testBatchJobWithConflictAndRestart() throws Exception {
result1.await();
result2.await();

try (CloseableIterator<Row> it = tEnv.executeSql("SELECT * FROM t").collect()) {
try (CloseableIterator<Row> it = collect(tEnv.executeSql("SELECT * FROM t"))) {
for (int i = 0; i < 3; i++) {
assertThat(it).hasNext();
Row row = it.next();
Expand All @@ -338,7 +366,7 @@ public void testBatchJobWithConflictAndRestart() throws Exception {
}
}

@Timeout(60)
@Timeout(TIMEOUT)
@ParameterizedTest()
@ValueSource(booleans = {false, true})
public void testRecreateTableWithException(boolean isReloadData) throws Exception {
Expand All @@ -361,7 +389,7 @@ public void testRecreateTableWithException(boolean isReloadData) throws Exceptio
.build();
sEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse"));
sEnv.executeSql("USE CATALOG testCatalog");
CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM t").collect();
CloseableIterator<Row> it = collect(sEnv.executeSql("SELECT * FROM t"));

// first write
List<String> values = new ArrayList<>();
Expand Down Expand Up @@ -414,7 +442,7 @@ public void testRecreateTableWithException(boolean isReloadData) throws Exceptio
}

@Test
@Timeout(120)
@Timeout(TIMEOUT)
public void testChangelogCompactInBatchWrite() throws Exception {
TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build();
String catalogDdl =
Expand Down Expand Up @@ -504,7 +532,7 @@ public void testChangelogCompactInBatchWrite() throws Exception {
}

@Test
@Timeout(120)
@Timeout(TIMEOUT)
public void testChangelogCompactInStreamWrite() throws Exception {
TableEnvironment sEnv =
tableEnvironmentBuilder()
Expand Down Expand Up @@ -533,7 +561,7 @@ public void testChangelogCompactInStreamWrite() throws Exception {
+ "', 'source.monitor-interval' = '500ms' )");

sEnv.executeSql("INSERT INTO t SELECT * FROM `default_catalog`.`default_database`.`s`");
CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM t").collect();
CloseableIterator<Row> it = collect(sEnv.executeSql("SELECT * FROM t"));

// write initial data
List<String> values = new ArrayList<>();
Expand Down Expand Up @@ -589,7 +617,7 @@ private List<String> listAllFilesWithPrefix(String prefix) throws Exception {

private void assertStreamingResult(TableResult result, List<Row> expected) throws Exception {
List<Row> actual = new ArrayList<>();
try (CloseableIterator<Row> it = result.collect()) {
try (CloseableIterator<Row> it = collect(result)) {
while (actual.size() < expected.size() && it.hasNext()) {
actual.add(it.next());
}
Expand All @@ -611,14 +639,14 @@ private void assertStreamingResult(CloseableIterator<Row> it, List<Row> expected
// ------------------------------------------------------------------------

@Test
@Timeout(180)
@Timeout(TIMEOUT)
public void testNoChangelogProducerBatchRandom() throws Exception {
TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build();
testNoChangelogProducerRandom(bEnv, 1, false);
}

@Test
@Timeout(180)
@Timeout(TIMEOUT)
public void testNoChangelogProducerStreamingRandom() throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
TableEnvironment sEnv =
Expand All @@ -631,14 +659,14 @@ public void testNoChangelogProducerStreamingRandom() throws Exception {
}

@Test
@Timeout(180)
@Timeout(TIMEOUT)
public void testFullCompactionChangelogProducerBatchRandom() throws Exception {
TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build();
testFullCompactionChangelogProducerRandom(bEnv, 1, false);
}

@Test
@Timeout(180)
@Timeout(TIMEOUT)
public void testFullCompactionChangelogProducerStreamingRandom() throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
TableEnvironment sEnv =
Expand All @@ -651,7 +679,7 @@ public void testFullCompactionChangelogProducerStreamingRandom() throws Exceptio
}

@Test
@Timeout(180)
@Timeout(TIMEOUT)
public void testStandAloneFullCompactJobRandom() throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
TableEnvironment sEnv =
Expand All @@ -664,14 +692,14 @@ public void testStandAloneFullCompactJobRandom() throws Exception {
}

@Test
@Timeout(180)
@Timeout(TIMEOUT)
public void testLookupChangelogProducerBatchRandom() throws Exception {
TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build();
testLookupChangelogProducerRandom(bEnv, 1, false);
}

@Test
@Timeout(180)
@Timeout(TIMEOUT)
public void testLookupChangelogProducerStreamingRandom() throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
TableEnvironment sEnv =
Expand All @@ -684,7 +712,7 @@ public void testLookupChangelogProducerStreamingRandom() throws Exception {
}

@Test
@Timeout(180)
@Timeout(TIMEOUT)
public void testStandAloneLookupJobRandom() throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
TableEnvironment sEnv =
Expand Down Expand Up @@ -868,7 +896,7 @@ private void checkChangelogTestResult(int numProducers) throws Exception {

ResultChecker checker = new ResultChecker();
int endCnt = 0;
try (CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM T").collect()) {
try (CloseableIterator<Row> it = collect(sEnv.executeSql("SELECT * FROM T"))) {
while (it.hasNext()) {
Row row = it.next();
checker.addChangelog(row);
Expand Down Expand Up @@ -986,7 +1014,7 @@ private void checkBatchResult(int numProducers) throws Exception {
bEnv.executeSql("USE CATALOG testCatalog");

ResultChecker checker = new ResultChecker();
try (CloseableIterator<Row> it = bEnv.executeSql("SELECT * FROM T").collect()) {
try (CloseableIterator<Row> it = collect(bEnv.executeSql("SELECT * FROM T"))) {
while (it.hasNext()) {
checker.addChangelog(it.next());
}
Expand Down
Loading