From f39e1b9ede83471fea75505c9e8fefdcf13c8f9c Mon Sep 17 00:00:00 2001
From: Jingsong
Date: Fri, 12 Apr 2024 11:13:53 +0800
Subject: [PATCH] expose source too

---
 docs/content/program-api/flink-api.md          |  55 ++++---
 .../flink/sink/cdc/RichCdcSinkBuilder.java     |  70 +++++++--
 .../flink/action/SortCompactAction.java        |  19 +--
 .../apache/paimon/flink/sink/FlinkSink.java    |   6 +-
 .../paimon/flink/source/DataTableSource.java   |  21 +--
 .../flink/source/FlinkSourceBuilder.java       | 144 +++++++++++-------
 .../apache/paimon/flink/FileStoreITCase.java   |  48 +++---
 7 files changed, 224 insertions(+), 139 deletions(-)

diff --git a/docs/content/program-api/flink-api.md b/docs/content/program-api/flink-api.md
index 0fe555eed84d..74625862923c 100644
--- a/docs/content/program-api/flink-api.md
+++ b/docs/content/program-api/flink-api.md
@@ -127,10 +127,14 @@ public class WriteToTable {
 ## Read from Table
 
 ```java
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.source.FlinkSourceBuilder;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.Table;
+
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;
 
 public class ReadFromTable {
@@ -138,15 +142,22 @@ public class ReadFromTable {
     public static void readFrom() throws Exception {
         // create environments of both APIs
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
 
-        // create paimon catalog
-        tableEnv.executeSql("CREATE CATALOG paimon WITH ('type' = 'paimon', 'warehouse'='...')");
-        tableEnv.executeSql("USE CATALOG paimon");
+        // get table from catalog
+        Options catalogOptions = new Options();
+        catalogOptions.set("warehouse", "/path/to/warehouse");
+        Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+        Table table = catalog.getTable(Identifier.create("my_db", "T"));
+
+        FlinkSourceBuilder builder = new FlinkSourceBuilder(table).env(env);
+
+        // builder.sourceBounded(true);
+        // builder.projection(...);
+        // builder.predicate(...);
+        // builder.limit(...);
+        // builder.sourceParallelism(...);
 
-        // convert to DataStream
-        Table table = tableEnv.sqlQuery("SELECT * FROM my_paimon_table");
-        DataStream<Row> dataStream = tableEnv.toChangelogStream(table);
+        DataStream<Row> dataStream = builder.buildForRow();
 
         // use this datastream
         dataStream.executeAndCollect().forEachRemaining(System.out::println);
@@ -211,29 +222,15 @@ public class WriteCdcToTable {
         catalogOptions.set("warehouse", "/path/to/warehouse");
         Catalog.Loader catalogLoader =
                 () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
-
-        new RichCdcSinkBuilder()
-                .withInput(dataStream)
-                .withTable(createTableIfNotExists(catalogLoader.load(), identifier))
-                .withIdentifier(identifier)
-                .withCatalogLoader(catalogLoader)
+        Table table = catalogLoader.load().getTable(identifier);
+
+        new RichCdcSinkBuilder(table)
+                .forRichCdcRecord(dataStream)
+                .identifier(identifier)
+                .catalogLoader(catalogLoader)
                 .build();
 
         env.execute();
     }
-
-    private static Table createTableIfNotExists(Catalog catalog, Identifier identifier) throws Exception {
-        Schema.Builder schemaBuilder = Schema.newBuilder();
-        schemaBuilder.primaryKey("order_id");
-        schemaBuilder.column("order_id",
DataTypes.BIGINT()); - schemaBuilder.column("price", DataTypes.DOUBLE()); - Schema schema = schemaBuilder.build(); - try { - catalog.createTable(identifier, schema, false); - } catch (Catalog.TableAlreadyExistException e) { - // do something - } - return catalog.getTable(identifier); - } } ``` \ No newline at end of file diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java index 77489575743c..610856d3af54 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java @@ -18,7 +18,7 @@ package org.apache.paimon.flink.sink.cdc; -import org.apache.paimon.annotation.Experimental; +import org.apache.paimon.annotation.Public; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.table.Table; @@ -28,8 +28,12 @@ import javax.annotation.Nullable; -/** Builder for sink when syncing {@link RichCdcRecord} records into one Paimon table. */ -@Experimental +/** + * DataStream API for building Flink Sink for {@link RichCdcRecord} to write with schema evolution. + * + * @since 0.8 + */ +@Public public class RichCdcSinkBuilder { private DataStream input = null; @@ -39,27 +43,26 @@ public class RichCdcSinkBuilder { @Nullable private Integer parallelism; - public RichCdcSinkBuilder withInput(DataStream input) { - this.input = input; - return this; + public RichCdcSinkBuilder(Table table) { + this.table = table; } - public RichCdcSinkBuilder withTable(Table table) { - this.table = table; + public RichCdcSinkBuilder forRichCdcRecord(DataStream input) { + this.input = input; return this; } - public RichCdcSinkBuilder withParallelism(@Nullable Integer parallelism) { - this.parallelism = parallelism; + public RichCdcSinkBuilder identifier(Identifier identifier) { + this.identifier = identifier; return this; } - public RichCdcSinkBuilder withIdentifier(Identifier identifier) { - this.identifier = identifier; + public RichCdcSinkBuilder parallelism(@Nullable Integer parallelism) { + this.parallelism = parallelism; return this; } - public RichCdcSinkBuilder withCatalogLoader(Catalog.Loader catalogLoader) { + public RichCdcSinkBuilder catalogLoader(Catalog.Loader catalogLoader) { this.catalogLoader = catalogLoader; return this; } @@ -74,4 +77,45 @@ public DataStreamSink build() { .withCatalogLoader(catalogLoader) .build(); } + + // ====================== Deprecated ============================ + + /** @deprecated Use constructor to pass table. */ + @Deprecated + public RichCdcSinkBuilder() {} + + /** @deprecated Use {@link #forRichCdcRecord}. */ + @Deprecated + public RichCdcSinkBuilder withInput(DataStream input) { + this.input = input; + return this; + } + + /** @deprecated Use constructor to pass Table. */ + @Deprecated + public RichCdcSinkBuilder withTable(Table table) { + this.table = table; + return this; + } + + /** @deprecated Use {@link #parallelism}. */ + @Deprecated + public RichCdcSinkBuilder withParallelism(@Nullable Integer parallelism) { + this.parallelism = parallelism; + return this; + } + + /** @deprecated Use {@link #identifier}. */ + @Deprecated + public RichCdcSinkBuilder withIdentifier(Identifier identifier) { + this.identifier = identifier; + return this; + } + + /** @deprecated Use {@link #catalogLoader}. 
*/ + @Deprecated + public RichCdcSinkBuilder withCatalogLoader(Catalog.Loader catalogLoader) { + this.catalogLoader = catalogLoader; + return this; + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java index 44d8acdad2dc..0936943ed2eb 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java @@ -84,12 +84,13 @@ public void build() { } Map tableConfig = fileStoreTable.options(); FlinkSourceBuilder sourceBuilder = - new FlinkSourceBuilder( - ObjectIdentifier.of( - catalogName, - identifier.getDatabaseName(), - identifier.getObjectName()), - fileStoreTable); + new FlinkSourceBuilder(fileStoreTable) + .sourceName( + ObjectIdentifier.of( + catalogName, + identifier.getDatabaseName(), + identifier.getObjectName()) + .asSummaryString()); if (getPartitions() != null) { Predicate partitionPredicate = @@ -97,15 +98,15 @@ public void build() { getPartitions().stream() .map(p -> PredicateBuilder.partition(p, table.rowType())) .toArray(Predicate[]::new)); - sourceBuilder.withPredicate(partitionPredicate); + sourceBuilder.predicate(partitionPredicate); } String scanParallelism = tableConfig.get(FlinkConnectorOptions.SCAN_PARALLELISM.key()); if (scanParallelism != null) { - sourceBuilder.withParallelism(Integer.parseInt(scanParallelism)); + sourceBuilder.sourceParallelism(Integer.parseInt(scanParallelism)); } - DataStream source = sourceBuilder.withEnv(env).withContinuousMode(false).build(); + DataStream source = sourceBuilder.env(env).sourceBounded(true).build(); TableSorter sorter = TableSorter.getSorter(env, source, fileStoreTable, sortStrategy, orderColumns); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index c65ef6a35440..f7ebf5afdf24 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java @@ -311,7 +311,11 @@ protected abstract Committer.Factory createCom protected abstract CommittableStateManager createCommittableStateManager(); public static boolean isStreaming(DataStream input) { - return input.getExecutionEnvironment().getConfiguration().get(ExecutionOptions.RUNTIME_MODE) + return isStreaming(input.getExecutionEnvironment()); + } + + public static boolean isStreaming(StreamExecutionEnvironment env) { + return env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING; } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java index ff0312ee52ae..c4544426fdc6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java @@ -190,21 +190,22 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { } FlinkSourceBuilder sourceBuilder = - new FlinkSourceBuilder(tableIdentifier, table) - .withContinuousMode(streaming) - 
.withLogSourceProvider(logSourceProvider) - .withProjection(projectFields) - .withPredicate(predicate) - .withLimit(limit) - .withWatermarkStrategy(watermarkStrategy) - .withDynamicPartitionFilteringFields(dynamicPartitionFilteringFields); + new FlinkSourceBuilder(table) + .sourceName(tableIdentifier.asSummaryString()) + .sourceBounded(!streaming) + .logSourceProvider(logSourceProvider) + .projection(projectFields) + .predicate(predicate) + .limit(limit) + .watermarkStrategy(watermarkStrategy) + .dynamicPartitionFilteringFields(dynamicPartitionFilteringFields); return new PaimonDataStreamScanProvider( !streaming, env -> sourceBuilder - .withParallelism(inferSourceParallelism(env)) - .withEnv(env) + .sourceParallelism(inferSourceParallelism(env)) + .env(env) .build()); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java index d8878f360c6f..6b542f71cce5 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java @@ -24,6 +24,7 @@ import org.apache.paimon.flink.FlinkConnectorOptions; import org.apache.paimon.flink.Projection; import org.apache.paimon.flink.log.LogSourceProvider; +import org.apache.paimon.flink.sink.FlinkSink; import org.apache.paimon.flink.source.align.AlignedContinuousFileStoreSource; import org.apache.paimon.flink.source.operator.MonitorFunction; import org.apache.paimon.flink.utils.TableScanUtils; @@ -35,6 +36,7 @@ import org.apache.paimon.table.source.ReadBuilder; import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.Source; @@ -45,33 +47,38 @@ import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.util.DataFormatConverters; +import org.apache.flink.table.runtime.typeutils.ExternalTypeInfo; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.Row; import javax.annotation.Nullable; import java.util.List; import java.util.Optional; +import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType; import static org.apache.paimon.CoreOptions.StreamingReadMode.FILE; import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType; import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.Preconditions.checkState; /** - * Source builder to build a Flink {@link StaticFileStoreSource} or {@link - * ContinuousFileStoreSource}. This is for normal read/write jobs. + * DataStream API for building Flink Source. 
+ * + * @since 0.8 */ public class FlinkSourceBuilder { - private final ObjectIdentifier tableIdentifier; private final Table table; private final Options conf; - private boolean isContinuous = false; + private String sourceName; + private Boolean sourceBounded; private StreamExecutionEnvironment env; @Nullable private int[][] projectedFields; @Nullable private Predicate predicate; @@ -81,54 +88,61 @@ public class FlinkSourceBuilder { @Nullable private WatermarkStrategy watermarkStrategy; @Nullable private DynamicPartitionFilteringInfo dynamicPartitionFilteringInfo; - public FlinkSourceBuilder(ObjectIdentifier tableIdentifier, Table table) { - this.tableIdentifier = tableIdentifier; + public FlinkSourceBuilder(Table table) { this.table = table; + this.sourceName = table.name(); this.conf = Options.fromMap(table.options()); } - public FlinkSourceBuilder withContinuousMode(boolean isContinuous) { - this.isContinuous = isContinuous; + public FlinkSourceBuilder env(StreamExecutionEnvironment env) { + this.env = env; + if (sourceBounded == null) { + sourceBounded = !FlinkSink.isStreaming(env); + } return this; } - public FlinkSourceBuilder withEnv(StreamExecutionEnvironment env) { - this.env = env; + public FlinkSourceBuilder sourceName(String name) { + this.sourceName = name; return this; } - public FlinkSourceBuilder withProjection(int[][] projectedFields) { - this.projectedFields = projectedFields; + public FlinkSourceBuilder sourceBounded(boolean bounded) { + this.sourceBounded = bounded; return this; } - public FlinkSourceBuilder withPredicate(Predicate predicate) { - this.predicate = predicate; + public FlinkSourceBuilder projection(int[] projectedFields) { + return projection(Projection.of(projectedFields).toNestedIndexes()); + } + + public FlinkSourceBuilder projection(int[][] projectedFields) { + this.projectedFields = projectedFields; return this; } - public FlinkSourceBuilder withLimit(@Nullable Long limit) { - this.limit = limit; + public FlinkSourceBuilder predicate(Predicate predicate) { + this.predicate = predicate; return this; } - public FlinkSourceBuilder withLogSourceProvider(LogSourceProvider logSourceProvider) { - this.logSourceProvider = logSourceProvider; + public FlinkSourceBuilder limit(@Nullable Long limit) { + this.limit = limit; return this; } - public FlinkSourceBuilder withParallelism(@Nullable Integer parallelism) { + public FlinkSourceBuilder sourceParallelism(@Nullable Integer parallelism) { this.parallelism = parallelism; return this; } - public FlinkSourceBuilder withWatermarkStrategy( + public FlinkSourceBuilder watermarkStrategy( @Nullable WatermarkStrategy watermarkStrategy) { this.watermarkStrategy = watermarkStrategy; return this; } - public FlinkSourceBuilder withDynamicPartitionFilteringFields( + public FlinkSourceBuilder dynamicPartitionFilteringFields( List dynamicPartitionFilteringFields) { if (dynamicPartitionFilteringFields != null && !dynamicPartitionFilteringFields.isEmpty()) { checkState( @@ -144,6 +158,12 @@ public FlinkSourceBuilder withDynamicPartitionFilteringFields( return this; } + @Deprecated + FlinkSourceBuilder logSourceProvider(LogSourceProvider logSourceProvider) { + this.logSourceProvider = logSourceProvider; + return this; + } + private ReadBuilder createReadBuilder() { ReadBuilder readBuilder = table.newReadBuilder().withProjection(projectedFields).withFilter(predicate); @@ -194,7 +214,7 @@ private DataStream toDataStream(Source source) { watermarkStrategy == null ? 
WatermarkStrategy.noWatermarks() : watermarkStrategy, - tableIdentifier.asSummaryString(), + sourceName, produceTypeInfo()); if (parallelism != null) { dataStream.setParallelism(parallelism); @@ -212,44 +232,58 @@ private TypeInformation produceTypeInfo() { return InternalTypeInfo.of(produceType); } + /** Build source {@link DataStream} with {@link RowData}. */ + public DataStream buildForRow() { + DataType rowType = fromLogicalToDataType(toLogicalType(table.rowType())); + DataType[] fieldDataTypes = rowType.getChildren().toArray(new DataType[0]); + + DataFormatConverters.RowConverter converter = + new DataFormatConverters.RowConverter(fieldDataTypes); + DataStream source = build(); + return source.map((MapFunction) converter::toExternal) + .setParallelism(source.getParallelism()) + .returns(ExternalTypeInfo.of(rowType)); + } + + /** Build source {@link DataStream} with {@link RowData}. */ public DataStream build() { if (env == null) { throw new IllegalArgumentException("StreamExecutionEnvironment should not be null."); } - if (isContinuous) { - TableScanUtils.streamingReadingValidate(table); - - // TODO visit all options through CoreOptions - StartupMode startupMode = CoreOptions.startupMode(conf); - StreamingReadMode streamingReadMode = CoreOptions.streamReadType(conf); - - if (logSourceProvider != null && streamingReadMode != FILE) { - if (startupMode != StartupMode.LATEST_FULL) { - return toDataStream(logSourceProvider.createSource(null)); - } else { - return toDataStream( - HybridSource.builder( - LogHybridSourceFactory.buildHybridFirstSource( - table, projectedFields, predicate)) - .addSource( - new LogHybridSourceFactory(logSourceProvider), - Boundedness.CONTINUOUS_UNBOUNDED) - .build()); - } + if (sourceBounded) { + return buildStaticFileSource(); + } + + TableScanUtils.streamingReadingValidate(table); + + // TODO visit all options through CoreOptions + StartupMode startupMode = CoreOptions.startupMode(conf); + StreamingReadMode streamingReadMode = CoreOptions.streamReadType(conf); + + if (logSourceProvider != null && streamingReadMode != FILE) { + if (startupMode != StartupMode.LATEST_FULL) { + return toDataStream(logSourceProvider.createSource(null)); } else { - if (conf.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_ENABLED)) { - return buildAlignedContinuousFileSource(); - } else if (conf.contains(CoreOptions.CONSUMER_ID) - && conf.get(CoreOptions.CONSUMER_CONSISTENCY_MODE) - == CoreOptions.ConsumerMode.EXACTLY_ONCE) { - return buildContinuousStreamOperator(); - } else { - return buildContinuousFileSource(); - } + return toDataStream( + HybridSource.builder( + LogHybridSourceFactory.buildHybridFirstSource( + table, projectedFields, predicate)) + .addSource( + new LogHybridSourceFactory(logSourceProvider), + Boundedness.CONTINUOUS_UNBOUNDED) + .build()); } } else { - return buildStaticFileSource(); + if (conf.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_ENABLED)) { + return buildAlignedContinuousFileSource(); + } else if (conf.contains(CoreOptions.CONSUMER_ID) + && conf.get(CoreOptions.CONSUMER_CONSISTENCY_MODE) + == CoreOptions.ConsumerMode.EXACTLY_ONCE) { + return buildContinuousStreamOperator(); + } else { + return buildContinuousFileSource(); + } } } @@ -262,7 +296,7 @@ private DataStream buildContinuousStreamOperator() { dataStream = MonitorFunction.buildSource( env, - tableIdentifier.asSummaryString(), + sourceName, produceTypeInfo(), createReadBuilder(), conf.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis(), @@ -276,7 +310,7 @@ private DataStream 
buildContinuousStreamOperator() { return dataStream; } - public void assertStreamingConfigurationForAlignMode(StreamExecutionEnvironment env) { + private void assertStreamingConfigurationForAlignMode(StreamExecutionEnvironment env) { CheckpointConfig checkpointConfig = env.getCheckpointConfig(); checkArgument( checkpointConfig.isCheckpointingEnabled(), diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java index 7cd6c7b2a84d..9eb55ae845e4 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java @@ -139,7 +139,7 @@ private static SerializableRowData wrap(RowData row) { } @TestTemplate - public void testRowSink() throws Exception { + public void testRowSourceSink() throws Exception { FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[] {1, 2}); // write @@ -163,7 +163,8 @@ public void testRowSink() throws Exception { // read List results = - executeAndCollect(new FlinkSourceBuilder(IDENTIFIER, table).withEnv(env).build()); + executeAndCollectRow( + new FlinkSourceBuilder(table).env(env).sourceBounded(true).buildForRow()); // assert Row[] expected = @@ -182,8 +183,7 @@ public void testPartitioned() throws Exception { env.execute(); // read - List results = - executeAndCollect(new FlinkSourceBuilder(IDENTIFIER, table).withEnv(env).build()); + List results = executeAndCollect(new FlinkSourceBuilder(table).env(env).build()); // assert Row[] expected = @@ -202,8 +202,7 @@ public void testNonPartitioned() throws Exception { env.execute(); // read - List results = - executeAndCollect(new FlinkSourceBuilder(IDENTIFIER, table).withEnv(env).build()); + List results = executeAndCollect(new FlinkSourceBuilder(table).env(env).build()); // assert Row[] expected = new Row[] {Row.of(5, "p2", 1), Row.of(0, "p1", 2), Row.of(3, "p2", 5)}; @@ -232,8 +231,7 @@ public void testOverwrite() throws Exception { env.execute(); // read - List results = - executeAndCollect(new FlinkSourceBuilder(IDENTIFIER, table).withEnv(env).build()); + List results = executeAndCollect(new FlinkSourceBuilder(table).env(env).build()); Row[] expected = new Row[] {Row.of(9, "p2", 5), Row.of(5, "p1", 1), Row.of(0, "p1", 2)}; assertThat(results).containsExactlyInAnyOrder(expected); @@ -248,7 +246,7 @@ public void testOverwrite() throws Exception { env.execute(); // read - results = executeAndCollect(new FlinkSourceBuilder(IDENTIFIER, table).withEnv(env).build()); + results = executeAndCollect(new FlinkSourceBuilder(table).env(env).build()); expected = new Row[] {Row.of(19, "p2", 6), Row.of(5, "p1", 1), Row.of(0, "p1", 2)}; assertThat(results).containsExactlyInAnyOrder(expected); @@ -268,7 +266,7 @@ public void testOverwrite() throws Exception { env.execute(); // read - results = executeAndCollect(new FlinkSourceBuilder(IDENTIFIER, table).withEnv(env).build()); + results = executeAndCollect(new FlinkSourceBuilder(table).env(env).build()); expected = new Row[] {Row.of(20, "p2", 3)}; assertThat(results).containsExactlyInAnyOrder(expected); } @@ -282,8 +280,7 @@ public void testPartitionedNonKey() throws Exception { env.execute(); // read - List results = - executeAndCollect(new FlinkSourceBuilder(IDENTIFIER, table).withEnv(env).build()); + List results = executeAndCollect(new FlinkSourceBuilder(table).env(env).build()); // assert // in 
streaming mode, expect origin data X 2 (FiniteTestSource) @@ -320,9 +317,9 @@ private void testProjection(FileStoreTable table) throws Exception { projection.project(TABLE_TYPE))); List results = executeAndCollect( - new FlinkSourceBuilder(IDENTIFIER, table) - .withProjection(projection.toNestedIndexes()) - .withEnv(env) + new FlinkSourceBuilder(table) + .projection(projection.toNestedIndexes()) + .env(env) .build(), converter); @@ -360,10 +357,7 @@ public void testContinuousBounded() throws Exception { table.copy( Collections.singletonMap(CoreOptions.SCAN_BOUNDED_WATERMARK.key(), "1024")); DataStream source = - new FlinkSourceBuilder(IDENTIFIER, table) - .withContinuousMode(true) - .withEnv(env) - .build(); + new FlinkSourceBuilder(table).sourceBounded(false).env(env).build(); Transformation transformation = source.getTransformation(); assertThat(transformation).isInstanceOf(SourceTransformation.class); assertThat(((SourceTransformation) transformation).getSource().getBoundedness()) @@ -375,9 +369,9 @@ private void innerTestContinuous(FileStoreTable table) throws Exception { BlockingIterator iterator = BlockingIterator.of( - new FlinkSourceBuilder(IDENTIFIER, table) - .withContinuousMode(true) - .withEnv(env) + new FlinkSourceBuilder(table) + .sourceBounded(false) + .env(env) .build() .executeAndCollect(), CONVERTER::toExternal); @@ -500,4 +494,14 @@ public static List executeAndCollect( iterator.close(); return results; } + + public static List executeAndCollectRow(DataStream source) throws Exception { + CloseableIterator iterator = source.executeAndCollect(); + List results = new ArrayList<>(); + while (iterator.hasNext()) { + results.add(iterator.next()); + } + iterator.close(); + return results; + } }
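
Below is a minimal, illustrative sketch (not part of the patch above) of how the newly exposed `FlinkSourceBuilder` can drive a bounded read using the optional settings that the documentation example leaves commented out. The warehouse path, database and table names, projected field indices, and row limit are placeholders; `FlinkCatalogFactory` is the same factory the documentation example already calls and is assumed to live in the `org.apache.paimon.flink` package.

```java
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.source.FlinkSourceBuilder;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.Table;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;

public class BoundedProjectedRead {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Load the table from a Paimon catalog; warehouse path and identifier are placeholders.
        Options catalogOptions = new Options();
        catalogOptions.set("warehouse", "/path/to/warehouse");
        Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
        Table table = catalog.getTable(Identifier.create("my_db", "T"));

        // Bounded read of the first two columns, capped at 100 rows,
        // converted from RowData to Flink Row via buildForRow().
        DataStream<Row> dataStream =
                new FlinkSourceBuilder(table)
                        .env(env)
                        .sourceBounded(true)
                        .projection(new int[] {0, 1})
                        .limit(100L)
                        .buildForRow();

        dataStream.executeAndCollect().forEachRemaining(System.out::println);
    }
}
```

For an unbounded read, `sourceBounded(false)` switches the builder to the continuous sources; if boundedness is not set explicitly, `env(...)` derives it from the environment's runtime mode via `FlinkSink.isStreaming(env)`.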