diff --git a/docs/content/flink/expire-partition.md b/docs/content/flink/expire-partition.md
index de4a17e534411..3acf6e59d58cd 100644
--- a/docs/content/flink/expire-partition.md
+++ b/docs/content/flink/expire-partition.md
@@ -130,5 +130,11 @@ More options:
String |
You can specify a pattern to get a timestamp from partitions. The formatter pattern is defined by 'partition.timestamp-formatter'.
- By default, read from the first field.
- If the timestamp in the partition is a single field called 'dt', you can use '$dt'.
- If it is spread across multiple fields for year, month, day, and hour, you can use '$year-$month-$day $hour:00:00'.
- If the timestamp is in fields dt and hour, you can use '$dt $hour:00:00'.
|
+
+ end-input.check-partition-expire |
+ false |
+ Boolean |
+ Whether to check partition expiration after a batch mode or bounded stream job finishes. |
+
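
For reference, a minimal sketch of how the new option could be combined with the existing partition-expiration settings. The table and values below are illustrative, not part of this patch:

```sql
-- Hypothetical partitioned table; option keys follow the docs above.
CREATE TABLE orders (
    order_id BIGINT,
    dt STRING
) PARTITIONED BY (dt) WITH (
    'partition.expiration-time' = '7 d',
    'partition.timestamp-formatter' = 'yyyy-MM-dd',
    -- run the expiration check once the batch / bounded streaming job finishes
    'end-input.check-partition-expire' = 'true'
);
```
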
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/ChannelComputer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/ChannelComputer.java
index 52f1dda36b817..8bdc10d58846e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/ChannelComputer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/ChannelComputer.java
@@ -39,6 +39,10 @@ static int select(BinaryRow partition, int bucket, int numChannels) {
return (startChannel + bucket) % numChannels;
}
+ static int select(BinaryRow partition, int numChannels) {
+ return Math.abs(partition.hashCode()) % numChannels;
+ }
+
static int select(int bucket, int numChannels) {
return bucket % numChannels;
}
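
The new select(partition, numChannels) overload keys only on the partition hash, so every split of the same partition is routed to the same downstream channel. A minimal sketch of that contract; the demo class and the partition value are made up for illustration:

```java
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.table.sink.ChannelComputer;

public class PartitionChannelDemo {
    public static void main(String[] args) {
        // Build a one-field partition row, e.g. dt = '2024-01-01'.
        BinaryRow partition = new BinaryRow(1);
        BinaryRowWriter writer = new BinaryRowWriter(partition);
        writer.writeString(0, BinaryString.fromString("2024-01-01"));
        writer.complete();

        // Deterministic: the same partition always yields the same channel.
        int channel = ChannelComputer.select(partition, 4);
        System.out.println("channel = " + channel);
    }
}
```
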
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index c81b6f39b17f2..db9abcfea4def 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -76,7 +76,7 @@ public class FlinkSourceBuilder {
private final Table table;
private final Options conf;
-
+ private final BucketMode bucketMode;
private String sourceName;
private Boolean sourceBounded;
private StreamExecutionEnvironment env;
@@ -90,6 +90,10 @@ public class FlinkSourceBuilder {
public FlinkSourceBuilder(Table table) {
this.table = table;
+ this.bucketMode =
+ table instanceof FileStoreTable
+ ? ((FileStoreTable) table).bucketMode()
+ : BucketMode.HASH_FIXED;
this.sourceName = table.name();
this.conf = Options.fromMap(table.options());
}
@@ -187,24 +191,14 @@ private DataStream buildStaticFileSource() {
private DataStream buildContinuousFileSource() {
return toDataStream(
new ContinuousFileStoreSource(
- createReadBuilder(),
- table.options(),
- limit,
- table instanceof FileStoreTable
- ? ((FileStoreTable) table).bucketMode()
- : BucketMode.HASH_FIXED));
+ createReadBuilder(), table.options(), limit, bucketMode));
}
private DataStream buildAlignedContinuousFileSource() {
assertStreamingConfigurationForAlignMode(env);
return toDataStream(
new AlignedContinuousFileStoreSource(
- createReadBuilder(),
- table.options(),
- limit,
- table instanceof FileStoreTable
- ? ((FileStoreTable) table).bucketMode()
- : BucketMode.HASH_FIXED));
+ createReadBuilder(), table.options(), limit, bucketMode));
}
private DataStream toDataStream(Source source) {
@@ -306,7 +300,8 @@ private DataStream buildContinuousStreamOperator() {
createReadBuilder(),
conf.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis(),
watermarkStrategy == null,
- conf.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BY_PARTITION));
+ conf.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BY_PARTITION),
+ bucketMode);
if (parallelism != null) {
dataStream.getTransformation().setParallelism(parallelism);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
index 2f8ce1ea5d96e..c3b3c3f0504d0 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.source.operator;
import org.apache.paimon.flink.utils.JavaTypeInfo;
+import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
@@ -38,6 +39,7 @@
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
@@ -52,6 +54,8 @@
import java.util.OptionalLong;
import java.util.TreeMap;
+import static org.apache.paimon.table.BucketMode.BUCKET_UNAWARE;
+
/**
* This is the single (non-parallel) monitoring task, it is responsible for:
*
@@ -230,23 +234,52 @@ public static DataStream buildSource(
ReadBuilder readBuilder,
long monitorInterval,
boolean emitSnapshotWatermark,
+ boolean shuffleByPartition,
+ BucketMode bucketMode) {
+ SingleOutputStreamOperator singleOutputStreamOperator =
+ env.addSource(
+ new MonitorFunction(
+ readBuilder, monitorInterval, emitSnapshotWatermark),
+ name + "-Monitor",
+ new JavaTypeInfo<>(Split.class))
+ .forceNonParallel();
+
+ DataStream sourceDataStream =
+ bucketMode == BUCKET_UNAWARE
+ ? shuffleUnawareBucket(singleOutputStreamOperator, shuffleByPartition)
+ : shuffleNonUnawareBucket(singleOutputStreamOperator, shuffleByPartition);
+
+ return sourceDataStream.transform(
+ name + "-Reader", typeInfo, new ReadOperator(readBuilder));
+ }
+
+ private static DataStream shuffleUnawareBucket(
+ SingleOutputStreamOperator singleOutputStreamOperator,
boolean shuffleByPartition) {
- return env.addSource(
- new MonitorFunction(readBuilder, monitorInterval, emitSnapshotWatermark),
- name + "-Monitor",
- new JavaTypeInfo<>(Split.class))
- .forceNonParallel()
- .partitionCustom(
- (key, numPartitions) -> {
- if (shuffleByPartition) {
- return ChannelComputer.select(key.f0, key.f1, numPartitions);
- }
- return ChannelComputer.select(key.f1, numPartitions);
- },
- split -> {
- DataSplit dataSplit = (DataSplit) split;
- return Tuple2.of(dataSplit.partition(), dataSplit.bucket());
- })
- .transform(name + "-Reader", typeInfo, new ReadOperator(readBuilder));
+ if (shuffleByPartition) {
+ return singleOutputStreamOperator.partitionCustom(
+ ChannelComputer::select,
+ split -> {
+ DataSplit dataSplit = (DataSplit) split;
+ return dataSplit.partition();
+ });
+ }
+ return singleOutputStreamOperator.rebalance();
+ }
+
+ private static DataStream shuffleNonUnawareBucket(
+ SingleOutputStreamOperator singleOutputStreamOperator,
+ boolean shuffleByPartition) {
+ return singleOutputStreamOperator.partitionCustom(
+ (key, numPartitions) -> {
+ if (shuffleByPartition) {
+ return ChannelComputer.select(key.f0, key.f1, numPartitions);
+ }
+ return ChannelComputer.select(key.f1, numPartitions);
+ },
+ split -> {
+ DataSplit dataSplit = (DataSplit) split;
+ return Tuple2.of(dataSplit.partition(), dataSplit.bucket());
+ });
}
}
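
In short: splits of a bucket-unaware append table all report the same bucket (0), so routing by (partition, bucket) would pile them onto a few channels. The monitor therefore either hashes by partition (when 'streaming-read.shuffle-by-partition' is true) or simply rebalances, while bucketed tables keep the original (partition, bucket) routing. A condensed sketch of that decision, not the operator itself; the helper class and the -1 "rebalance" marker are made up for illustration:

```java
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.table.source.DataSplit;

final class SplitRoutingSketch {

    /** Returns the target channel for a split, or -1 to signal "rebalance round-robin". */
    static int route(
            DataSplit split, BucketMode bucketMode, boolean shuffleByPartition, int numChannels) {
        BinaryRow partition = split.partition();
        if (bucketMode == BucketMode.BUCKET_UNAWARE) {
            // Append tables: the bucket id is not meaningful, shuffle by partition or rebalance.
            return shuffleByPartition ? ChannelComputer.select(partition, numChannels) : -1;
        }
        // Bucketed tables: keep locality per (partition, bucket) or per bucket only.
        return shuffleByPartition
                ? ChannelComputer.select(partition, split.bucket(), numChannels)
                : ChannelComputer.select(split.bucket(), numChannels);
    }

    private SplitRoutingSketch() {}
}
```
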
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
index 9a5b9d901448f..4d00d8a4d5b5c 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
@@ -85,6 +85,22 @@ public void testReadWrite() {
assertThat(rows).containsExactlyInAnyOrder(Row.of("AAA"), Row.of("BBB"));
}
+ @Test
+ public void testReadWriteWithUnawareBucket() throws Exception {
+ batchSql(
+ "CREATE TABLE append_scalable_table (id INT, data STRING) "
+ + "WITH ('bucket' = '-1', 'consumer-id' = 'test', 'consumer.expiration-time' = '365 d', 'target-file-size' = '1 B', 'source.split.target-size' = '1 B', 'streaming-read.shuffle-by-partition' = 'false', 'scan.parallelism' = '4')");
+ batchSql("INSERT INTO append_scalable_table VALUES (1, 'AAA'), (2, 'BBB')");
+ batchSql("INSERT INTO append_scalable_table VALUES (1, 'AAA'), (2, 'BBB')");
+ batchSql("INSERT INTO append_scalable_table VALUES (1, 'AAA'), (2, 'BBB')");
+ batchSql("INSERT INTO append_scalable_table VALUES (1, 'AAA'), (2, 'BBB')");
+
+ BlockingIterator iterator =
+ BlockingIterator.of(streamSqlIter("SELECT id FROM append_scalable_table"));
+ assertThat(iterator.collect(2)).containsExactlyInAnyOrder(Row.of(1), Row.of(2));
+ iterator.close();
+ }
+
@Test
public void testReadPartitionOrder() {
setParallelism(1);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
index 55bd89b0189cc..b792ed30b7540 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
@@ -282,7 +282,7 @@ public void testReadWriteFailRandom() throws Exception {
for (int i = 0; i < size; i++) {
Integer j = RANDOM.nextInt();
results.add(Row.of(j, String.valueOf(j)));
- values.append("(" + j + ",'" + j + "'" + "),");
+ values.append("(").append(j).append(",'").append(j).append("'").append("),");
}
FailingFileIO.retryArtificialException(
@@ -290,7 +290,7 @@ public void testReadWriteFailRandom() throws Exception {
batchSql(
String.format(
"INSERT INTO append_table VALUES %s",
- values.toString().substring(0, values.length() - 1))));
+ values.substring(0, values.length() - 1))));
FailingFileIO.retryArtificialException(
() -> {
@@ -311,7 +311,7 @@ public void testReadWriteFailRandomString() throws Exception {
Integer j = RANDOM.nextInt();
String v = String.valueOf(RANDOM.nextInt());
results.add(Row.of(j, v));
- values.append("(" + j + ",'" + v + "'" + "),");
+ values.append("(").append(j).append(",'").append(v).append("'").append("),");
}
FailingFileIO.retryArtificialException(
@@ -319,7 +319,7 @@ public void testReadWriteFailRandomString() throws Exception {
batchSql(
String.format(
"INSERT INTO append_table VALUES %s",
- values.toString().substring(0, values.length() - 1))));
+ values.substring(0, values.length() - 1))));
FailingFileIO.retryArtificialException(
() -> {