[core] fix skew bug of stream read of unaware-bucket table
wwj6591812 committed Aug 13, 2024
1 parent 9ec3711 commit 4b1efe3
Showing 6 changed files with 89 additions and 35 deletions.
6 changes: 6 additions & 0 deletions docs/content/flink/expire-partition.md
@@ -130,5 +130,11 @@ More options:
<td>String</td>
<td>You can specify a pattern to get a timestamp from partitions. The formatter pattern is defined by 'partition.timestamp-formatter'.<ul><li>By default, read from the first field.</li><li>If the timestamp in the partition is a single field called 'dt', you can use '$dt'.</li><li>If it is spread across multiple fields for year, month, day, and hour, you can use '$year-$month-$day $hour:00:00'.</li><li>If the timestamp is in fields dt and hour, you can use '$dt $hour:00:00'.</li></ul></td>
</tr>
<tr>
<td><h5>end-input.check-partition-expire</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to check partition expiration after a batch-mode or bounded streaming job finishes.</td>
</tr>
</tbody>
</table>
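
As a hedged illustration of the new option, the sketch below creates a partitioned Paimon table from Flink SQL (driven by a Java TableEnvironment) with partition expiration enabled and 'end-input.check-partition-expire' set to 'true', so that a bounded insert job checks for expired partitions once its input ends. It assumes the paimon-flink dependency is on the classpath; the catalog name, warehouse path, table name, schema, and expiration settings are illustrative, not taken from the commit.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class EndInputExpireExample {
    public static void main(String[] args) throws Exception {
        // Bounded (batch) job, so partition expiration can be checked when the input ends.
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Illustrative local warehouse path.
        tEnv.executeSql(
                "CREATE CATALOG paimon WITH ('type' = 'paimon', 'warehouse' = 'file:///tmp/paimon')");
        tEnv.executeSql("USE CATALOG paimon");

        tEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS orders ("
                        + " order_id BIGINT, dt STRING"
                        + ") PARTITIONED BY (dt) WITH ("
                        + " 'partition.expiration-time' = '7 d',"
                        + " 'partition.timestamp-formatter' = 'yyyy-MM-dd',"
                        + " 'end-input.check-partition-expire' = 'true'"
                        + ")");

        // When this bounded insert finishes, expired partitions are checked and dropped.
        tEnv.executeSql("INSERT INTO orders VALUES (1, CAST(CURRENT_DATE AS STRING))").await();
    }
}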
@@ -39,6 +39,10 @@ static int select(BinaryRow partition, int bucket, int numChannels) {
return (startChannel + bucket) % numChannels;
}

/** Select a channel for a bucket-unaware split by hashing only its partition. */
static int select(BinaryRow partition, int numChannels) {
return Math.abs(partition.hashCode()) % numChannels;
}

static int select(int bucket, int numChannels) {
return bucket % numChannels;
}
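
The new single-argument overload makes the channel depend only on the partition hash. Below is a minimal standalone sketch of that arithmetic, using an int hash and a hypothetical partition value in place of a real BinaryRow; it also shows why routing every split of one partition to the same channel can leave other read subtasks idle.

public class ChannelSelectSketch {
    // Mirrors the new ChannelComputer.select(partition, numChannels): |hash(partition)| % numChannels.
    static int select(int partitionHash, int numChannels) {
        return Math.abs(partitionHash) % numChannels;
    }

    public static void main(String[] args) {
        int numChannels = 4;
        // Every split of the same partition maps to the same channel,
        // so a table with few partitions keeps most reader subtasks idle.
        for (int split = 0; split < 8; split++) {
            System.out.println("split " + split + " of partition 'dt=2024-08-13' -> channel "
                    + select("dt=2024-08-13".hashCode(), numChannels));
        }
    }
}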
@@ -76,7 +76,7 @@ public class FlinkSourceBuilder {

private final Table table;
private final Options conf;

private final BucketMode bucketMode;
private String sourceName;
private Boolean sourceBounded;
private StreamExecutionEnvironment env;
@@ -90,6 +90,10 @@ public class FlinkSourceBuilder {

public FlinkSourceBuilder(Table table) {
this.table = table;
this.bucketMode =
table instanceof FileStoreTable
? ((FileStoreTable) table).bucketMode()
: BucketMode.HASH_FIXED;
this.sourceName = table.name();
this.conf = Options.fromMap(table.options());
}
@@ -187,24 +191,14 @@ private DataStream<RowData> buildStaticFileSource() {
private DataStream<RowData> buildContinuousFileSource() {
return toDataStream(
new ContinuousFileStoreSource(
createReadBuilder(), table.options(), limit, bucketMode));
}

private DataStream<RowData> buildAlignedContinuousFileSource() {
assertStreamingConfigurationForAlignMode(env);
return toDataStream(
new AlignedContinuousFileStoreSource(
createReadBuilder(), table.options(), limit, bucketMode));
}

private DataStream<RowData> toDataStream(Source<RowData, ?, ?> source) {
@@ -306,7 +300,8 @@ private DataStream<RowData> buildContinuousStreamOperator() {
createReadBuilder(),
conf.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis(),
watermarkStrategy == null,
conf.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BY_PARTITION),
bucketMode);
if (parallelism != null) {
dataStream.getTransformation().setParallelism(parallelism);
}
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.source.operator;

import org.apache.paimon.flink.utils.JavaTypeInfo;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
@@ -38,6 +39,7 @@
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
@@ -52,6 +54,8 @@
import java.util.OptionalLong;
import java.util.TreeMap;

import static org.apache.paimon.table.BucketMode.BUCKET_UNAWARE;

/**
* This is the single (non-parallel) monitoring task; it is responsible for:
*
@@ -230,23 +234,52 @@ public static DataStream<RowData> buildSource(
ReadBuilder readBuilder,
long monitorInterval,
boolean emitSnapshotWatermark,
boolean shuffleByPartition,
BucketMode bucketMode) {
SingleOutputStreamOperator<Split> singleOutputStreamOperator =
env.addSource(
new MonitorFunction(
readBuilder, monitorInterval, emitSnapshotWatermark),
name + "-Monitor",
new JavaTypeInfo<>(Split.class))
.forceNonParallel();

DataStream<Split> sourceDataStream =
bucketMode == BUCKET_UNAWARE
? shuffleUnwareBucket(singleOutputStreamOperator, shuffleByPartition)
: shuffleNonUnwareBucket(singleOutputStreamOperator, shuffleByPartition);

return sourceDataStream.transform(
name + "-Reader", typeInfo, new ReadOperator(readBuilder));
}

private static DataStream<Split> shuffleUnwareBucket(
SingleOutputStreamOperator<Split> singleOutputStreamOperator,
boolean shuffleByPartition) {
if (shuffleByPartition) {
return singleOutputStreamOperator.partitionCustom(
ChannelComputer::select,
split -> {
DataSplit dataSplit = (DataSplit) split;
return dataSplit.partition();
});
}
return singleOutputStreamOperator.rebalance();
}

private static DataStream<Split> shuffleNonUnwareBucket(
SingleOutputStreamOperator<Split> singleOutputStreamOperator,
boolean shuffleByPartition) {
return singleOutputStreamOperator.partitionCustom(
(key, numPartitions) -> {
if (shuffleByPartition) {
return ChannelComputer.select(key.f0, key.f1, numPartitions);
}
return ChannelComputer.select(key.f1, numPartitions);
},
split -> {
DataSplit dataSplit = (DataSplit) split;
return Tuple2.of(dataSplit.partition(), dataSplit.bucket());
});
}
}
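
The skew this commit fixes comes from the old routing: a bucket-unaware table reports bucket 0 for every split, so selecting the channel from the bucket alone sent every split to the same reader subtask. With the change, BUCKET_UNAWARE tables either shuffle by partition hash (when 'streaming-read.shuffle-by-partition' is true) or are simply rebalanced across subtasks. The following is a minimal standalone sketch of that contrast; the round-robin assignment in the "new" part only stands in for Flink's rebalance() and is not the actual implementation.

import java.util.stream.IntStream;

public class UnawareBucketShuffleSketch {
    public static void main(String[] args) {
        int numChannels = 4;
        int unawareBucket = 0; // bucket-unaware tables report bucket 0 for every split

        // Old behavior (sketch): channel derived from the bucket alone, as in
        // ChannelComputer.select(bucket, numChannels) -> every split lands on channel 0.
        IntStream.range(0, 8).forEach(split ->
                System.out.println("old: split " + split + " -> channel " + (unawareBucket % numChannels)));

        // New behavior (sketch): without shuffle-by-partition, splits are spread
        // round-robin across the read subtasks, removing the skew.
        IntStream.range(0, 8).forEach(split ->
                System.out.println("new: split " + split + " -> channel " + (split % numChannels)));
    }
}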
@@ -85,6 +85,22 @@ public void testReadWrite() {
assertThat(rows).containsExactlyInAnyOrder(Row.of("AAA"), Row.of("BBB"));
}

@Test
public void testReadWriteWithUnwareBucket() throws Exception {
batchSql(
"CREATE TABLE append_scalable_table (id INT, data STRING) "
+ "WITH ('bucket' = '-1', 'consumer-id' = 'test', 'consumer.expiration-time' = '365 d', 'target-file-size' = '1 B', 'source.split.target-size' = '1 B', 'streaming-read.shuffle-by-partition' = 'false', 'scan.parallelism' = '4')");
batchSql("INSERT INTO append_scalable_table VALUES (1, 'AAA'), (2, 'BBB')");
batchSql("INSERT INTO append_scalable_table VALUES (1, 'AAA'), (2, 'BBB')");
batchSql("INSERT INTO append_scalable_table VALUES (1, 'AAA'), (2, 'BBB')");
batchSql("INSERT INTO append_scalable_table VALUES (1, 'AAA'), (2, 'BBB')");

BlockingIterator<Row, Row> iterator =
BlockingIterator.of(streamSqlIter(("SELECT id FROM append_scalable_table")));
assertThat(iterator.collect(2)).containsExactlyInAnyOrder(Row.of(1), Row.of(2));
iterator.close();
}

@Test
public void testReadPartitionOrder() {
setParallelism(1);
@@ -282,15 +282,15 @@ public void testReadWriteFailRandom() throws Exception {
for (int i = 0; i < size; i++) {
Integer j = RANDOM.nextInt();
results.add(Row.of(j, String.valueOf(j)));
values.append("(" + j + ",'" + j + "'" + "),");
values.append("(").append(j).append(",'").append(j).append("'").append("),");
}

FailingFileIO.retryArtificialException(
() ->
batchSql(
String.format(
"INSERT INTO append_table VALUES %s",
values.substring(0, values.length() - 1))));

FailingFileIO.retryArtificialException(
() -> {
@@ -311,15 +311,15 @@ public void testReadWriteFailRandomString() throws Exception {
Integer j = RANDOM.nextInt();
String v = String.valueOf(RANDOM.nextInt());
results.add(Row.of(j, v));
values.append("(" + j + ",'" + v + "'" + "),");
values.append("(").append(j).append(",'").append(v).append("'").append("),");
}

FailingFileIO.retryArtificialException(
() ->
batchSql(
String.format(
"INSERT INTO append_table VALUES %s",
values.substring(0, values.length() - 1))));

FailingFileIO.retryArtificialException(
() -> {
