[flink] Add Shuffle by partition Option to ContinuousFileSplitEnumerator #3877

Merged · 4 commits · Aug 6, 2024
Changes from all commits
@@ -254,6 +254,12 @@
<td>Duration</td>
<td>If a new snapshot has not been generated when the checkpoint is triggered, the enumerator blocks the checkpoint and waits for the new snapshot. Set a maximum waiting time to avoid waiting indefinitely; if the timeout is reached, the checkpoint fails. Note that this should be set smaller than the checkpoint timeout.</td>
</tr>
<tr>
<td><h5>streaming-read.shuffle-by-partition</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to shuffle by partition and bucket in streaming read.</td>
</tr>
<tr>
<td><h5>unaware-bucket.compaction.parallelism</h5></td>
<td style="word-wrap: break-word;">(none)</td>
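For reference, a minimal sketch of flipping the new flag programmatically; only the option constant comes from this PR, while the surrounding class and the use of `Options.set` are illustrative assumptions:

```java
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.options.Options;

public class DisableShuffleByPartition {
    public static void main(String[] args) {
        Options options = new Options();
        // Default is true: shuffle by (partition, bucket).
        // Setting false routes splits by bucket only.
        options.set(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BY_PARTITION, false);
        System.out.println(
                options.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BY_PARTITION));
    }
}
```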
@@ -217,6 +217,13 @@ public class FlinkConnectorOptions {
+ " Note: This is dangerous and is likely to cause data errors if downstream"
+ " is used to calculate aggregation and the input is not complete changelog.");

public static final ConfigOption<Boolean> STREAMING_READ_SHUFFLE_BY_PARTITION =
key("streaming-read.shuffle-by-partition")
.booleanType()
.defaultValue(true)
.withDescription(
"Whether shuffle by partition and bucket when streaming read.");

/**
* Weight of writer buffer in managed memory, Flink will compute the memory size for writer
* according to the weight, the actual memory used depends on the running environment.
@@ -23,6 +23,7 @@
import org.apache.paimon.flink.source.assigners.PreAssignSplitAssigner;
import org.apache.paimon.flink.source.assigners.SplitAssigner;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.SnapshotNotExistPlan;
@@ -75,6 +76,8 @@ public class ContinuousFileSplitEnumerator

private final int splitMaxNum;

private final boolean shuffleByPartition;

@Nullable protected Long nextSnapshotId;

protected boolean finished = false;
@@ -88,7 +91,8 @@ public ContinuousFileSplitEnumerator(
long discoveryInterval,
StreamTableScan scan,
BucketMode bucketMode,
int splitMaxPerTask) {
int splitMaxPerTask,
boolean shuffleByPartition) {
checkArgument(discoveryInterval > 0L);
this.context = checkNotNull(context);
this.nextSnapshotId = nextSnapshotId;
@@ -98,6 +102,7 @@
this.scan = scan;
this.splitAssigner = createSplitAssigner(bucketMode);
this.splitMaxNum = context.currentParallelism() * splitMaxPerTask;
this.shuffleByPartition = shuffleByPartition;
addSplits(remainSplits);

this.consumerProgressCalculator =
@@ -275,7 +280,12 @@ protected synchronized void assignSplits() {
}

protected int assignSuggestedTask(FileStoreSourceSplit split) {
return ((DataSplit) split.split()).bucket() % context.currentParallelism();
DataSplit dataSplit = ((DataSplit) split.split());
if (shuffleByPartition) {
return ChannelComputer.select(
dataSplit.partition(), dataSplit.bucket(), context.currentParallelism());
}
return ChannelComputer.select(dataSplit.bucket(), context.currentParallelism());
}

protected SplitAssigner createSplitAssigner(BucketMode bucketMode) {
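The behavioral change above, in isolation: previously every split for a bucket went to task `bucket % parallelism` regardless of partition; with the flag enabled (the default), the partition participates in channel selection. A standalone sketch, assuming `ChannelComputer.select` behaves as the two overloads used above and with made-up values:

```java
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.table.sink.ChannelComputer;

public class RoutingSketch {
    public static void main(String[] args) {
        int parallelism = 4;
        BinaryRow partition = BinaryRow.EMPTY_ROW; // placeholder partition
        int bucket = 6;

        // shuffleByPartition = true: (partition, bucket) pairs spread across tasks.
        int byPartitionAndBucket = ChannelComputer.select(partition, bucket, parallelism);

        // shuffleByPartition = false: a bucket always maps to the same task,
        // whatever its partition.
        int byBucketOnly = ChannelComputer.select(bucket, parallelism);

        System.out.println(byPartitionAndBucket + " / " + byBucketOnly);
    }
}
```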
@@ -19,7 +19,9 @@
package org.apache.paimon.flink.source;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.StreamDataTableScan;
@@ -99,14 +101,15 @@ protected SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> buildEnumerator(
Collection<FileStoreSourceSplit> splits,
@Nullable Long nextSnapshotId,
StreamTableScan scan) {
CoreOptions coreOptions = CoreOptions.fromMap(options);
Options options = Options.fromMap(this.options);
return new ContinuousFileSplitEnumerator(
context,
splits,
nextSnapshotId,
coreOptions.continuousDiscoveryInterval().toMillis(),
options.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis(),
scan,
bucketMode,
coreOptions.scanSplitMaxPerTask());
options.get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK),
options.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BY_PARTITION));
}
}
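The builder now parses the raw option map once with `Options.fromMap`, so core and Flink-connector keys are read through one object instead of going through `CoreOptions` accessors. A sketch of that pattern with an illustrative map:

```java
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.options.Options;

import java.util.HashMap;
import java.util.Map;

public class OptionsSketch {
    public static void main(String[] args) {
        Map<String, String> raw = new HashMap<>();
        raw.put("streaming-read.shuffle-by-partition", "false");

        Options options = Options.fromMap(raw);
        // Defaults resolve when a key is absent from the map.
        long discoveryMillis =
                options.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis();
        boolean shuffle =
                options.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BY_PARTITION);
        System.out.println(discoveryMillis + "ms, shuffle=" + shuffle);
    }
}
```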
@@ -305,7 +305,8 @@ private DataStream<RowData> buildContinuousStreamOperator() {
produceTypeInfo(),
createReadBuilder(),
conf.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis(),
watermarkStrategy == null);
watermarkStrategy == null,
conf.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BY_PARTITION));
if (parallelism != null) {
dataStream.getTransformation().setParallelism(parallelism);
}
@@ -94,15 +94,17 @@ public AlignedContinuousFileSplitEnumerator(
StreamTableScan scan,
BucketMode bucketMode,
long alignTimeout,
int splitPerTaskMax) {
int splitPerTaskMax,
boolean shuffleByPartition) {
super(
context,
remainSplits,
nextSnapshotId,
discoveryInterval,
scan,
bucketMode,
splitPerTaskMax);
splitPerTaskMax,
shuffleByPartition);
this.pendingPlans = new ArrayBlockingQueue<>(MAX_PENDING_PLAN);
this.alignedAssigner = (AlignedSplitAssigner) super.splitAssigner;
this.nextSnapshotId = nextSnapshotId;
@@ -91,6 +91,7 @@ protected SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> buildEnumerator(
scan,
bucketMode,
options.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_TIMEOUT).toMillis(),
options.get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK));
options.get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK),
options.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BY_PARTITION));
}
}
@@ -229,15 +229,20 @@ public static DataStream<RowData> buildSource(
TypeInformation<RowData> typeInfo,
ReadBuilder readBuilder,
long monitorInterval,
boolean emitSnapshotWatermark) {
boolean emitSnapshotWatermark,
boolean shuffleByPartition) {
return env.addSource(
new MonitorFunction(readBuilder, monitorInterval, emitSnapshotWatermark),
name + "-Monitor",
new JavaTypeInfo<>(Split.class))
.forceNonParallel()
.partitionCustom(
(key, numPartitions) ->
ChannelComputer.select(key.f0, key.f1, numPartitions),
(key, numPartitions) -> {
if (shuffleByPartition) {
return ChannelComputer.select(key.f0, key.f1, numPartitions);
}
return ChannelComputer.select(key.f1, numPartitions);
},
split -> {
DataSplit dataSplit = (DataSplit) split;
return Tuple2.of(dataSplit.partition(), dataSplit.bucket());
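On the DataStream side the same choice is applied through `partitionCustom`. A self-contained sketch (not Paimon code) of that mechanism, with plain integers standing in for bucket keys and the `shuffleByPartition = false` branch as the partitioner:

```java
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitionCustomSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        env.fromElements(0, 1, 2, 3, 4, 5)
                // bucket % numPartitions, i.e. the bucket-only branch above.
                .partitionCustom(
                        (Partitioner<Integer>) (key, numPartitions) -> key % numPartitions,
                        new KeySelector<Integer, Integer>() {
                            @Override
                            public Integer getKey(Integer bucket) {
                                return bucket;
                            }
                        })
                .print();

        env.execute("partition-custom-sketch");
    }
}
```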
@@ -878,7 +878,7 @@ public Builder withBucketMode(BucketMode bucketMode) {

public ContinuousFileSplitEnumerator build() {
return new ContinuousFileSplitEnumerator(
context, initialSplits, null, discoveryInterval, scan, bucketMode, 10);
context, initialSplits, null, discoveryInterval, scan, bucketMode, 10, false);
}
}

@@ -245,7 +245,15 @@ public Builder setAlignedTimeout(long timeout) {

public AlignedContinuousFileSplitEnumerator build() {
return new AlignedContinuousFileSplitEnumerator(
context, initialSplits, null, discoveryInterval, scan, bucketMode, timeout, 10);
context,
initialSplits,
null,
discoveryInterval,
scan,
bucketMode,
timeout,
10,
false);
}
}
}