diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/ChannelComputer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/ChannelComputer.java index 52f1dda36b817..8bdc10d58846e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/ChannelComputer.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/ChannelComputer.java @@ -39,6 +39,10 @@ static int select(BinaryRow partition, int bucket, int numChannels) { return (startChannel + bucket) % numChannels; } + static int select(BinaryRow partition, int numChannels) { + return Math.abs(partition.hashCode()) % numChannels; + } + static int select(int bucket, int numChannels) { return bucket % numChannels; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java index c81b6f39b17f2..db9abcfea4def 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java @@ -76,7 +76,7 @@ public class FlinkSourceBuilder { private final Table table; private final Options conf; - + private final BucketMode bucketMode; private String sourceName; private Boolean sourceBounded; private StreamExecutionEnvironment env; @@ -90,6 +90,10 @@ public class FlinkSourceBuilder { public FlinkSourceBuilder(Table table) { this.table = table; + this.bucketMode = + table instanceof FileStoreTable + ? ((FileStoreTable) table).bucketMode() + : BucketMode.HASH_FIXED; this.sourceName = table.name(); this.conf = Options.fromMap(table.options()); } @@ -187,24 +191,14 @@ private DataStream buildStaticFileSource() { private DataStream buildContinuousFileSource() { return toDataStream( new ContinuousFileStoreSource( - createReadBuilder(), - table.options(), - limit, - table instanceof FileStoreTable - ? ((FileStoreTable) table).bucketMode() - : BucketMode.HASH_FIXED)); + createReadBuilder(), table.options(), limit, bucketMode)); } private DataStream buildAlignedContinuousFileSource() { assertStreamingConfigurationForAlignMode(env); return toDataStream( new AlignedContinuousFileStoreSource( - createReadBuilder(), - table.options(), - limit, - table instanceof FileStoreTable - ? ((FileStoreTable) table).bucketMode() - : BucketMode.HASH_FIXED)); + createReadBuilder(), table.options(), limit, bucketMode)); } private DataStream toDataStream(Source source) { @@ -306,7 +300,8 @@ private DataStream buildContinuousStreamOperator() { createReadBuilder(), conf.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis(), watermarkStrategy == null, - conf.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BY_PARTITION)); + conf.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BY_PARTITION), + bucketMode); if (parallelism != null) { dataStream.getTransformation().setParallelism(parallelism); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java index 2f8ce1ea5d96e..c3b3c3f0504d0 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.source.operator; import org.apache.paimon.flink.utils.JavaTypeInfo; +import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.sink.ChannelComputer; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.EndOfScanException; @@ -38,6 +39,7 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.watermark.Watermark; @@ -52,6 +54,8 @@ import java.util.OptionalLong; import java.util.TreeMap; +import static org.apache.paimon.table.BucketMode.BUCKET_UNAWARE; + /** * This is the single (non-parallel) monitoring task, it is responsible for: * @@ -230,23 +234,52 @@ public static DataStream buildSource( ReadBuilder readBuilder, long monitorInterval, boolean emitSnapshotWatermark, + boolean shuffleByPartition, + BucketMode bucketMode) { + SingleOutputStreamOperator singleOutputStreamOperator = + env.addSource( + new MonitorFunction( + readBuilder, monitorInterval, emitSnapshotWatermark), + name + "-Monitor", + new JavaTypeInfo<>(Split.class)) + .forceNonParallel(); + + DataStream sourceDataStream = + bucketMode == BUCKET_UNAWARE + ? shuffleUnwareBucket(singleOutputStreamOperator, shuffleByPartition) + : shuffleNonUnwareBucket(singleOutputStreamOperator, shuffleByPartition); + + return sourceDataStream.transform( + name + "-Reader", typeInfo, new ReadOperator(readBuilder)); + } + + private static DataStream shuffleUnwareBucket( + SingleOutputStreamOperator singleOutputStreamOperator, boolean shuffleByPartition) { - return env.addSource( - new MonitorFunction(readBuilder, monitorInterval, emitSnapshotWatermark), - name + "-Monitor", - new JavaTypeInfo<>(Split.class)) - .forceNonParallel() - .partitionCustom( - (key, numPartitions) -> { - if (shuffleByPartition) { - return ChannelComputer.select(key.f0, key.f1, numPartitions); - } - return ChannelComputer.select(key.f1, numPartitions); - }, - split -> { - DataSplit dataSplit = (DataSplit) split; - return Tuple2.of(dataSplit.partition(), dataSplit.bucket()); - }) - .transform(name + "-Reader", typeInfo, new ReadOperator(readBuilder)); + if (shuffleByPartition) { + return singleOutputStreamOperator.partitionCustom( + ChannelComputer::select, + split -> { + DataSplit dataSplit = (DataSplit) split; + return dataSplit.partition(); + }); + } + return singleOutputStreamOperator.rebalance(); + } + + private static DataStream shuffleNonUnwareBucket( + SingleOutputStreamOperator singleOutputStreamOperator, + boolean shuffleByPartition) { + return singleOutputStreamOperator.partitionCustom( + (key, numPartitions) -> { + if (shuffleByPartition) { + return ChannelComputer.select(key.f0, key.f1, numPartitions); + } + return ChannelComputer.select(key.f1, numPartitions); + }, + split -> { + DataSplit dataSplit = (DataSplit) split; + return Tuple2.of(dataSplit.partition(), dataSplit.bucket()); + }); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java index 9a5b9d901448f..c8151f76a2ef7 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java @@ -85,6 +85,22 @@ public void testReadWrite() { assertThat(rows).containsExactlyInAnyOrder(Row.of("AAA"), Row.of("BBB")); } + @Test + public void testReadUnwareBucketTableWithRebalanceShuffle() throws Exception { + batchSql( + "CREATE TABLE append_scalable_table (id INT, data STRING) " + + "WITH ('bucket' = '-1', 'consumer-id' = 'test', 'consumer.expiration-time' = '365 d', 'target-file-size' = '1 B', 'source.split.target-size' = '1 B', 'streaming-read.shuffle-by-partition' = 'false', 'scan.parallelism' = '4')"); + batchSql("INSERT INTO append_scalable_table VALUES (1, 'AAA'), (2, 'BBB')"); + batchSql("INSERT INTO append_scalable_table VALUES (1, 'AAA'), (2, 'BBB')"); + batchSql("INSERT INTO append_scalable_table VALUES (1, 'AAA'), (2, 'BBB')"); + batchSql("INSERT INTO append_scalable_table VALUES (1, 'AAA'), (2, 'BBB')"); + + BlockingIterator iterator = + BlockingIterator.of(streamSqlIter(("SELECT id FROM append_scalable_table"))); + assertThat(iterator.collect(2)).containsExactlyInAnyOrder(Row.of(1), Row.of(2)); + iterator.close(); + } + @Test public void testReadPartitionOrder() { setParallelism(1);