diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java index 782a9804e505..c42780afa5ae 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java @@ -100,7 +100,10 @@ public DynamicTableSource createDynamicTableSource(Context context) { == RuntimeExecutionMode.STREAMING; if (origin instanceof SystemCatalogTable) { return new PushedTableSource( - new SystemTableSource(((SystemCatalogTable) origin).table(), isStreamingMode)); + new SystemTableSource( + ((SystemCatalogTable) origin).table(), + isStreamingMode, + context.getObjectIdentifier())); } else { Table table = buildPaimonTable(context); if (table instanceof FileStoreTable) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java index fd29dd0482d6..c2e080937a02 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java @@ -32,7 +32,6 @@ import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.table.Table; -import org.apache.paimon.table.source.Split; import org.apache.paimon.utils.Projection; import org.apache.flink.api.common.eventtime.WatermarkStrategy; @@ -67,7 +66,6 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL; import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_EMIT_STRATEGY; import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_IDLE_TIMEOUT; -import static org.apache.paimon.options.OptionsUtils.PAIMON_PREFIX; import static org.apache.paimon.utils.Preconditions.checkState; /** @@ -79,9 +77,6 @@ * LogSourceProvider}. */ public class DataTableSource extends FlinkTableSource { - private static final String FLINK_INFER_SCAN_PARALLELISM = - String.format( - "%s%s", PAIMON_PREFIX, FlinkConnectorOptions.INFER_SCAN_PARALLELISM.key()); private final ObjectIdentifier tableIdentifier; private final boolean streaming; @@ -90,8 +85,6 @@ public class DataTableSource extends FlinkTableSource { @Nullable private WatermarkStrategy watermarkStrategy; - private SplitStatistics splitStatistics; - @Nullable private List dynamicPartitionFilteringFields; public DataTableSource( @@ -247,14 +240,6 @@ private DataStream configureSource( return sourceBuilder.withParallelism(parallelism).withEnv(env).build(); } - private void scanSplitsForInference() { - if (splitStatistics == null) { - List splits = - table.newReadBuilder().withFilter(predicate).newScan().plan().splits(); - splitStatistics = new SplitStatistics(splits); - } - } - @Override public DataTableSource copy() { return new DataTableSource( @@ -335,24 +320,4 @@ public void applyDynamicFiltering(List candidateFilterFields) { public boolean isStreaming() { return streaming; } - - /** Split statistics for inferring row count and parallelism size. */ - protected static class SplitStatistics { - - private final int splitNumber; - private final long totalRowCount; - - private SplitStatistics(List splits) { - this.splitNumber = splits.size(); - this.totalRowCount = splits.stream().mapToLong(Split::rowCount).sum(); - } - - public int splitNumber() { - return splitNumber; - } - - public long totalRowCount() { - return totalRowCount; - } - } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java index 68209240ed17..3b4494433aa9 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.source; +import org.apache.paimon.flink.FlinkConnectorOptions; import org.apache.paimon.flink.LogicalTypeConversion; import org.apache.paimon.flink.PredicateConverter; import org.apache.paimon.predicate.PartitionPredicateVisitor; @@ -25,6 +26,7 @@ import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.predicate.PredicateVisitor; import org.apache.paimon.table.Table; +import org.apache.paimon.table.source.Split; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.table.connector.ChangelogMode; @@ -46,16 +48,23 @@ import java.util.List; import java.util.Optional; +import static org.apache.paimon.options.OptionsUtils.PAIMON_PREFIX; + /** A Flink {@link ScanTableSource} for paimon. */ public abstract class FlinkTableSource { private static final Logger LOG = LoggerFactory.getLogger(FlinkTableSource.class); + protected static final String FLINK_INFER_SCAN_PARALLELISM = + String.format( + "%s%s", PAIMON_PREFIX, FlinkConnectorOptions.INFER_SCAN_PARALLELISM.key()); + protected final Table table; @Nullable protected Predicate predicate; @Nullable protected int[][] projectFields; @Nullable protected Long limit; + protected SplitStatistics splitStatistics; public FlinkTableSource(Table table) { this(table, null, null, null); @@ -132,4 +141,36 @@ public void pushLimit(long limit) { public abstract void applyDynamicFiltering(List candidateFilterFields); public abstract boolean isStreaming(); + + protected void scanSplitsForInference() { + if (splitStatistics == null) { + List splits = + table.newReadBuilder().withFilter(predicate).newScan().plan().splits(); + splitStatistics = new SplitStatistics(splits); + } + } + + /** Split statistics for inferring row count and parallelism size. */ + protected static class SplitStatistics { + + private final int splitNumber; + private final long totalRowCount; + + protected SplitStatistics(List splits) { + this.splitNumber = splits.size(); + this.totalRowCount = splits.stream().mapToLong(Split::rowCount).sum(); + } + + public int splitNumber() { + return splitNumber; + } + + public long totalRowCount() { + return totalRowCount; + } + } + + public Table getTable() { + return table; + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java index b577a73aadfd..77848d154733 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.source; import org.apache.paimon.flink.FlinkConnectorOptions; +import org.apache.paimon.flink.PaimonDataStreamScanProvider; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.table.DataTable; @@ -26,12 +27,16 @@ import org.apache.paimon.table.source.ReadBuilder; import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.Source; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.source.LookupTableSource; import org.apache.flink.table.connector.source.ScanTableSource.ScanContext; import org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider; -import org.apache.flink.table.connector.source.SourceProvider; import org.apache.flink.table.data.RowData; import org.apache.flink.table.plan.stats.TableStats; @@ -46,13 +51,16 @@ public class SystemTableSource extends FlinkTableSource { private final boolean isStreamingMode; private final int splitBatchSize; private final FlinkConnectorOptions.SplitAssignMode splitAssignMode; + private final ObjectIdentifier tableIdentifier; - public SystemTableSource(Table table, boolean isStreamingMode) { + public SystemTableSource( + Table table, boolean isStreamingMode, ObjectIdentifier tableIdentifier) { super(table); this.isStreamingMode = isStreamingMode; Options options = Options.fromMap(table.options()); this.splitBatchSize = options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE); this.splitAssignMode = options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_ASSIGN_MODE); + this.tableIdentifier = tableIdentifier; } public SystemTableSource( @@ -62,11 +70,13 @@ public SystemTableSource( @Nullable int[][] projectFields, @Nullable Long limit, int splitBatchSize, - FlinkConnectorOptions.SplitAssignMode splitAssignMode) { + FlinkConnectorOptions.SplitAssignMode splitAssignMode, + ObjectIdentifier tableIdentifier) { super(table, predicate, projectFields, limit); this.isStreamingMode = isStreamingMode; this.splitBatchSize = splitBatchSize; this.splitAssignMode = splitAssignMode; + this.tableIdentifier = tableIdentifier; } @Override @@ -85,7 +95,15 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { } else { source = new StaticFileStoreSource(readBuilder, limit, splitBatchSize, splitAssignMode); } - return SourceProvider.of(source); + return new PaimonDataStreamScanProvider( + source.getBoundedness() == Boundedness.BOUNDED, + env -> + configSourceParallelism( + env, + env.fromSource( + source, + WatermarkStrategy.noWatermarks(), + tableIdentifier.asSummaryString()))); } @Override @@ -97,7 +115,8 @@ public SystemTableSource copy() { projectFields, limit, splitBatchSize, - splitAssignMode); + splitAssignMode, + tableIdentifier); } @Override @@ -139,4 +158,34 @@ public void applyDynamicFiltering(List candidateFilterFields) { public boolean isStreaming() { return isStreamingMode; } + + private DataStreamSource configSourceParallelism( + StreamExecutionEnvironment env, DataStreamSource source) { + Options options = Options.fromMap(this.table.options()); + Configuration envConfig = (Configuration) env.getConfiguration(); + if (envConfig.containsKey(FLINK_INFER_SCAN_PARALLELISM)) { + options.set( + FlinkConnectorOptions.INFER_SCAN_PARALLELISM, + Boolean.parseBoolean(envConfig.toMap().get(FLINK_INFER_SCAN_PARALLELISM))); + } + Integer parallelism = options.get(FlinkConnectorOptions.SCAN_PARALLELISM); + if (parallelism == null && options.get(FlinkConnectorOptions.INFER_SCAN_PARALLELISM)) { + scanSplitsForInference(); + parallelism = splitStatistics.splitNumber(); + if (null != limit && limit > 0) { + int limitCount = limit >= Integer.MAX_VALUE ? Integer.MAX_VALUE : limit.intValue(); + parallelism = Math.min(parallelism, limitCount); + } + + parallelism = Math.max(1, parallelism); + parallelism = + Math.min( + parallelism, + options.get(FlinkConnectorOptions.INFER_SCAN_MAX_PARALLELISM)); + } + if (parallelism != null) { + source.setParallelism(parallelism); + } + return source; + } }