Skip to content

Commit

Permalink
[flink] Add validation for parallelism of the SortOperator. (apache#2691)
Browse files Browse the repository at this point in the history
  • Loading branch information
Aitozi authored Jan 15, 2024
1 parent 6bad5b9 commit 3d899fa
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class SortOperator extends TableStreamOperator<InternalRow>
private final int pageSize;
private final int arity;
private final int spillSortMaxNumFiles;
private final int sinkParallelism;

private transient BinaryExternalSortBuffer buffer;
private transient IOManager ioManager;
Expand All @@ -50,19 +51,26 @@ public SortOperator(
RowType rowType,
long maxMemory,
int pageSize,
int spillSortMaxNumFiles) {
int spillSortMaxNumFiles,
int sinkParallelism) {
this.keyType = keyType;
this.rowType = rowType;
this.maxMemory = maxMemory;
this.pageSize = pageSize;
this.arity = rowType.getFieldCount();
this.spillSortMaxNumFiles = spillSortMaxNumFiles;
this.sinkParallelism = sinkParallelism;
}

@Override
public void open() throws Exception {
    super.open();
    // Fail fast: validate the runtime parallelism BEFORE allocating the
    // external sort buffer, so no spill directories / memory segments are
    // acquired for a task that cannot proceed anyway. The range-partitioning
    // plan (sample size, range count) was computed against the configured
    // sink parallelism; running with a different actual parallelism would
    // produce skewed ranges.
    int actualParallelism = getRuntimeContext().getNumberOfParallelSubtasks();
    if (sinkParallelism != actualParallelism) {
        throw new IllegalArgumentException(
                "Please ensure that the runtime parallelism ("
                        + actualParallelism
                        + ") of the sink matches the configured sink parallelism ("
                        + sinkParallelism
                        + ") to avoid potential issues with skewed range partitioning.");
    }
    initBuffer();
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ public static <KEY> DataStream<RowData> sortStreamByKey(
sinkParallelismValue == null
? inputStream.getParallelism()
: Integer.parseInt(sinkParallelismValue);
if (sinkParallelism == -1) {
throw new UnsupportedOperationException(
"The adaptive batch scheduler is not supported. Please set the sink parallelism using the key: "
+ FlinkConnectorOptions.SINK_PARALLELISM.key());
}
final int sampleSize = sinkParallelism * 1000;
final int rangeNum = sinkParallelism * 10;

Expand Down Expand Up @@ -155,7 +160,8 @@ public Tuple2<KEY, RowData> map(RowData value) {
longRowType,
options.writeBufferSize(),
options.pageSize(),
options.localSortMaxNumFileHandles()))
options.localSortMaxNumFileHandles(),
sinkParallelism))
.setParallelism(sinkParallelism)
// remove the key column from every row
.map(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ public void testSort() throws Exception {
rowType,
MemorySize.parse("10 mb").getBytes(),
(int) MemorySize.parse("16 kb").getBytes(),
128) {};
128,
1) {};

OneInputStreamOperatorTestHarness harness = createTestHarness(sortOperator);
harness.open();
Expand All @@ -88,7 +89,7 @@ public void testSort() throws Exception {
}

@Test
public void testCloseSortOprator() throws Exception {
public void testCloseSortOperator() throws Exception {
RowType keyRowType =
new RowType(
Collections.singletonList(
Expand All @@ -107,7 +108,8 @@ public void testCloseSortOprator() throws Exception {
rowType,
MemorySize.parse("10 mb").getBytes(),
(int) MemorySize.parse("16 kb").getBytes(),
128) {};
128,
1) {};
OneInputStreamOperatorTestHarness harness = createTestHarness(sortOperator);
harness.open();
File[] files = harness.getEnvironment().getIOManager().getSpillingDirectories();
Expand Down

0 comments on commit 3d899fa

Please sign in to comment.