Skip to content

Commit

Permalink
[flink] Make DPP distribute the splits fairly for flink (apache#4348)
Browse files Browse the repository at this point in the history
  • Loading branch information
HuangXingBo authored Oct 22, 2024
1 parent beb6901 commit 656d71e
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.flink.source.assigners.DynamicPartitionPruningAssigner;
import org.apache.paimon.flink.source.assigners.PreAssignSplitAssigner;
import org.apache.paimon.flink.source.assigners.SplitAssigner;

import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.table.connector.source.DynamicFilteringEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -128,13 +130,23 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
checkNotNull(
dynamicPartitionFilteringInfo,
"Cannot apply dynamic filtering because dynamicPartitionFilteringInfo hasn't been set.");
this.splitAssigner =
DynamicPartitionPruningAssigner.createDynamicPartitionPruningAssignerIfNeeded(
subtaskId,
splitAssigner,
dynamicPartitionFilteringInfo.getPartitionRowProjection(),
sourceEvent,
LOG);

if (splitAssigner instanceof PreAssignSplitAssigner) {
this.splitAssigner =
((PreAssignSplitAssigner) splitAssigner)
.ofDynamicPartitionPruning(
dynamicPartitionFilteringInfo.getPartitionRowProjection(),
((DynamicFilteringEvent) sourceEvent).getData());
} else {
this.splitAssigner =
DynamicPartitionPruningAssigner
.createDynamicPartitionPruningAssignerIfNeeded(
subtaskId,
splitAssigner,
dynamicPartitionFilteringInfo.getPartitionRowProjection(),
sourceEvent,
LOG);
}
} else {
LOG.error("Received unrecognized event: {}", sourceEvent);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@

package org.apache.paimon.flink.source.assigners;

import org.apache.paimon.codegen.Projection;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.source.FileStoreSourceSplit;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.utils.BinPacking;

import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.table.connector.source.DynamicFilteringData;

import javax.annotation.Nullable;

Expand All @@ -35,29 +40,53 @@
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.apache.paimon.flink.utils.TableScanUtils.getSnapshotId;

/**
* Pre-calculate which splits each task should process according to the weight, and then distribute
* the splits fairly.
* Pre-calculate which splits each task should process according to the weight or given
* DynamicFilteringData, and then distribute the splits fairly.
*/
public class PreAssignSplitAssigner implements SplitAssigner {

/** Default batch splits size to avoid exceed `akka.framesize`. */
private final int splitBatchSize;

private final int parallelism;

private final Map<Integer, LinkedList<FileStoreSourceSplit>> pendingSplitAssignment;

private final AtomicInteger numberOfPendingSplits;
private final Collection<FileStoreSourceSplit> splits;

public PreAssignSplitAssigner(
int splitBatchSize,
SplitEnumeratorContext<FileStoreSourceSplit> context,
Collection<FileStoreSourceSplit> splits) {
this(splitBatchSize, context.currentParallelism(), splits);
}

public PreAssignSplitAssigner(
int splitBatchSize,
int parallelism,
Collection<FileStoreSourceSplit> splits,
Projection partitionRowProjection,
DynamicFilteringData dynamicFilteringData) {
this(
splitBatchSize,
parallelism,
splits.stream()
.filter(s -> filter(partitionRowProjection, dynamicFilteringData, s))
.collect(Collectors.toList()));
}

public PreAssignSplitAssigner(
int splitBatchSize, int parallelism, Collection<FileStoreSourceSplit> splits) {
this.splitBatchSize = splitBatchSize;
this.pendingSplitAssignment =
createBatchFairSplitAssignment(splits, context.currentParallelism());
this.parallelism = parallelism;
this.splits = splits;
this.pendingSplitAssignment = createBatchFairSplitAssignment(splits, parallelism);
this.numberOfPendingSplits = new AtomicInteger(splits.size());
}

Expand Down Expand Up @@ -127,4 +156,20 @@ public Optional<Long> getNextSnapshotId(int subtask) {
public int numberOfRemainingSplits() {
return numberOfPendingSplits.get();
}

public SplitAssigner ofDynamicPartitionPruning(
Projection partitionRowProjection, DynamicFilteringData dynamicFilteringData) {
return new PreAssignSplitAssigner(
splitBatchSize, parallelism, splits, partitionRowProjection, dynamicFilteringData);
}

private static boolean filter(
Projection partitionRowProjection,
DynamicFilteringData dynamicFilteringData,
FileStoreSourceSplit sourceSplit) {
DataSplit dataSplit = (DataSplit) sourceSplit.split();
BinaryRow partition = dataSplit.partition();
FlinkRowData projected = new FlinkRowData(partitionRowProjection.apply(partition));
return dynamicFilteringData.contains(projected);
}
}

0 comments on commit 656d71e

Please sign in to comment.