[core] fix skew bug of stream read unware bucket table #3955

Merged
@@ -255,7 +255,7 @@
<td>If the new snapshot has not been generated when the checkpoint starts to trigger, the enumerator will block the checkpoint and wait for the new snapshot. Set a maximum waiting time to avoid waiting indefinitely; if the timeout is reached, the checkpoint fails. Note that it should be set smaller than the checkpoint timeout.</td>
</tr>
<tr>
<td><h5>streaming-read.shuffle-by-partition</h5></td>
<td><h5>streaming-read.shuffle-bucket-with-partition</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to shuffle by partition and bucket when reading in streaming mode.</td>
@@ -217,8 +217,8 @@ public class FlinkConnectorOptions {
+ " Note: This is dangerous and is likely to cause data errors if downstream"
+ " is used to calculate aggregation and the input is not complete changelog.");

public static final ConfigOption<Boolean> STREAMING_READ_SHUFFLE_BY_PARTITION =
key("streaming-read.shuffle-by-partition")
public static final ConfigOption<Boolean> STREAMING_READ_SHUFFLE_BUCKET_WITH_PARTITION =
key("streaming-read.shuffle-bucket-with-partition")
.booleanType()
.defaultValue(true)
.withDescription(
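A minimal sketch of how the renamed option would be resolved from table options, mirroring the `Options.fromMap(table.options())` plus `conf.get(...)` pattern that `FlinkSourceBuilder` uses further down in this PR. The option map and wrapper class are illustrative only, and the Paimon package names are assumed from the codebase:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.options.Options;

/** Illustrative only; not part of the PR. */
public class ShuffleOptionSketch {
    public static void main(String[] args) {
        // Hypothetical table options; only the key and its default come from this PR.
        Map<String, String> tableOptions = new HashMap<>();
        tableOptions.put("streaming-read.shuffle-bucket-with-partition", "false");

        Options conf = Options.fromMap(tableOptions);
        boolean shuffleBucketWithPartition =
                conf.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BUCKET_WITH_PARTITION);

        // Prints false here; the option defaults to true when the key is not set.
        System.out.println(shuffleBucketWithPartition);
    }
}
```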
@@ -76,7 +76,7 @@ public class ContinuousFileSplitEnumerator

private final int splitMaxNum;

private final boolean shuffleByPartition;
private final boolean shuffleBucketWithPartition;

@Nullable protected Long nextSnapshotId;

@@ -92,7 +92,7 @@ public ContinuousFileSplitEnumerator(
StreamTableScan scan,
BucketMode bucketMode,
int splitMaxPerTask,
boolean shuffleByPartition) {
boolean shuffleBucketWithPartition) {
checkArgument(discoveryInterval > 0L);
this.context = checkNotNull(context);
this.nextSnapshotId = nextSnapshotId;
@@ -102,7 +102,7 @@
this.scan = scan;
this.splitAssigner = createSplitAssigner(bucketMode);
this.splitMaxNum = context.currentParallelism() * splitMaxPerTask;
this.shuffleByPartition = shuffleByPartition;
this.shuffleBucketWithPartition = shuffleBucketWithPartition;
addSplits(remainSplits);

this.consumerProgressCalculator =
@@ -281,7 +281,7 @@ protected synchronized void assignSplits() {

protected int assignSuggestedTask(FileStoreSourceSplit split) {
DataSplit dataSplit = ((DataSplit) split.split());
if (shuffleByPartition) {
if (shuffleBucketWithPartition) {
return ChannelComputer.select(
dataSplit.partition(), dataSplit.bucket(), context.currentParallelism());
}
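For context on the skew being fixed: `assignSuggestedTask` routes each split through `ChannelComputer.select`, either over (partition, bucket) or over the bucket alone. An unaware-bucket table (`'bucket' = '-1'`) reports bucket 0 for every `DataSplit`, so any bucket-driven selection keeps sending splits to the same subtask. The sketch below illustrates that collapse with a hypothetical bucket-only selector; it is not `ChannelComputer`'s actual implementation, but any function of the bucket alone behaves this way when every bucket is 0:

```java
import java.util.HashMap;
import java.util.Map;

/** Minimal sketch of the skew; the selector is a stand-in, not Paimon code. */
public class UnawareBucketSkewSketch {

    // Hypothetical bucket-only channel selection.
    static int selectByBucket(int bucket, int numChannels) {
        return Math.abs(bucket) % numChannels;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        Map<Integer, Integer> splitsPerChannel = new HashMap<>();

        // 100 splits of an unaware-bucket table: every split reports bucket 0.
        for (int split = 0; split < 100; split++) {
            int channel = selectByBucket(0, parallelism);
            splitsPerChannel.merge(channel, 1, Integer::sum);
        }

        // Prints {0=100}: one subtask reads everything, the other three stay idle.
        System.out.println(splitsPerChannel);
    }
}
```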
@@ -110,6 +110,6 @@ protected SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> buildEn
scan,
bucketMode,
options.get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK),
options.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BY_PARTITION));
options.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BUCKET_WITH_PARTITION));
}
}
@@ -76,7 +76,7 @@ public class FlinkSourceBuilder {

private final Table table;
private final Options conf;

private final BucketMode bucketMode;
private String sourceName;
private Boolean sourceBounded;
private StreamExecutionEnvironment env;
@@ -90,6 +90,10 @@ public class FlinkSourceBuilder {

public FlinkSourceBuilder(Table table) {
this.table = table;
this.bucketMode =
table instanceof FileStoreTable
? ((FileStoreTable) table).bucketMode()
: BucketMode.HASH_FIXED;
this.sourceName = table.name();
this.conf = Options.fromMap(table.options());
}
@@ -187,24 +191,14 @@ private DataStream<RowData> buildStaticFileSource() {
private DataStream<RowData> buildContinuousFileSource() {
return toDataStream(
new ContinuousFileStoreSource(
createReadBuilder(),
table.options(),
limit,
table instanceof FileStoreTable
? ((FileStoreTable) table).bucketMode()
: BucketMode.HASH_FIXED));
createReadBuilder(), table.options(), limit, bucketMode));
}

private DataStream<RowData> buildAlignedContinuousFileSource() {
assertStreamingConfigurationForAlignMode(env);
return toDataStream(
new AlignedContinuousFileStoreSource(
createReadBuilder(),
table.options(),
limit,
table instanceof FileStoreTable
? ((FileStoreTable) table).bucketMode()
: BucketMode.HASH_FIXED));
createReadBuilder(), table.options(), limit, bucketMode));
}

private DataStream<RowData> toDataStream(Source<RowData, ?, ?> source) {
@@ -306,7 +300,9 @@ private DataStream<RowData> buildContinuousStreamOperator() {
createReadBuilder(),
conf.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis(),
watermarkStrategy == null,
conf.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BY_PARTITION));
conf.get(
FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BUCKET_WITH_PARTITION),
bucketMode);
if (parallelism != null) {
dataStream.getTransformation().setParallelism(parallelism);
}
@@ -95,7 +95,7 @@ public AlignedContinuousFileSplitEnumerator(
BucketMode bucketMode,
long alignTimeout,
int splitPerTaskMax,
boolean shuffleByPartition) {
boolean shuffleBucketWithPartition) {
super(
context,
remainSplits,
@@ -104,7 +104,7 @@
scan,
bucketMode,
splitPerTaskMax,
shuffleByPartition);
shuffleBucketWithPartition);
this.pendingPlans = new ArrayBlockingQueue<>(MAX_PENDING_PLAN);
this.alignedAssigner = (AlignedSplitAssigner) super.splitAssigner;
this.nextSnapshotId = nextSnapshotId;
@@ -92,6 +92,6 @@ protected SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> buildEn
bucketMode,
options.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_TIMEOUT).toMillis(),
options.get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK),
options.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BY_PARTITION));
options.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BUCKET_WITH_PARTITION));
}
}
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.source.operator;

import org.apache.paimon.flink.utils.JavaTypeInfo;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
@@ -38,6 +39,7 @@
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
@@ -52,6 +54,8 @@
import java.util.OptionalLong;
import java.util.TreeMap;

import static org.apache.paimon.table.BucketMode.BUCKET_UNAWARE;

/**
* This is the single (non-parallel) monitoring task, it is responsible for:
*
@@ -230,23 +234,44 @@ public static DataStream<RowData> buildSource(
ReadBuilder readBuilder,
long monitorInterval,
boolean emitSnapshotWatermark,
boolean shuffleByPartition) {
return env.addSource(
new MonitorFunction(readBuilder, monitorInterval, emitSnapshotWatermark),
name + "-Monitor",
new JavaTypeInfo<>(Split.class))
.forceNonParallel()
.partitionCustom(
(key, numPartitions) -> {
if (shuffleByPartition) {
return ChannelComputer.select(key.f0, key.f1, numPartitions);
}
return ChannelComputer.select(key.f1, numPartitions);
},
split -> {
DataSplit dataSplit = (DataSplit) split;
return Tuple2.of(dataSplit.partition(), dataSplit.bucket());
})
.transform(name + "-Reader", typeInfo, new ReadOperator(readBuilder));
boolean shuffleBucketWithPartition,
BucketMode bucketMode) {
SingleOutputStreamOperator<Split> singleOutputStreamOperator =
env.addSource(
new MonitorFunction(
readBuilder, monitorInterval, emitSnapshotWatermark),
name + "-Monitor",
new JavaTypeInfo<>(Split.class))
.forceNonParallel();

DataStream<Split> sourceDataStream =
bucketMode == BUCKET_UNAWARE
? shuffleUnwareBucket(singleOutputStreamOperator)
: shuffleNonUnwareBucket(
singleOutputStreamOperator, shuffleBucketWithPartition);

return sourceDataStream.transform(
name + "-Reader", typeInfo, new ReadOperator(readBuilder));
}

private static DataStream<Split> shuffleUnwareBucket(
SingleOutputStreamOperator<Split> singleOutputStreamOperator) {
return singleOutputStreamOperator.rebalance();
}

private static DataStream<Split> shuffleNonUnwareBucket(
SingleOutputStreamOperator<Split> singleOutputStreamOperator,
boolean shuffleBucketWithPartition) {
return singleOutputStreamOperator.partitionCustom(
(key, numPartitions) -> {
if (shuffleBucketWithPartition) {
return ChannelComputer.select(key.f0, key.f1, numPartitions);
}
return ChannelComputer.select(key.f1, numPartitions);
},
split -> {
DataSplit dataSplit = (DataSplit) split;
return Tuple2.of(dataSplit.partition(), dataSplit.bucket());
});
}
}
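The rewritten `buildSource` now branches on `BucketMode`: unaware-bucket tables get a plain `rebalance()`, which hands splits to reader subtasks round-robin, while bucketed tables keep the custom partitioner keyed on (partition, bucket) or on the bucket only. A minimal, self-contained Flink sketch of that difference follows; the tuple elements and the string key encoding are illustrative, not the PR's code:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/** Illustrative contrast between rebalance() and partitionCustom(); not Paimon code. */
public class ShuffleSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // Pretend splits: f0 = partition hash, f1 = bucket (always 0 for unaware-bucket tables).
        DataStream<Tuple2<Integer, Integer>> splits =
                env.fromElements(
                        Tuple2.of(7, 0), Tuple2.of(7, 0), Tuple2.of(9, 0), Tuple2.of(9, 0));

        // Unaware-bucket path: spread splits evenly, ignoring the constant bucket.
        DataStream<Tuple2<Integer, Integer>> rebalanced = splits.rebalance();

        // Bucketed path: pin each (partition, bucket) key to one subtask.
        DataStream<Tuple2<Integer, Integer>> partitioned =
                splits.partitionCustom(
                        (String key, int numPartitions) ->
                                (key.hashCode() & Integer.MAX_VALUE) % numPartitions,
                        value -> value.f0 + "|" + value.f1);

        rebalanced.print("rebalanced");
        partitioned.print("partitioned");
        env.execute("shuffle-sketch");
    }
}
```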
@@ -85,6 +85,22 @@ public void testReadWrite() {
assertThat(rows).containsExactlyInAnyOrder(Row.of("AAA"), Row.of("BBB"));
}

@Test
public void testReadUnwareBucketTableWithRebalanceShuffle() throws Exception {
batchSql(
"CREATE TABLE append_scalable_table (id INT, data STRING) "
+ "WITH ('bucket' = '-1', 'consumer-id' = 'test', 'consumer.expiration-time' = '365 d', 'target-file-size' = '1 B', 'source.split.target-size' = '1 B', 'scan.parallelism' = '4')");
batchSql("INSERT INTO append_scalable_table VALUES (1, 'AAA'), (2, 'BBB')");
batchSql("INSERT INTO append_scalable_table VALUES (1, 'AAA'), (2, 'BBB')");
batchSql("INSERT INTO append_scalable_table VALUES (1, 'AAA'), (2, 'BBB')");
batchSql("INSERT INTO append_scalable_table VALUES (1, 'AAA'), (2, 'BBB')");

BlockingIterator<Row, Row> iterator =
BlockingIterator.of(streamSqlIter(("SELECT id FROM append_scalable_table")));
assertThat(iterator.collect(2)).containsExactlyInAnyOrder(Row.of(1), Row.of(2));
iterator.close();
}

@Test
public void testReadPartitionOrder() {
setParallelism(1);