Skip to content

Commit

Permalink
[fix] Fix insufficient task slots
Browse files Browse the repository at this point in the history
  • Loading branch information
tsreaper committed Apr 10, 2024
1 parent 1847a5e commit fff3342
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,7 @@ public void testFullCompactionTriggerInterval() throws Exception {
@Timeout(1200)
public void testFullCompactionWithLongCheckpointInterval() throws Exception {
// create table
TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build();
bEnv.getConfig()
.getConfiguration()
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().parallelism(1).build();
bEnv.executeSql(createCatalogSql("testCatalog", path));
bEnv.executeSql("USE CATALOG testCatalog");
bEnv.executeSql(
Expand All @@ -126,10 +123,11 @@ public void testFullCompactionWithLongCheckpointInterval() throws Exception {

// run select job
TableEnvironment sEnv =
tableEnvironmentBuilder().streamingMode().checkpointIntervalMs(100).build();
sEnv.getConfig()
.getConfiguration()
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
tableEnvironmentBuilder()
.streamingMode()
.checkpointIntervalMs(100)
.parallelism(1)
.build();
sEnv.executeSql(createCatalogSql("testCatalog", path));
sEnv.executeSql("USE CATALOG testCatalog");
CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM T").collect();
Expand Down Expand Up @@ -435,9 +433,9 @@ private void testStandAloneFullCompactJobRandom(
streamExecutionEnvironmentBuilder()
.streamingMode()
.checkpointIntervalMs(random.nextInt(1900) + 100)
.parallelism(2)
.allowRestart()
.build();
env.setParallelism(2);
new CompactAction(path, "default", "T").withStreamExecutionEnvironment(env).build();
env.executeAsync();
}
Expand Down Expand Up @@ -491,10 +489,11 @@ private void testStandAloneLookupJobRandom(

private void checkChangelogTestResult(int numProducers) throws Exception {
TableEnvironment sEnv =
tableEnvironmentBuilder().streamingMode().checkpointIntervalMs(100).build();
sEnv.getConfig()
.getConfiguration()
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
tableEnvironmentBuilder()
.streamingMode()
.checkpointIntervalMs(100)
.parallelism(1)
.build();
sEnv.executeSql(createCatalogSql("testCatalog", path));
sEnv.executeSql("USE CATALOG testCatalog");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
/** Similar to Flink's AbstractTestBase but using Junit5. */
public class AbstractTestBase {

private static final int DEFAULT_PARALLELISM = 8;
private static final int DEFAULT_PARALLELISM = 16;

@RegisterExtension
protected static final MiniClusterWithClientExtension MINI_CLUSTER_EXTENSION =
Expand Down

0 comments on commit fff3342

Please sign in to comment.