[core] fix skew bug of stream read unaware bucket table
wwj6591812 committed Aug 14, 2024
1 parent 9ec3711 commit ef0ab81
Showing 4 changed files with 79 additions and 31 deletions.
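
As the changes below suggest, every split of an unaware-bucket append table (created with 'bucket' = '-1') effectively carries the same bucket value, so the old bucket-keyed shuffle sends every split to the same reader channel (and the partition-keyed variant still piles all splits of one partition onto one channel), which skews the streaming read. A minimal sketch of that arithmetic, assuming four reader channels and bucket 0 on every split (the numbers are illustrative, not from the commit):

    // Hypothetical illustration: with bucket 0 on every unaware-bucket split,
    // the old bucket-keyed routing always picks the same channel.
    int numChannels = 4;
    int bucket = 0;                      // what unaware-bucket splits effectively report
    int channel = bucket % numChannels;  // always 0 -> a single reader gets all the work

The fix below routes unaware-bucket splits by a hash of the partition when 'streaming-read.shuffle-by-partition' is true, and by a plain rebalance otherwise, so splits spread across all reader channels.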
@@ -39,6 +39,10 @@ static int select(BinaryRow partition, int bucket, int numChannels) {
return (startChannel + bucket) % numChannels;
}

static int select(BinaryRow partition, int numChannels) {
return Math.abs(partition.hashCode()) % numChannels;
}

static int select(int bucket, int numChannels) {
return bucket % numChannels;
}
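
The new select(BinaryRow partition, int numChannels) overload above keys channel selection on the partition hash alone, with no bucket in the key. A minimal sketch, assuming a few made-up partition hash codes (PartitionRoutingSketch and the hash values are illustrative, not part of the commit):

    // Hypothetical sketch of the new partition-based routing:
    // Math.abs(partition.hashCode()) % numChannels spreads splits across readers
    // even though every unaware-bucket split carries the same bucket.
    public class PartitionRoutingSketch {
        public static void main(String[] args) {
            int numChannels = 4;
            int[] assumedPartitionHashes = {11, 25, 38, 42}; // stand-ins for partition.hashCode()
            for (int hash : assumedPartitionHashes) {
                int channel = Math.abs(hash) % numChannels;  // mirrors select(partition, numChannels)
                System.out.println("partition hash " + hash + " -> channel " + channel);
            }
        }
    }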
@@ -76,7 +76,7 @@ public class FlinkSourceBuilder {

private final Table table;
private final Options conf;

private final BucketMode bucketMode;
private String sourceName;
private Boolean sourceBounded;
private StreamExecutionEnvironment env;
@@ -90,6 +90,10 @@ public class FlinkSourceBuilder {

public FlinkSourceBuilder(Table table) {
this.table = table;
this.bucketMode =
table instanceof FileStoreTable
? ((FileStoreTable) table).bucketMode()
: BucketMode.HASH_FIXED;
this.sourceName = table.name();
this.conf = Options.fromMap(table.options());
}
@@ -187,24 +191,14 @@ private DataStream<RowData> buildStaticFileSource() {
private DataStream<RowData> buildContinuousFileSource() {
return toDataStream(
new ContinuousFileStoreSource(
createReadBuilder(),
table.options(),
limit,
table instanceof FileStoreTable
? ((FileStoreTable) table).bucketMode()
: BucketMode.HASH_FIXED));
createReadBuilder(), table.options(), limit, bucketMode));
}

private DataStream<RowData> buildAlignedContinuousFileSource() {
assertStreamingConfigurationForAlignMode(env);
return toDataStream(
new AlignedContinuousFileStoreSource(
createReadBuilder(),
table.options(),
limit,
table instanceof FileStoreTable
? ((FileStoreTable) table).bucketMode()
: BucketMode.HASH_FIXED));
createReadBuilder(), table.options(), limit, bucketMode));
}

private DataStream<RowData> toDataStream(Source<RowData, ?, ?> source) {
@@ -306,7 +300,8 @@ private DataStream<RowData> buildContinuousStreamOperator() {
createReadBuilder(),
conf.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis(),
watermarkStrategy == null,
conf.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BY_PARTITION));
conf.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BY_PARTITION),
bucketMode);
if (parallelism != null) {
dataStream.getTransformation().setParallelism(parallelism);
}
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.source.operator;

import org.apache.paimon.flink.utils.JavaTypeInfo;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
@@ -38,6 +39,7 @@
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
@@ -52,6 +54,8 @@
import java.util.OptionalLong;
import java.util.TreeMap;

import static org.apache.paimon.table.BucketMode.BUCKET_UNAWARE;

/**
* This is the single (non-parallel) monitoring task, it is responsible for:
*
@@ -230,23 +234,52 @@ public static DataStream<RowData> buildSource(
ReadBuilder readBuilder,
long monitorInterval,
boolean emitSnapshotWatermark,
boolean shuffleByPartition,
BucketMode bucketMode) {
SingleOutputStreamOperator<Split> singleOutputStreamOperator =
env.addSource(
new MonitorFunction(
readBuilder, monitorInterval, emitSnapshotWatermark),
name + "-Monitor",
new JavaTypeInfo<>(Split.class))
.forceNonParallel();

DataStream<Split> sourceDataStream =
bucketMode == BUCKET_UNAWARE
? shuffleUnwareBucket(singleOutputStreamOperator, shuffleByPartition)
: shuffleNonUnwareBucket(singleOutputStreamOperator, shuffleByPartition);

return sourceDataStream.transform(
name + "-Reader", typeInfo, new ReadOperator(readBuilder));
}

(removed: the old buildSource body, which always went through partitionCustom keyed on partition/bucket)
        return env.addSource(
                        new MonitorFunction(readBuilder, monitorInterval, emitSnapshotWatermark),
                        name + "-Monitor",
                        new JavaTypeInfo<>(Split.class))
                .forceNonParallel()
                .partitionCustom(
                        (key, numPartitions) -> {
                            if (shuffleByPartition) {
                                return ChannelComputer.select(key.f0, key.f1, numPartitions);
                            }
                            return ChannelComputer.select(key.f1, numPartitions);
                        },
                        split -> {
                            DataSplit dataSplit = (DataSplit) split;
                            return Tuple2.of(dataSplit.partition(), dataSplit.bucket());
                        })
                .transform(name + "-Reader", typeInfo, new ReadOperator(readBuilder));

(added)
    private static DataStream<Split> shuffleUnwareBucket(
            SingleOutputStreamOperator<Split> singleOutputStreamOperator,
            boolean shuffleByPartition) {
        if (shuffleByPartition) {
            return singleOutputStreamOperator.partitionCustom(
                    ChannelComputer::select,
                    split -> {
                        DataSplit dataSplit = (DataSplit) split;
                        return dataSplit.partition();
                    });
        }
        return singleOutputStreamOperator.rebalance();
    }

private static DataStream<Split> shuffleNonUnwareBucket(
SingleOutputStreamOperator<Split> singleOutputStreamOperator,
boolean shuffleByPartition) {
return singleOutputStreamOperator.partitionCustom(
(key, numPartitions) -> {
if (shuffleByPartition) {
return ChannelComputer.select(key.f0, key.f1, numPartitions);
}
return ChannelComputer.select(key.f1, numPartitions);
},
split -> {
DataSplit dataSplit = (DataSplit) split;
return Tuple2.of(dataSplit.partition(), dataSplit.bucket());
});
}
}
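
Taken together, buildSource now derives the shuffle strategy from the table's bucket mode and the streaming-read.shuffle-by-partition flag. A simplified decision sketch follows; ShuffleChoiceSketch, chooseShuffle, and the returned labels are illustrative only, not the committed API:

    // Illustrative summary of the routing choices made by shuffleUnwareBucket and
    // shuffleNonUnwareBucket above; not committed code.
    public class ShuffleChoiceSketch {
        static String chooseShuffle(boolean bucketUnaware, boolean shuffleByPartition) {
            if (bucketUnaware) {
                // bucket carries no information for an unaware-bucket table
                return shuffleByPartition
                        ? "partitionCustom by hash(partition)"
                        : "rebalance(), round-robin across readers";
            }
            return shuffleByPartition
                    ? "partitionCustom by hash(partition, bucket)"
                    : "partitionCustom by hash(bucket)";
        }

        public static void main(String[] args) {
            for (boolean unaware : new boolean[] {true, false}) {
                for (boolean byPartition : new boolean[] {true, false}) {
                    System.out.println(
                            "bucketUnaware=" + unaware
                                    + ", shuffleByPartition=" + byPartition
                                    + " -> " + chooseShuffle(unaware, byPartition));
                }
            }
        }
    }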
@@ -85,6 +85,22 @@ public void testReadWrite() {
assertThat(rows).containsExactlyInAnyOrder(Row.of("AAA"), Row.of("BBB"));
}

@Test
public void testReadUnwareBucketTableWithRebalanceShuffle() throws Exception {
batchSql(
"CREATE TABLE append_scalable_table (id INT, data STRING) "
+ "WITH ('bucket' = '-1', 'consumer-id' = 'test', 'consumer.expiration-time' = '365 d', 'target-file-size' = '1 B', 'source.split.target-size' = '1 B', 'streaming-read.shuffle-by-partition' = 'false', 'scan.parallelism' = '4')");
batchSql("INSERT INTO append_scalable_table VALUES (1, 'AAA'), (2, 'BBB')");
batchSql("INSERT INTO append_scalable_table VALUES (1, 'AAA'), (2, 'BBB')");
batchSql("INSERT INTO append_scalable_table VALUES (1, 'AAA'), (2, 'BBB')");
batchSql("INSERT INTO append_scalable_table VALUES (1, 'AAA'), (2, 'BBB')");

BlockingIterator<Row, Row> iterator =
BlockingIterator.of(streamSqlIter(("SELECT id FROM append_scalable_table")));
assertThat(iterator.collect(2)).containsExactlyInAnyOrder(Row.of(1), Row.of(2));
iterator.close();
}
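
Presumably the 1-byte 'target-file-size' and 'source.split.target-size' settings split the four inserts into many tiny files and splits, while 'streaming-read.shuffle-by-partition' = 'false' with 'scan.parallelism' = '4' pushes those splits through the new rebalance path, so the streaming read returns both ids instead of stalling on a single reader channel.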

@Test
public void testReadPartitionOrder() {
setParallelism(1);