diff --git a/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java b/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java deleted file mode 100644 index 77fd3a5e5289..000000000000 --- a/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.flink.source; - -import org.apache.paimon.Snapshot; -import org.apache.paimon.flink.source.assigners.SplitAssigner; - -import org.apache.flink.api.connector.source.SplitEnumerator; -import org.apache.flink.api.connector.source.SplitEnumeratorContext; - -import javax.annotation.Nullable; - -/** A {@link SplitEnumerator} implementation for {@link StaticFileStoreSource} input. */ -public class StaticFileStoreSplitEnumerator extends StaticFileStoreSplitEnumeratorBase { - - public StaticFileStoreSplitEnumerator( - SplitEnumeratorContext context, - @Nullable Snapshot snapshot, - SplitAssigner splitAssigner) { - this(context, snapshot, splitAssigner, null); - } - - public StaticFileStoreSplitEnumerator( - SplitEnumeratorContext context, - @Nullable Snapshot snapshot, - SplitAssigner splitAssigner, - @Nullable DynamicPartitionFilteringInfo dynamicPartitionFilteringInfo) { - super(context, snapshot, splitAssigner); - } -} diff --git a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java deleted file mode 100644 index 77fd3a5e5289..000000000000 --- a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.flink.source; - -import org.apache.paimon.Snapshot; -import org.apache.paimon.flink.source.assigners.SplitAssigner; - -import org.apache.flink.api.connector.source.SplitEnumerator; -import org.apache.flink.api.connector.source.SplitEnumeratorContext; - -import javax.annotation.Nullable; - -/** A {@link SplitEnumerator} implementation for {@link StaticFileStoreSource} input. */ -public class StaticFileStoreSplitEnumerator extends StaticFileStoreSplitEnumeratorBase { - - public StaticFileStoreSplitEnumerator( - SplitEnumeratorContext context, - @Nullable Snapshot snapshot, - SplitAssigner splitAssigner) { - this(context, snapshot, splitAssigner, null); - } - - public StaticFileStoreSplitEnumerator( - SplitEnumeratorContext context, - @Nullable Snapshot snapshot, - SplitAssigner splitAssigner, - @Nullable DynamicPartitionFilteringInfo dynamicPartitionFilteringInfo) { - super(context, snapshot, splitAssigner); - } -} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java index e68338b24541..828be1eb6da8 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java @@ -19,21 +19,35 @@ package org.apache.paimon.flink.source; import org.apache.paimon.Snapshot; +import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.flink.source.assigners.DynamicPartitionPruningAssigner; import org.apache.paimon.flink.source.assigners.SplitAssigner; import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; -import org.apache.flink.table.connector.source.DynamicFilteringData; -import org.apache.flink.table.connector.source.DynamicFilteringEvent; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.util.Collections; +import java.util.List; + import static org.apache.paimon.utils.Preconditions.checkNotNull; /** A {@link SplitEnumerator} implementation for {@link StaticFileStoreSource} input. */ -public class StaticFileStoreSplitEnumerator extends StaticFileStoreSplitEnumeratorBase { +public class StaticFileStoreSplitEnumerator + implements SplitEnumerator { + + private static final Logger LOG = LoggerFactory.getLogger(StaticFileStoreSplitEnumerator.class); + + private final SplitEnumeratorContext context; + + @Nullable private final Snapshot snapshot; + + private SplitAssigner splitAssigner; @Nullable private final DynamicPartitionFilteringInfo dynamicPartitionFilteringInfo; @@ -49,33 +63,79 @@ public StaticFileStoreSplitEnumerator( @Nullable Snapshot snapshot, SplitAssigner splitAssigner, @Nullable DynamicPartitionFilteringInfo dynamicPartitionFilteringInfo) { - super(context, snapshot, splitAssigner); + this.context = context; + this.snapshot = snapshot; + this.splitAssigner = splitAssigner; this.dynamicPartitionFilteringInfo = dynamicPartitionFilteringInfo; } @Override - public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { - if (sourceEvent instanceof DynamicFilteringEvent) { - DynamicFilteringData dynamicFilteringData = - ((DynamicFilteringEvent) sourceEvent).getData(); - LOG.info( - "Received DynamicFilteringEvent: {}, is filtering: {}.", - subtaskId, - dynamicFilteringData.isFiltering()); + public void start() { + // no resources to start + } + + @Override + public void handleSplitRequest(int subtask, @Nullable String hostname) { + if (!context.registeredReaders().containsKey(subtask)) { + // reader failed between sending the request and now. skip this request. + return; + } + + List assignment = splitAssigner.getNext(subtask, hostname); + if (assignment.size() > 0) { + context.assignSplits( + new SplitsAssignment<>(Collections.singletonMap(subtask, assignment))); + } else { + context.signalNoMoreSplits(subtask); + } + } + + @Override + public void addSplitsBack(List backSplits, int subtaskId) { + splitAssigner.addSplitsBack(subtaskId, backSplits); + } + @Override + public void addReader(int subtaskId) { + // this source is purely lazy-pull-based, nothing to do upon registration + } + + @Override + public PendingSplitsCheckpoint snapshotState(long checkpointId) { + return new PendingSplitsCheckpoint( + splitAssigner.remainingSplits(), snapshot == null ? null : snapshot.id()); + } + + @Override + public void close() { + // no resources to close + } + + @Nullable + public Snapshot snapshot() { + return snapshot; + } + + @Override + public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { + if (sourceEvent.getClass().getSimpleName().equals("DynamicFilteringEvent")) { checkNotNull( dynamicPartitionFilteringInfo, "Cannot apply dynamic filtering because dynamicPartitionFilteringInfo hasn't been set."); - - if (dynamicFilteringData.isFiltering()) { - this.splitAssigner = - new DynamicPartitionPruningAssigner( - splitAssigner, - dynamicPartitionFilteringInfo.getPartitionRowProjection(), - dynamicFilteringData); - } + this.splitAssigner = + DynamicPartitionPruningAssigner.createDynamicPartitionPruningAssignerIfNeeded( + subtaskId, + splitAssigner, + dynamicPartitionFilteringInfo.getPartitionRowProjection(), + sourceEvent, + LOG); } else { - super.handleSourceEvent(subtaskId, sourceEvent); + LOG.error("Received unrecognized event: {}", sourceEvent); } } + + @VisibleForTesting + public SplitAssigner getSplitAssigner() { + return splitAssigner; + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumeratorBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumeratorBase.java deleted file mode 100644 index 9b1b534bdf55..000000000000 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumeratorBase.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.flink.source; - -import org.apache.paimon.Snapshot; -import org.apache.paimon.annotation.VisibleForTesting; -import org.apache.paimon.flink.source.assigners.SplitAssigner; - -import org.apache.flink.api.connector.source.SourceEvent; -import org.apache.flink.api.connector.source.SplitEnumerator; -import org.apache.flink.api.connector.source.SplitEnumeratorContext; -import org.apache.flink.api.connector.source.SplitsAssignment; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; - -import java.util.Collections; -import java.util.List; - -/** A {@link SplitEnumerator} base implementation for {@link StaticFileStoreSource} input. */ -public abstract class StaticFileStoreSplitEnumeratorBase - implements SplitEnumerator { - - protected static final Logger LOG = - LoggerFactory.getLogger(StaticFileStoreSplitEnumeratorBase.class); - - protected final SplitEnumeratorContext context; - - @Nullable protected final Snapshot snapshot; - - protected SplitAssigner splitAssigner; - - public StaticFileStoreSplitEnumeratorBase( - SplitEnumeratorContext context, - @Nullable Snapshot snapshot, - SplitAssigner splitAssigner) { - this.context = context; - this.snapshot = snapshot; - this.splitAssigner = splitAssigner; - } - - @Override - public void start() { - // no resources to start - } - - @Override - public void handleSplitRequest(int subtask, @Nullable String hostname) { - if (!context.registeredReaders().containsKey(subtask)) { - // reader failed between sending the request and now. skip this request. - return; - } - - List assignment = splitAssigner.getNext(subtask, hostname); - if (assignment.size() > 0) { - context.assignSplits( - new SplitsAssignment<>(Collections.singletonMap(subtask, assignment))); - } else { - context.signalNoMoreSplits(subtask); - } - } - - @Override - public void addSplitsBack(List backSplits, int subtaskId) { - splitAssigner.addSplitsBack(subtaskId, backSplits); - } - - @Override - public void addReader(int subtaskId) { - // this source is purely lazy-pull-based, nothing to do upon registration - } - - @Override - public PendingSplitsCheckpoint snapshotState(long checkpointId) { - return new PendingSplitsCheckpoint( - splitAssigner.remainingSplits(), snapshot == null ? null : snapshot.id()); - } - - @Override - public void close() { - // no resources to close - } - - @Nullable - public Snapshot snapshot() { - return snapshot; - } - - @Override - public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { - LOG.error("Received unrecognized event: {}", sourceEvent); - } - - @VisibleForTesting - public SplitAssigner getSplitAssigner() { - return splitAssigner; - } -} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/DynamicPartitionPruningAssigner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/DynamicPartitionPruningAssigner.java index 5e5a7247ef08..7b997dae226a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/DynamicPartitionPruningAssigner.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/DynamicPartitionPruningAssigner.java @@ -24,7 +24,10 @@ import org.apache.paimon.flink.source.FileStoreSourceSplit; import org.apache.paimon.table.source.DataSplit; +import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.table.connector.source.DynamicFilteringData; +import org.apache.flink.table.connector.source.DynamicFilteringEvent; +import org.slf4j.Logger; import javax.annotation.Nullable; @@ -83,6 +86,23 @@ public Collection remainingSplits() { .collect(Collectors.toList()); } + public static SplitAssigner createDynamicPartitionPruningAssignerIfNeeded( + int subtaskId, + SplitAssigner oriAssigner, + Projection partitionRowProjection, + SourceEvent sourceEvent, + Logger logger) { + DynamicFilteringData dynamicFilteringData = ((DynamicFilteringEvent) sourceEvent).getData(); + logger.info( + "Received DynamicFilteringEvent: {}, is filtering: {}.", + subtaskId, + dynamicFilteringData.isFiltering()); + return dynamicFilteringData.isFiltering() + ? new DynamicPartitionPruningAssigner( + oriAssigner, partitionRowProjection, dynamicFilteringData) + : oriAssigner; + } + private boolean filter(FileStoreSourceSplit sourceSplit) { DataSplit dataSplit = (DataSplit) sourceSplit.split(); BinaryRow partition = dataSplit.partition();