From dc4552491a3f679f6e6591e816a465443cb1d211 Mon Sep 17 00:00:00 2001
From: Aitozi
Date: Fri, 12 Apr 2024 13:30:24 +0800
Subject: [PATCH] [flink] support to infer parallelism for system table (#3201)

---
 .../flink/AbstractFlinkTableFactory.java      |   5 +-
 .../paimon/flink/source/DataTableSource.java  |  79 +---------
 .../paimon/flink/source/FlinkTableSource.java |  77 +++++++++
 .../flink/source/SystemTableSource.java       |  32 +++-
 .../flink/source/DataTableSourceTest.java     | 147 ++++++++++++------
 5 files changed, 211 insertions(+), 129 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
index 782a9804e505..c42780afa5ae 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
@@ -100,7 +100,10 @@ public DynamicTableSource createDynamicTableSource(Context context) {
                         == RuntimeExecutionMode.STREAMING;
         if (origin instanceof SystemCatalogTable) {
             return new PushedTableSource(
-                    new SystemTableSource(((SystemCatalogTable) origin).table(), isStreamingMode));
+                    new SystemTableSource(
+                            ((SystemCatalogTable) origin).table(),
+                            isStreamingMode,
+                            context.getObjectIdentifier()));
         } else {
             Table table = buildPaimonTable(context);
             if (table instanceof FileStoreTable) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index fd29dd0482d6..ff0312ee52ae 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -22,7 +22,6 @@
 import org.apache.paimon.CoreOptions.ChangelogProducer;
 import org.apache.paimon.CoreOptions.LogChangelogMode;
 import org.apache.paimon.CoreOptions.LogConsistency;
-import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.FlinkConnectorOptions.WatermarkEmitStrategy;
 import org.apache.paimon.flink.PaimonDataStreamScanProvider;
 import org.apache.paimon.flink.log.LogSourceProvider;
@@ -32,13 +31,9 @@
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.Split;
 import org.apache.paimon.utils.Projection;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.LookupTableSource.LookupContext;
@@ -67,7 +62,6 @@
 import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_EMIT_STRATEGY;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_IDLE_TIMEOUT;
-import static org.apache.paimon.options.OptionsUtils.PAIMON_PREFIX;
 import static org.apache.paimon.utils.Preconditions.checkState;
 
 /**
@@ -79,9 +73,6 @@
  * LogSourceProvider}.
  */
 public class DataTableSource extends FlinkTableSource {
-    private static final String FLINK_INFER_SCAN_PARALLELISM =
-            String.format(
-                    "%s%s", PAIMON_PREFIX, FlinkConnectorOptions.INFER_SCAN_PARALLELISM.key());
 
     private final ObjectIdentifier tableIdentifier;
     private final boolean streaming;
@@ -90,8 +81,6 @@ public class DataTableSource extends FlinkTableSource {
 
     @Nullable private WatermarkStrategy<RowData> watermarkStrategy;
 
-    private SplitStatistics splitStatistics;
-
     @Nullable private List<String> dynamicPartitionFilteringFields;
 
     public DataTableSource(
@@ -211,48 +200,12 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
                         .withDynamicPartitionFilteringFields(dynamicPartitionFilteringFields);
 
         return new PaimonDataStreamScanProvider(
-                !streaming, env -> configureSource(sourceBuilder, env));
-    }
-
-    private DataStream<RowData> configureSource(
-            FlinkSourceBuilder sourceBuilder, StreamExecutionEnvironment env) {
-        Options options = Options.fromMap(this.table.options());
-        Configuration envConfig = (Configuration) env.getConfiguration();
-        if (envConfig.containsKey(FLINK_INFER_SCAN_PARALLELISM)) {
-            options.set(
-                    FlinkConnectorOptions.INFER_SCAN_PARALLELISM,
-                    Boolean.parseBoolean(envConfig.toMap().get(FLINK_INFER_SCAN_PARALLELISM)));
-        }
-        Integer parallelism = options.get(FlinkConnectorOptions.SCAN_PARALLELISM);
-        if (parallelism == null && options.get(FlinkConnectorOptions.INFER_SCAN_PARALLELISM)) {
-            if (streaming) {
-                parallelism = options.get(CoreOptions.BUCKET);
-            } else {
-                scanSplitsForInference();
-                parallelism = splitStatistics.splitNumber();
-                if (null != limit && limit > 0) {
-                    int limitCount =
-                            limit >= Integer.MAX_VALUE ? Integer.MAX_VALUE : limit.intValue();
-                    parallelism = Math.min(parallelism, limitCount);
-                }
-
-                parallelism = Math.max(1, parallelism);
-            }
-            parallelism =
-                    Math.min(
-                            parallelism,
-                            options.get(FlinkConnectorOptions.INFER_SCAN_MAX_PARALLELISM));
-        }
-
-        return sourceBuilder.withParallelism(parallelism).withEnv(env).build();
-    }
-
-    private void scanSplitsForInference() {
-        if (splitStatistics == null) {
-            List<Split> splits =
-                    table.newReadBuilder().withFilter(predicate).newScan().plan().splits();
-            splitStatistics = new SplitStatistics(splits);
-        }
+                !streaming,
+                env ->
+                        sourceBuilder
+                                .withParallelism(inferSourceParallelism(env))
+                                .withEnv(env)
+                                .build());
     }
 
     @Override
@@ -335,24 +288,4 @@ public void applyDynamicFiltering(List<String> candidateFilterFields) {
     public boolean isStreaming() {
         return streaming;
     }
-
-    /** Split statistics for inferring row count and parallelism size. */
-    protected static class SplitStatistics {
-
-        private final int splitNumber;
-        private final long totalRowCount;
-
-        private SplitStatistics(List<Split> splits) {
-            this.splitNumber = splits.size();
-            this.totalRowCount = splits.stream().mapToLong(Split::rowCount).sum();
-        }
-
-        public int splitNumber() {
-            return splitNumber;
-        }
-
-        public long totalRowCount() {
-            return totalRowCount;
-        }
-    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
index 68209240ed17..7254eefaa435 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
@@ -18,15 +18,21 @@
 
 package org.apache.paimon.flink.source;
 
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.LogicalTypeConversion;
 import org.apache.paimon.flink.PredicateConverter;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.PartitionPredicateVisitor;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.predicate.PredicateVisitor;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.Split;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.LookupTableSource.LookupContext;
 import org.apache.flink.table.connector.source.LookupTableSource.LookupRuntimeProvider;
@@ -46,16 +52,23 @@
 import java.util.List;
 import java.util.Optional;
 
+import static org.apache.paimon.options.OptionsUtils.PAIMON_PREFIX;
+
 /** A Flink {@link ScanTableSource} for paimon. */
 public abstract class FlinkTableSource {
 
     private static final Logger LOG = LoggerFactory.getLogger(FlinkTableSource.class);
 
+    protected static final String FLINK_INFER_SCAN_PARALLELISM =
+            String.format(
+                    "%s%s", PAIMON_PREFIX, FlinkConnectorOptions.INFER_SCAN_PARALLELISM.key());
+
     protected final Table table;
     @Nullable protected Predicate predicate;
     @Nullable protected int[][] projectFields;
     @Nullable protected Long limit;
+    protected SplitStatistics splitStatistics;
 
     public FlinkTableSource(Table table) {
         this(table, null, null, null);
     }
@@ -132,4 +145,68 @@ public void pushLimit(long limit) {
     public abstract void applyDynamicFiltering(List<String> candidateFilterFields);
 
     public abstract boolean isStreaming();
+
+    @Nullable
+    protected Integer inferSourceParallelism(StreamExecutionEnvironment env) {
+        Options options = Options.fromMap(this.table.options());
+        Configuration envConfig = (Configuration) env.getConfiguration();
+        if (envConfig.containsKey(FLINK_INFER_SCAN_PARALLELISM)) {
+            options.set(
+                    FlinkConnectorOptions.INFER_SCAN_PARALLELISM,
+                    Boolean.parseBoolean(envConfig.toMap().get(FLINK_INFER_SCAN_PARALLELISM)));
+        }
+        Integer parallelism = options.get(FlinkConnectorOptions.SCAN_PARALLELISM);
+        if (parallelism == null && options.get(FlinkConnectorOptions.INFER_SCAN_PARALLELISM)) {
+            if (isStreaming()) {
+                parallelism = Math.max(1, options.get(CoreOptions.BUCKET));
+            } else {
+                scanSplitsForInference();
+                parallelism = splitStatistics.splitNumber();
+                if (null != limit && limit > 0) {
+                    int limitCount =
+                            limit >= Integer.MAX_VALUE ? Integer.MAX_VALUE : limit.intValue();
+                    parallelism = Math.min(parallelism, limitCount);
+                }
+
+                parallelism = Math.max(1, parallelism);
+                parallelism =
+                        Math.min(
+                                parallelism,
+                                options.get(FlinkConnectorOptions.INFER_SCAN_MAX_PARALLELISM));
+            }
+        }
+        return parallelism;
+    }
+
+    protected void scanSplitsForInference() {
+        if (splitStatistics == null) {
+            List<Split> splits =
+                    table.newReadBuilder().withFilter(predicate).newScan().plan().splits();
+            splitStatistics = new SplitStatistics(splits);
+        }
+    }
+
+    /** Split statistics for inferring row count and parallelism size. */
+    protected static class SplitStatistics {
+
+        private final int splitNumber;
+        private final long totalRowCount;
+
+        protected SplitStatistics(List<Split> splits) {
+            this.splitNumber = splits.size();
+            this.totalRowCount = splits.stream().mapToLong(Split::rowCount).sum();
+        }
+
+        public int splitNumber() {
+            return splitNumber;
+        }
+
+        public long totalRowCount() {
+            return totalRowCount;
+        }
+    }
+
+    public Table getTable() {
+        return table;
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
index b577a73aadfd..49ed0c0b8368 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.source;
 
 import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.PaimonDataStreamScanProvider;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.DataTable;
@@ -26,12 +27,14 @@
 import org.apache.paimon.table.source.ReadBuilder;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource.ScanContext;
 import org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider;
-import org.apache.flink.table.connector.source.SourceProvider;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.plan.stats.TableStats;
 
@@ -46,13 +49,16 @@ public class SystemTableSource extends FlinkTableSource {
     private final boolean isStreamingMode;
     private final int splitBatchSize;
     private final FlinkConnectorOptions.SplitAssignMode splitAssignMode;
+    private final ObjectIdentifier tableIdentifier;
 
-    public SystemTableSource(Table table, boolean isStreamingMode) {
+    public SystemTableSource(
+            Table table, boolean isStreamingMode, ObjectIdentifier tableIdentifier) {
         super(table);
         this.isStreamingMode = isStreamingMode;
         Options options = Options.fromMap(table.options());
         this.splitBatchSize = options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE);
         this.splitAssignMode = options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_ASSIGN_MODE);
+        this.tableIdentifier = tableIdentifier;
     }
 
     public SystemTableSource(
@@ -62,11 +68,13 @@ public SystemTableSource(
             @Nullable int[][] projectFields,
             @Nullable Long limit,
             int splitBatchSize,
-            FlinkConnectorOptions.SplitAssignMode splitAssignMode) {
+            FlinkConnectorOptions.SplitAssignMode splitAssignMode,
+            ObjectIdentifier tableIdentifier) {
         super(table, predicate, projectFields, limit);
         this.isStreamingMode = isStreamingMode;
         this.splitBatchSize = splitBatchSize;
         this.splitAssignMode = splitAssignMode;
+        this.tableIdentifier = tableIdentifier;
     }
 
     @Override
@@ -85,7 +93,20 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
         } else {
             source = new StaticFileStoreSource(readBuilder, limit, splitBatchSize, splitAssignMode);
         }
-        return SourceProvider.of(source);
+        return new PaimonDataStreamScanProvider(
+                source.getBoundedness() == Boundedness.BOUNDED,
+                env -> {
+                    Integer parallelism = inferSourceParallelism(env);
+                    DataStreamSource<RowData> dataStreamSource =
+                            env.fromSource(
+                                    source,
+                                    WatermarkStrategy.noWatermarks(),
+                                    tableIdentifier.asSummaryString());
+                    if (parallelism != null) {
+                        dataStreamSource.setParallelism(parallelism);
+                    }
+                    return dataStreamSource;
+                });
     }
 
     @Override
@@ -97,7 +118,8 @@ public SystemTableSource copy() {
                 projectFields,
                 limit,
                 splitBatchSize,
-                splitAssignMode);
+                splitAssignMode,
+                tableIdentifier);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/DataTableSourceTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/DataTableSourceTest.java
index 91af8071e5c3..b98762ae20c3 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/DataTableSourceTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/DataTableSourceTest.java
@@ -31,10 +31,15 @@
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.sink.InnerTableWrite;
 import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.system.ReadOptimizedTable;
 import org.apache.paimon.types.DataTypes;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -47,6 +52,7 @@
 import org.junit.jupiter.api.io.TempDir;
 
 import java.util.Collections;
+import java.util.Map;
 import java.util.Optional;
 
 import static org.apache.paimon.options.OptionsUtils.PAIMON_PREFIX;
@@ -55,35 +61,12 @@
 /** Tests for {@link DataTableSource}. */
 class DataTableSourceTest {
 
-    @Test
-    void testInferScanParallelism(@TempDir java.nio.file.Path path) throws Exception {
-        FileIO fileIO = LocalFileIO.create();
-        Path tablePath = new Path(path.toString());
-        SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
-        TableSchema tableSchema =
-                schemaManager.createTable(
-                        Schema.newBuilder()
-                                .column("a", DataTypes.INT())
-                                .column("b", DataTypes.BIGINT())
-                                .option("bucket", "1")
-                                .build());
-        FileStoreTable fileStoreTable =
-                FileStoreTableFactory.create(fileIO, tablePath, tableSchema);
-        InnerTableWrite writer = fileStoreTable.newWrite("test");
-        TableCommitImpl commit = fileStoreTable.newCommit("test");
-        writer.write(GenericRow.of(1, 2L));
-        writer.write(GenericRow.of(3, 4L));
-        writer.write(GenericRow.of(5, 6L));
-        writer.write(GenericRow.of(7, 8L));
-        writer.write(GenericRow.of(9, 10L));
-        writer.write(GenericRow.of(11, 12L));
-        writer.write(GenericRow.of(13, 14L));
-        writer.write(GenericRow.of(15, 16L));
-        writer.write(GenericRow.of(17, 18L));
-        commit.commit(writer.prepareCommit());
+    @TempDir java.nio.file.Path path;
 
-        commit.close();
-        writer.close();
+    @Test
+    void testInferScanParallelism() throws Exception {
+        FileStoreTable fileStoreTable = createTable(ImmutableMap.of("bucket", "1"));
+        writeData(fileStoreTable);
 
         DataTableSource tableSource =
                 new DataTableSource(
@@ -92,28 +75,7 @@ void testInferScanParallelism(@TempDir java.nio.file.Path path) throws Exception
                         true,
                         null,
                         null);
-        PaimonDataStreamScanProvider runtimeProvider =
-                (PaimonDataStreamScanProvider)
-                        tableSource.getScanRuntimeProvider(
-                                new ScanTableSource.ScanContext() {
-                                    @Override
-                                    public <T> TypeInformation<T> createTypeInformation(
-                                            DataType dataType) {
-                                        throw new UnsupportedOperationException();
-                                    }
-
-                                    @Override
-                                    public <T> TypeInformation<T> createTypeInformation(
-                                            LogicalType logicalType) {
-                                        throw new UnsupportedOperationException();
-                                    }
-
-                                    @Override
-                                    public DynamicTableSource.DataStructureConverter
-                                            createDataStructureConverter(DataType dataType) {
-                                        throw new UnsupportedOperationException();
-                                    }
-                                });
+        PaimonDataStreamScanProvider runtimeProvider = runtimeProvider(tableSource);
         StreamExecutionEnvironment sEnv1 = StreamExecutionEnvironment.createLocalEnvironment();
         DataStream<RowData> sourceStream1 =
                 runtimeProvider.produceDataStream(s -> Optional.empty(), sEnv1);
@@ -134,4 +96,89 @@ public <T> TypeInformation<T> createTypeInformation(
         assertThat(sourceStream2.getParallelism()).isNotEqualTo(1);
         assertThat(sourceStream2.getParallelism()).isEqualTo(sEnv2.getParallelism());
     }
+
+    @Test
+    public void testInferStreamParallelism() throws Exception {
+        FileStoreTable fileStoreTable = createTable(ImmutableMap.of("bucket", "-1"));
+
+        DataTableSource tableSource =
+                new DataTableSource(
+                        ObjectIdentifier.of("cat", "db", "table"),
+                        fileStoreTable,
+                        true,
+                        null,
+                        null);
+        PaimonDataStreamScanProvider runtimeProvider = runtimeProvider(tableSource);
+
+        StreamExecutionEnvironment sEnv1 = StreamExecutionEnvironment.createLocalEnvironment();
+        DataStream<RowData> sourceStream1 =
+                runtimeProvider.produceDataStream(s -> Optional.empty(), sEnv1);
+        // parallelism = 1 for table with -1 bucket.
+        assertThat(sourceStream1.getParallelism()).isEqualTo(1);
+    }
+
+    @Test
+    public void testSystemTableParallelism() throws Exception {
+        FileStoreTable fileStoreTable =
+                createTable(ImmutableMap.of("bucket", "1", "scan.parallelism", "3"));
+
+        ReadOptimizedTable ro = new ReadOptimizedTable(fileStoreTable);
+
+        SystemTableSource tableSource =
+                new SystemTableSource(ro, false, ObjectIdentifier.of("cat", "db", "table"));
+        PaimonDataStreamScanProvider runtimeProvider = runtimeProvider(tableSource);
+
+        Configuration configuration = new Configuration();
+        configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
+        StreamExecutionEnvironment sEnv1 = StreamExecutionEnvironment.createLocalEnvironment();
+        DataStream<RowData> sourceStream1 =
+                runtimeProvider.produceDataStream(s -> Optional.empty(), sEnv1);
+        assertThat(sourceStream1.getParallelism()).isEqualTo(3);
+    }
+
+    private PaimonDataStreamScanProvider runtimeProvider(FlinkTableSource tableSource) {
+        return (PaimonDataStreamScanProvider)
+                tableSource.getScanRuntimeProvider(
+                        new ScanTableSource.ScanContext() {
+                            @Override
+                            public <T> TypeInformation<T> createTypeInformation(DataType dataType) {
+                                throw new UnsupportedOperationException();
+                            }
+
+                            @Override
+                            public <T> TypeInformation<T> createTypeInformation(
+                                    LogicalType logicalType) {
+                                throw new UnsupportedOperationException();
+                            }
+
+                            @Override
+                            public DynamicTableSource.DataStructureConverter
+                                    createDataStructureConverter(DataType dataType) {
+                                throw new UnsupportedOperationException();
+                            }
+                        });
+    }
+
+    private FileStoreTable createTable(Map<String, String> options) throws Exception {
+        FileIO fileIO = LocalFileIO.create();
+        Path tablePath = new Path(path.toString());
+        SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
+        TableSchema tableSchema =
+                schemaManager.createTable(
+                        Schema.newBuilder()
+                                .column("a", DataTypes.INT())
+                                .column("b", DataTypes.BIGINT())
+                                .options(options)
+                                .build());
+        return FileStoreTableFactory.create(fileIO, tablePath, tableSchema);
+    }
+
+    private void writeData(FileStoreTable table) throws Exception {
+        InnerTableWrite writer = table.newWrite("test");
+        TableCommitImpl commit = table.newCommit("test");
+        writer.write(GenericRow.of(1, 2L));
+        commit.commit(writer.prepareCommit());
+        commit.close();
+        writer.close();
+    }
 }
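
Reviewer note (not part of the patch): a minimal usage sketch of the behavior this change enables. With SystemTableSource now routed through inferSourceParallelism, a bounded query over a Paimon system table respects 'scan.parallelism' (and 'scan.infer-parallelism') the same way a data-table query does, mirroring the testSystemTableParallelism case above. The catalog name, warehouse path, and table name below are illustrative assumptions, not taken from the patch.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SystemTableParallelismSketch {
    public static void main(String[] args) {
        // Batch mode, so the system-table source is bounded and scan parallelism settings apply.
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        tEnv.executeSql(
                "CREATE CATALOG paimon WITH ('type' = 'paimon', 'warehouse' = '/tmp/warehouse')");
        tEnv.useCatalog("paimon");
        // Assumes a table `default`.`t` already exists in the warehouse; `t$files` is its system
        // table. The OPTIONS hint pins the scan parallelism to 3 for this bounded read.
        tEnv.executeSql(
                        "SELECT * FROM `default`.`t$files` /*+ OPTIONS('scan.parallelism' = '3') */")
                .print();
    }
}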