diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java index abd12aa37736..c30b1c96d2b4 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java @@ -21,12 +21,14 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.flink.source.assigners.DynamicPartitionPruningAssigner; +import org.apache.paimon.flink.source.assigners.PreAssignSplitAssigner; import org.apache.paimon.flink.source.assigners.SplitAssigner; import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.table.connector.source.DynamicFilteringEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -128,13 +130,23 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { checkNotNull( dynamicPartitionFilteringInfo, "Cannot apply dynamic filtering because dynamicPartitionFilteringInfo hasn't been set."); - this.splitAssigner = - DynamicPartitionPruningAssigner.createDynamicPartitionPruningAssignerIfNeeded( - subtaskId, - splitAssigner, - dynamicPartitionFilteringInfo.getPartitionRowProjection(), - sourceEvent, - LOG); + + if (splitAssigner instanceof PreAssignSplitAssigner) { + this.splitAssigner = + ((PreAssignSplitAssigner) splitAssigner) + .ofDynamicPartitionPruning( + dynamicPartitionFilteringInfo.getPartitionRowProjection(), + ((DynamicFilteringEvent) sourceEvent).getData()); + } else { + this.splitAssigner = + DynamicPartitionPruningAssigner + .createDynamicPartitionPruningAssignerIfNeeded( + subtaskId, + splitAssigner, + dynamicPartitionFilteringInfo.getPartitionRowProjection(), + sourceEvent, + LOG); + } } else { LOG.error("Received unrecognized event: {}", sourceEvent); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java index 400a2e5c54eb..fbb31bd1080a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java @@ -18,10 +18,15 @@ package org.apache.paimon.flink.source.assigners; +import org.apache.paimon.codegen.Projection; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.flink.FlinkRowData; import org.apache.paimon.flink.source.FileStoreSourceSplit; +import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.utils.BinPacking; import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.table.connector.source.DynamicFilteringData; import javax.annotation.Nullable; @@ -35,29 +40,53 @@ import java.util.Optional; import java.util.Queue; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import static org.apache.paimon.flink.utils.TableScanUtils.getSnapshotId; /** - * Pre-calculate which splits each task should process according to the weight, and then distribute - * the splits fairly. + * Pre-calculate which splits each task should process according to the weight or given + * DynamicFilteringData, and then distribute the splits fairly. */ public class PreAssignSplitAssigner implements SplitAssigner { /** Default batch splits size to avoid exceed `akka.framesize`. */ private final int splitBatchSize; + private final int parallelism; + private final Map> pendingSplitAssignment; private final AtomicInteger numberOfPendingSplits; + private final Collection splits; public PreAssignSplitAssigner( int splitBatchSize, SplitEnumeratorContext context, Collection splits) { + this(splitBatchSize, context.currentParallelism(), splits); + } + + public PreAssignSplitAssigner( + int splitBatchSize, + int parallelism, + Collection splits, + Projection partitionRowProjection, + DynamicFilteringData dynamicFilteringData) { + this( + splitBatchSize, + parallelism, + splits.stream() + .filter(s -> filter(partitionRowProjection, dynamicFilteringData, s)) + .collect(Collectors.toList())); + } + + public PreAssignSplitAssigner( + int splitBatchSize, int parallelism, Collection splits) { this.splitBatchSize = splitBatchSize; - this.pendingSplitAssignment = - createBatchFairSplitAssignment(splits, context.currentParallelism()); + this.parallelism = parallelism; + this.splits = splits; + this.pendingSplitAssignment = createBatchFairSplitAssignment(splits, parallelism); this.numberOfPendingSplits = new AtomicInteger(splits.size()); } @@ -127,4 +156,20 @@ public Optional getNextSnapshotId(int subtask) { public int numberOfRemainingSplits() { return numberOfPendingSplits.get(); } + + public SplitAssigner ofDynamicPartitionPruning( + Projection partitionRowProjection, DynamicFilteringData dynamicFilteringData) { + return new PreAssignSplitAssigner( + splitBatchSize, parallelism, splits, partitionRowProjection, dynamicFilteringData); + } + + private static boolean filter( + Projection partitionRowProjection, + DynamicFilteringData dynamicFilteringData, + FileStoreSourceSplit sourceSplit) { + DataSplit dataSplit = (DataSplit) sourceSplit.split(); + BinaryRow partition = dataSplit.partition(); + FlinkRowData projected = new FlinkRowData(partitionRowProjection.apply(partition)); + return dynamicFilteringData.contains(projected); + } }