Commit 7e49fb7
update
wxplovecc committed Aug 5, 2024
1 parent b1fac38
Showing 9 changed files with 32 additions and 27 deletions.
6 changes: 0 additions & 6 deletions docs/layouts/shortcodes/generated/core_configuration.html
@@ -605,12 +605,6 @@
<td>Integer</td>
<td>Max split size should be cached for one task while scanning. If splits size cached in enumerator are greater than tasks size multiply by this value, scanner will pause scanning.</td>
</tr>
<tr>
<td><h5>scan.shuffle-by-partition</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether shuffle by partition and bucket.</td>
</tr>
<tr>
<td><h5>scan.mode</h5></td>
<td style="word-wrap: break-word;">default</td>
@@ -128,6 +128,12 @@
<td>Boolean</td>
<td>Whether to force the removal of the normalize node when streaming read. Note: This is dangerous and is likely to cause data errors if downstream is used to calculate aggregation and the input is not complete changelog.</td>
</tr>
<tr>
<td><h5>streaming-read.shuffle-by-partition</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether shuffle by partition and bucket when streaming read. </td>
</tr>
<tr>
<td><h5>scan.split-enumerator.batch-size</h5></td>
<td style="word-wrap: break-word;">10</td>
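For readers wondering how the renamed option is meant to be used, the sketch below shows one way to override it for a single streaming read via a Flink SQL dynamic-option hint. The table name t and the environment setup are illustrative assumptions rather than part of this commit, and some Flink versions require table.dynamic-table-options.enabled to be set before hints are honored.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
// Hypothetical Paimon table t in the current catalog; the hint overrides
// streaming-read.shuffle-by-partition for this query only.
tEnv.executeSql(
        "SELECT * FROM t /*+ OPTIONS('streaming-read.shuffle-by-partition' = 'false') */");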
10 changes: 0 additions & 10 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -310,12 +310,6 @@ public class CoreOptions implements Serializable {
"Max split size should be cached for one task while scanning. "
+ "If splits size cached in enumerator are greater than tasks size multiply by this value, scanner will pause scanning.");

public static final ConfigOption<Boolean> SCAN_SHUFFLE_BY_PARTITION =
key("scan.shuffle-by-partition")
.booleanType()
.defaultValue(false)
.withDescription("Whether shuffle by partition and bucket.");

@Immutable
public static final ConfigOption<MergeEngine> MERGE_ENGINE =
key("merge-engine")
@@ -1601,10 +1595,6 @@ public int scanSplitMaxPerTask() {
return options.get(SCAN_MAX_SPLITS_PER_TASK);
}

public boolean scanShuffleByPartition() {
return options.get(SCAN_SHUFFLE_BY_PARTITION);
}

public int localSortMaxNumFileHandles() {
return options.get(LOCAL_SORT_MAX_NUM_FILE_HANDLES);
}
@@ -217,6 +217,13 @@ public class FlinkConnectorOptions {
+ " Note: This is dangerous and is likely to cause data errors if downstream"
+ " is used to calculate aggregation and the input is not complete changelog.");

public static final ConfigOption<Boolean> STREAMING_READ_SHUFFLE_BY_PARTITION =
key("streaming-read.shuffle-by-partition")
.booleanType()
.defaultValue(true)
.withDescription(
"Whether shuffle by partition and bucket when streaming read.");

/**
* Weight of writer buffer in managed memory, Flink will compute the memory size for writer
* according to the weight, the actual memory used depends on the running environment.
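Note that the new connector-side option defaults to true, while the removed core option scan.shuffle-by-partition defaulted to false, so partition-and-bucket shuffling becomes the out-of-the-box behavior for streaming reads unless explicitly disabled. A minimal sketch of how that default resolves through Paimon's Options API (an empty Options instance is used purely for illustration):

Options conf = new Options();
// No user-supplied value, so get(...) falls back to the option's declared default: true.
boolean shuffleByPartition = conf.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BY_PARTITION);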
@@ -76,14 +76,14 @@ public class ContinuousFileSplitEnumerator

private final int splitMaxNum;

private final boolean shuffleByPartition;

@Nullable protected Long nextSnapshotId;

protected boolean finished = false;

private boolean stopTriggerScan = false;

private boolean shuffleByPartition = false;

public ContinuousFileSplitEnumerator(
SplitEnumeratorContext<FileStoreSourceSplit> context,
Collection<FileStoreSourceSplit> remainSplits,
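This hunk only turns the flag into a final field populated by the constructor (see the builder changes below) instead of a mutable field hard-coded to false; the assignment logic that consumes it lies outside the hunk. As a hedged sketch of how such a flag is typically applied when the enumerator suggests an owner subtask for a split, mirroring the partitioner change at the end of this commit (assignSuggestedTask and its signature are assumptions, not code from this diff):

private int assignSuggestedTask(FileStoreSourceSplit split, int parallelism) {
    DataSplit dataSplit = (DataSplit) split.split();
    if (shuffleByPartition) {
        // Hash partition and bucket together: one partition/bucket pair always lands on one subtask.
        return ChannelComputer.select(dataSplit.partition(), dataSplit.bucket(), parallelism);
    }
    // Hash the bucket alone: the same bucket id from every partition is read by the same subtask.
    return ChannelComputer.select(dataSplit.bucket(), parallelism);
}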
@@ -19,7 +19,9 @@
package org.apache.paimon.flink.source;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.StreamDataTableScan;
@@ -99,15 +101,15 @@ protected SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> buildEn
Collection<FileStoreSourceSplit> splits,
@Nullable Long nextSnapshotId,
StreamTableScan scan) {
CoreOptions coreOptions = CoreOptions.fromMap(options);
Options options = Options.fromMap(this.options);
return new ContinuousFileSplitEnumerator(
context,
splits,
nextSnapshotId,
coreOptions.continuousDiscoveryInterval().toMillis(),
options.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis(),
scan,
bucketMode,
coreOptions.scanSplitMaxPerTask(),
coreOptions.scanShuffleByPartition());
options.get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK),
options.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BY_PARTITION));
}
}
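The builder now parses the raw map with Options.fromMap instead of CoreOptions.fromMap, presumably because the new flag lives in FlinkConnectorOptions and the removed scanShuffleByPartition() getter no longer exists on CoreOptions; a plain Options instance can serve keys from both option classes. A small hedged sketch of that mixed lookup (the map values are illustrative only):

Map<String, String> tableOptions = new HashMap<>();
tableOptions.put("continuous.discovery-interval", "5 s");
tableOptions.put("streaming-read.shuffle-by-partition", "false");

Options options = Options.fromMap(tableOptions);
long discoveryMillis =
        options.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis(); // core option
boolean shuffleByPartition =
        options.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BY_PARTITION); // connector option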
@@ -305,7 +305,8 @@ private DataStream<RowData> buildContinuousStreamOperator() {
produceTypeInfo(),
createReadBuilder(),
conf.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis(),
watermarkStrategy == null);
watermarkStrategy == null,
conf.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BY_PARTITION));
if (parallelism != null) {
dataStream.getTransformation().setParallelism(parallelism);
}
@@ -92,6 +92,6 @@ protected SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> buildEn
bucketMode,
options.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_TIMEOUT).toMillis(),
options.get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK),
options.get(CoreOptions.SCAN_SHUFFLE_BY_PARTITION));
options.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BY_PARTITION));
}
}
@@ -229,15 +229,20 @@ public static DataStream<RowData> buildSource(
TypeInformation<RowData> typeInfo,
ReadBuilder readBuilder,
long monitorInterval,
boolean emitSnapshotWatermark) {
boolean emitSnapshotWatermark,
boolean shuffleByPartition) {
return env.addSource(
new MonitorFunction(readBuilder, monitorInterval, emitSnapshotWatermark),
name + "-Monitor",
new JavaTypeInfo<>(Split.class))
.forceNonParallel()
.partitionCustom(
(key, numPartitions) ->
ChannelComputer.select(key.f0, key.f1, numPartitions),
(key, numPartitions) -> {
if (shuffleByPartition) {
return ChannelComputer.select(key.f0, key.f1, numPartitions);
}
return ChannelComputer.select(key.f1, numPartitions);
},
split -> {
DataSplit dataSplit = (DataSplit) split;
return Tuple2.of(dataSplit.partition(), dataSplit.bucket());
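With this change the custom partitioner honors the flag: when shuffleByPartition is true it keeps the previous behavior of keying on partition and bucket together, and when false it keys on the bucket alone, so every partition's copy of a given bucket is read by the same subtask. A small hedged illustration of the two channel computations (the empty partition row, bucket 3, and parallelism 4 are made-up inputs; the bucket-only overload is assumed to behave like a simple modulo):

int parallelism = 4;
int bucket = 3;
BinaryRow partition = BinaryRow.EMPTY_ROW; // stand-in for a real partition value

// shuffleByPartition = false: only the bucket picks the channel, regardless of partition.
int bucketOnly = ChannelComputer.select(bucket, parallelism);
// shuffleByPartition = true: the partition hash also contributes, spreading one bucket id across subtasks.
int partitionAware = ChannelComputer.select(partition, bucket, parallelism);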
