From ffb903287d38035550255ab69a5ad820ee4785ff Mon Sep 17 00:00:00 2001 From: Jingsong Lee Date: Mon, 15 Apr 2024 16:51:25 +0800 Subject: [PATCH] [api] Publish DataStream Sink & Source API (#3192) --- docs/content/program-api/flink-api.md | 114 +++++++------- .../flink/sink/cdc/RichCdcSinkBuilder.java | 70 +++++++-- .../flink/action/SortCompactAction.java | 28 ++-- .../paimon/flink/action/TableActionBase.java | 2 +- .../apache/paimon/flink/sink/FlinkSink.java | 17 ++- .../paimon/flink/sink/FlinkSinkBuilder.java | 88 +++++++---- .../paimon/flink/sink/FlinkTableSinkBase.java | 28 ++-- .../flink/sink/LogFlinkSinkBuilder.java | 36 +++++ .../flink/sink/SortCompactSinkBuilder.java | 34 +++++ .../paimon/flink/source/DataTableSource.java | 21 +-- .../flink/source/FlinkSourceBuilder.java | 144 +++++++++++------- .../apache/paimon/flink/FileStoreITCase.java | 113 ++++++++++---- 12 files changed, 469 insertions(+), 226 deletions(-) create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LogFlinkSinkBuilder.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SortCompactSinkBuilder.java diff --git a/docs/content/program-api/flink-api.md b/docs/content/program-api/flink-api.md index 942208e1aa1a..74625862923c 100644 --- a/docs/content/program-api/flink-api.md +++ b/docs/content/program-api/flink-api.md @@ -61,58 +61,65 @@ Please choose your Flink version. Paimon relies on Hadoop environment, you should add hadoop classpath or bundled jar. -Paimon does not provide a DataStream API, but you can read or write to Paimon tables by the conversion between DataStream and Table in Flink. +Not only DataStream API, you can also read or write to Paimon tables by the conversion between DataStream and Table in Flink. See [DataStream API Integration](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/data_stream_api/). ## Write to Table ```java +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.sink.FlinkSinkBuilder; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.Table; + +import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.api.Schema; -import org.apache.flink.table.api.Table; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.types.DataType; import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; public class WriteToTable { - public static void writeTo() { + public static void writeTo() throws Exception { // create environments of both APIs StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // for CONTINUOUS_UNBOUNDED source, set checkpoint interval // env.enableCheckpointing(60_000); - StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // create a changelog DataStream - DataStream dataStream = + DataStream input = env.fromElements( - Row.ofKind(RowKind.INSERT, "Alice", 12), - Row.ofKind(RowKind.INSERT, "Bob", 5), - Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12), - Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100)) + Row.ofKind(RowKind.INSERT, "Alice", 12), + Row.ofKind(RowKind.INSERT, "Bob", 5), + Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12), + Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100)) .returns( Types.ROW_NAMED( - new String[] {"name", "age"}, - Types.STRING, Types.INT)); + new String[] {"name", "age"}, Types.STRING, Types.INT)); - // interpret the DataStream as a Table - Schema schema = Schema.newBuilder() - .column("name", DataTypes.STRING()) - .column("age", DataTypes.INT()) - .build(); - Table table = tableEnv.fromChangelogStream(dataStream, schema); + // get table from catalog + Options catalogOptions = new Options(); + catalogOptions.set("warehouse", "/path/to/warehouse"); + Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions); + Table table = catalog.getTable(Identifier.create("my_db", "T")); - // create paimon catalog - tableEnv.executeSql("CREATE CATALOG paimon WITH ('type' = 'paimon', 'warehouse'='...')"); - tableEnv.executeSql("USE CATALOG paimon"); + DataType inputType = + DataTypes.ROW( + DataTypes.FIELD("name", DataTypes.STRING()), + DataTypes.FIELD("age", DataTypes.INT())); + FlinkSinkBuilder builder = new FlinkSinkBuilder(table).forRow(input, inputType); - // register the table under a name and perform an aggregation - tableEnv.createTemporaryView("InputTable", table); + // set sink parallelism + // builder.parallelism(_your_parallelism) - // insert into paimon table from your data stream table - tableEnv.executeSql("INSERT INTO sink_paimon_table SELECT * FROM InputTable"); + // set overwrite mode + // builder.overwrite(...) + + builder.build(); + env.execute(); } } ``` @@ -120,10 +127,14 @@ public class WriteToTable { ## Read from Table ```java +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.source.FlinkSourceBuilder; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.Table; + import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.Table; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; public class ReadFromTable { @@ -131,15 +142,22 @@ public class ReadFromTable { public static void readFrom() throws Exception { // create environments of both APIs StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); - // create paimon catalog - tableEnv.executeSql("CREATE CATALOG paimon WITH ('type' = 'paimon', 'warehouse'='...')"); - tableEnv.executeSql("USE CATALOG paimon"); + // get table from catalog + Options catalogOptions = new Options(); + catalogOptions.set("warehouse", "/path/to/warehouse"); + Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions); + Table table = catalog.getTable(Identifier.create("my_db", "T")); + + FlinkSourceBuilder builder = new FlinkSourceBuilder(table).env(env); + + // builder.sourceBounded(true); + // builder.projection(...); + // builder.predicate(...); + // builder.limit(...); + // builder.sourceParallelism(...); - // convert to DataStream - Table table = tableEnv.sqlQuery("SELECT * FROM my_paimon_table"); - DataStream dataStream = tableEnv.toChangelogStream(table); + DataStream dataStream = builder.buildForRow(); // use this datastream dataStream.executeAndCollect().forEachRemaining(System.out::println); @@ -204,29 +222,15 @@ public class WriteCdcToTable { catalogOptions.set("warehouse", "/path/to/warehouse"); Catalog.Loader catalogLoader = () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions); - - new RichCdcSinkBuilder() - .withInput(dataStream) - .withTable(createTableIfNotExists(catalogLoader.load(), identifier)) - .withIdentifier(identifier) - .withCatalogLoader(catalogLoader) + Table table = catalogLoader.load().getTable(identifier); + + new RichCdcSinkBuilder(table) + .forRichCdcRecord(dataStream) + .identifier(identifier) + .catalogLoader(catalogLoader) .build(); env.execute(); } - - private static Table createTableIfNotExists(Catalog catalog, Identifier identifier) throws Exception { - Schema.Builder schemaBuilder = Schema.newBuilder(); - schemaBuilder.primaryKey("order_id"); - schemaBuilder.column("order_id", DataTypes.BIGINT()); - schemaBuilder.column("price", DataTypes.DOUBLE()); - Schema schema = schemaBuilder.build(); - try { - catalog.createTable(identifier, schema, false); - } catch (Catalog.TableAlreadyExistException e) { - // do something - } - return catalog.getTable(identifier); - } } ``` \ No newline at end of file diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java index 77489575743c..610856d3af54 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java @@ -18,7 +18,7 @@ package org.apache.paimon.flink.sink.cdc; -import org.apache.paimon.annotation.Experimental; +import org.apache.paimon.annotation.Public; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.table.Table; @@ -28,8 +28,12 @@ import javax.annotation.Nullable; -/** Builder for sink when syncing {@link RichCdcRecord} records into one Paimon table. */ -@Experimental +/** + * DataStream API for building Flink Sink for {@link RichCdcRecord} to write with schema evolution. + * + * @since 0.8 + */ +@Public public class RichCdcSinkBuilder { private DataStream input = null; @@ -39,27 +43,26 @@ public class RichCdcSinkBuilder { @Nullable private Integer parallelism; - public RichCdcSinkBuilder withInput(DataStream input) { - this.input = input; - return this; + public RichCdcSinkBuilder(Table table) { + this.table = table; } - public RichCdcSinkBuilder withTable(Table table) { - this.table = table; + public RichCdcSinkBuilder forRichCdcRecord(DataStream input) { + this.input = input; return this; } - public RichCdcSinkBuilder withParallelism(@Nullable Integer parallelism) { - this.parallelism = parallelism; + public RichCdcSinkBuilder identifier(Identifier identifier) { + this.identifier = identifier; return this; } - public RichCdcSinkBuilder withIdentifier(Identifier identifier) { - this.identifier = identifier; + public RichCdcSinkBuilder parallelism(@Nullable Integer parallelism) { + this.parallelism = parallelism; return this; } - public RichCdcSinkBuilder withCatalogLoader(Catalog.Loader catalogLoader) { + public RichCdcSinkBuilder catalogLoader(Catalog.Loader catalogLoader) { this.catalogLoader = catalogLoader; return this; } @@ -74,4 +77,45 @@ public DataStreamSink build() { .withCatalogLoader(catalogLoader) .build(); } + + // ====================== Deprecated ============================ + + /** @deprecated Use constructor to pass table. */ + @Deprecated + public RichCdcSinkBuilder() {} + + /** @deprecated Use {@link #forRichCdcRecord}. */ + @Deprecated + public RichCdcSinkBuilder withInput(DataStream input) { + this.input = input; + return this; + } + + /** @deprecated Use constructor to pass Table. */ + @Deprecated + public RichCdcSinkBuilder withTable(Table table) { + this.table = table; + return this; + } + + /** @deprecated Use {@link #parallelism}. */ + @Deprecated + public RichCdcSinkBuilder withParallelism(@Nullable Integer parallelism) { + this.parallelism = parallelism; + return this; + } + + /** @deprecated Use {@link #identifier}. */ + @Deprecated + public RichCdcSinkBuilder withIdentifier(Identifier identifier) { + this.identifier = identifier; + return this; + } + + /** @deprecated Use {@link #catalogLoader}. */ + @Deprecated + public RichCdcSinkBuilder withCatalogLoader(Catalog.Loader catalogLoader) { + this.catalogLoader = catalogLoader; + return this; + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java index f49be619a139..0936943ed2eb 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java @@ -20,7 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.flink.FlinkConnectorOptions; -import org.apache.paimon.flink.sink.FlinkSinkBuilder; +import org.apache.paimon.flink.sink.SortCompactSinkBuilder; import org.apache.paimon.flink.sorter.TableSorter; import org.apache.paimon.flink.source.FlinkSourceBuilder; import org.apache.paimon.predicate.Predicate; @@ -38,7 +38,6 @@ import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -85,12 +84,13 @@ public void build() { } Map tableConfig = fileStoreTable.options(); FlinkSourceBuilder sourceBuilder = - new FlinkSourceBuilder( - ObjectIdentifier.of( - catalogName, - identifier.getDatabaseName(), - identifier.getObjectName()), - fileStoreTable); + new FlinkSourceBuilder(fileStoreTable) + .sourceName( + ObjectIdentifier.of( + catalogName, + identifier.getDatabaseName(), + identifier.getObjectName()) + .asSummaryString()); if (getPartitions() != null) { Predicate partitionPredicate = @@ -98,22 +98,22 @@ public void build() { getPartitions().stream() .map(p -> PredicateBuilder.partition(p, table.rowType())) .toArray(Predicate[]::new)); - sourceBuilder.withPredicate(partitionPredicate); + sourceBuilder.predicate(partitionPredicate); } String scanParallelism = tableConfig.get(FlinkConnectorOptions.SCAN_PARALLELISM.key()); if (scanParallelism != null) { - sourceBuilder.withParallelism(Integer.parseInt(scanParallelism)); + sourceBuilder.sourceParallelism(Integer.parseInt(scanParallelism)); } - DataStream source = sourceBuilder.withEnv(env).withContinuousMode(false).build(); + DataStream source = sourceBuilder.env(env).sourceBounded(true).build(); TableSorter sorter = TableSorter.getSorter(env, source, fileStoreTable, sortStrategy, orderColumns); - new FlinkSinkBuilder(fileStoreTable) - .withInput(sorter.sort()) + new SortCompactSinkBuilder(fileStoreTable) .forCompact(true) - .withOverwritePartition(new HashMap<>()) + .forRowData(sorter.sort()) + .overwrite() .build(); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java index f10f3d625dcd..6a9793f44f0c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java @@ -61,7 +61,7 @@ public TableResult batchSink(DataStream dataStream) { List> transformations = Collections.singletonList( new FlinkSinkBuilder((FileStoreTable) table) - .withInput(dataStream) + .forRowData(dataStream) .build() .getTransformation()); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index 97c426ee5685..f7ebf5afdf24 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java @@ -187,9 +187,7 @@ private boolean hasSinkMaterializer(DataStream input) { public DataStream doWrite( DataStream input, String commitUser, @Nullable Integer parallelism) { StreamExecutionEnvironment env = input.getExecutionEnvironment(); - boolean isStreaming = - env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) - == RuntimeExecutionMode.STREAMING; + boolean isStreaming = isStreaming(input); boolean writeOnly = table.coreOptions().writeOnly(); SingleOutputStreamOperator written = @@ -221,10 +219,8 @@ protected DataStreamSink doCommit(DataStream written, String com StreamExecutionEnvironment env = written.getExecutionEnvironment(); ReadableConfig conf = env.getConfiguration(); CheckpointConfig checkpointConfig = env.getCheckpointConfig(); - boolean isStreaming = - conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING; boolean streamingCheckpointEnabled = - isStreaming && checkpointConfig.isCheckpointingEnabled(); + isStreaming(written) && checkpointConfig.isCheckpointingEnabled(); if (streamingCheckpointEnabled) { assertStreamingConfiguration(env); } @@ -313,4 +309,13 @@ protected abstract Committer.Factory createCom boolean streamingCheckpointEnabled); protected abstract CommittableStateManager createCommittableStateManager(); + + public static boolean isStreaming(DataStream input) { + return isStreaming(input.getExecutionEnvironment()); + } + + public static boolean isStreaming(StreamExecutionEnvironment env) { + return env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) + == RuntimeExecutionMode.STREAMING; + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java index b5de897b92f8..3117f5e877db 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java @@ -18,77 +18,108 @@ package org.apache.paimon.flink.sink; +import org.apache.paimon.annotation.Public; import org.apache.paimon.data.InternalRow; import org.apache.paimon.flink.sink.index.GlobalDynamicBucketSink; import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.util.DataFormatConverters; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.Row; import javax.annotation.Nullable; +import java.util.HashMap; import java.util.Map; import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition; import static org.apache.paimon.utils.Preconditions.checkArgument; -/** Builder for {@link FlinkSink}. */ +/** + * DataStream API for building Flink Sink. + * + * @since 0.8 + */ +@Public public class FlinkSinkBuilder { private final FileStoreTable table; private DataStream input; @Nullable private Map overwritePartition; - @Nullable private LogSinkFunction logSinkFunction; @Nullable private Integer parallelism; - private boolean boundedInput = false; - private boolean compactSink = false; + private Boolean boundedInput = null; - public FlinkSinkBuilder(FileStoreTable table) { - this.table = table; - } + // ============== for extension ============== - public FlinkSinkBuilder withInput(DataStream input) { - this.input = input; - return this; + protected boolean compactSink = false; + @Nullable protected LogSinkFunction logSinkFunction; + + public FlinkSinkBuilder(Table table) { + if (!(table instanceof FileStoreTable)) { + throw new UnsupportedOperationException("Unsupported table type: " + table); + } + this.table = (FileStoreTable) table; } /** - * Whether we need to overwrite partitions. - * - * @param overwritePartition If we pass null, it means not overwrite. If we pass an empty map, - * it means to overwrite every partition it received. If we pass a non-empty map, it means - * we only overwrite the partitions match the map. - * @return returns this. + * From {@link DataStream} with {@link Row}, need to provide a {@link DataType} for builder to + * convert those {@link Row}s to a {@link RowData} DataStream. */ - public FlinkSinkBuilder withOverwritePartition( - @Nullable Map overwritePartition) { - this.overwritePartition = overwritePartition; + public FlinkSinkBuilder forRow(DataStream input, DataType rowDataType) { + RowType rowType = (RowType) rowDataType.getLogicalType(); + DataType[] fieldDataTypes = rowDataType.getChildren().toArray(new DataType[0]); + + DataFormatConverters.RowConverter converter = + new DataFormatConverters.RowConverter(fieldDataTypes); + this.input = + input.map((MapFunction) converter::toInternal) + .setParallelism(input.getParallelism()) + .returns(InternalTypeInfo.of(rowType)); return this; } - public FlinkSinkBuilder withLogSinkFunction(@Nullable LogSinkFunction logSinkFunction) { - this.logSinkFunction = logSinkFunction; + /** From {@link DataStream} with {@link RowData}. */ + public FlinkSinkBuilder forRowData(DataStream input) { + this.input = input; return this; } - public FlinkSinkBuilder withParallelism(@Nullable Integer parallelism) { - this.parallelism = parallelism; + /** INSERT OVERWRITE. */ + public FlinkSinkBuilder overwrite() { + return overwrite(new HashMap<>()); + } + + /** INSERT OVERWRITE PARTITION (...). */ + public FlinkSinkBuilder overwrite(Map overwritePartition) { + this.overwritePartition = overwritePartition; return this; } - public FlinkSinkBuilder withBoundedInputStream(boolean bounded) { - this.boundedInput = bounded; + /** Set sink parallelism. */ + public FlinkSinkBuilder parallelism(int parallelism) { + this.parallelism = parallelism; return this; } - public FlinkSinkBuilder forCompact(boolean compactSink) { - this.compactSink = compactSink; + /** + * Set input bounded, if it is bounded, append table sink does not generate a topology for + * merging small files. + */ + public FlinkSinkBuilder inputBounded(boolean bounded) { + this.boundedInput = bounded; return this; } + /** Build {@link DataStreamSink}. */ public DataStreamSink build() { DataStream input = MapToInternalRow.map(this.input, table.rowType()); if (table.coreOptions().localMergeEnabled() && table.schema().primaryKeys().size() > 0) { @@ -143,6 +174,9 @@ private DataStreamSink buildUnawareBucketSink(DataStream input) checkArgument( table.primaryKeys().isEmpty(), "Unaware bucket mode only works with append-only table for now."); + if (boundedInput == null) { + boundedInput = !FlinkSink.isStreaming(input); + } return new RowUnawareBucketSink( table, overwritePartition, logSinkFunction, parallelism, boundedInput) .sinkFrom(input); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java index 30039188d148..b6a944703469 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java @@ -22,12 +22,10 @@ import org.apache.paimon.CoreOptions.ChangelogProducer; import org.apache.paimon.CoreOptions.LogChangelogMode; import org.apache.paimon.CoreOptions.MergeEngine; -import org.apache.paimon.flink.FlinkConnectorOptions; import org.apache.paimon.flink.PaimonDataStreamSinkProvider; import org.apache.paimon.flink.log.LogSinkProvider; import org.apache.paimon.flink.log.LogStoreTableFactory; import org.apache.paimon.options.Options; -import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import org.apache.flink.streaming.api.datastream.DataStream; @@ -47,6 +45,7 @@ import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER; import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE; import static org.apache.paimon.CoreOptions.MERGE_ENGINE; +import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_PARALLELISM; /** Table sink to create sink. */ public abstract class FlinkTableSinkBase @@ -125,17 +124,20 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { final LogSinkFunction logSinkFunction = overwrite ? null : (logSinkProvider == null ? null : logSinkProvider.createSink()); return new PaimonDataStreamSinkProvider( - (dataStream) -> - new FlinkSinkBuilder((FileStoreTable) table) - .withInput( - new DataStream<>( - dataStream.getExecutionEnvironment(), - dataStream.getTransformation())) - .withLogSinkFunction(logSinkFunction) - .withOverwritePartition(overwrite ? staticPartitions : null) - .withParallelism(conf.get(FlinkConnectorOptions.SINK_PARALLELISM)) - .withBoundedInputStream(context.isBounded()) - .build()); + (dataStream) -> { + LogFlinkSinkBuilder builder = new LogFlinkSinkBuilder(table); + builder.logSinkFunction(logSinkFunction) + .forRowData( + new DataStream<>( + dataStream.getExecutionEnvironment(), + dataStream.getTransformation())) + .inputBounded(context.isBounded()); + if (overwrite) { + builder.overwrite(staticPartitions); + } + conf.getOptional(SINK_PARALLELISM).ifPresent(builder::parallelism); + return builder.build(); + }); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LogFlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LogFlinkSinkBuilder.java new file mode 100644 index 000000000000..aa64b3e35f47 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LogFlinkSinkBuilder.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink; + +import org.apache.paimon.table.Table; + +import javax.annotation.Nullable; + +/** A special version {@link FlinkSinkBuilder} with log sink. */ +public class LogFlinkSinkBuilder extends FlinkSinkBuilder { + + public LogFlinkSinkBuilder(Table table) { + super(table); + } + + FlinkSinkBuilder logSinkFunction(@Nullable LogSinkFunction logSinkFunction) { + this.logSinkFunction = logSinkFunction; + return this; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SortCompactSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SortCompactSinkBuilder.java new file mode 100644 index 000000000000..c30ebc824b85 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SortCompactSinkBuilder.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink; + +import org.apache.paimon.table.Table; + +/** A special version {@link FlinkSinkBuilder} for sort compact. */ +public class SortCompactSinkBuilder extends FlinkSinkBuilder { + + public SortCompactSinkBuilder(Table table) { + super(table); + } + + public FlinkSinkBuilder forCompact(boolean compactSink) { + this.compactSink = compactSink; + return this; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java index ff0312ee52ae..c4544426fdc6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java @@ -190,21 +190,22 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { } FlinkSourceBuilder sourceBuilder = - new FlinkSourceBuilder(tableIdentifier, table) - .withContinuousMode(streaming) - .withLogSourceProvider(logSourceProvider) - .withProjection(projectFields) - .withPredicate(predicate) - .withLimit(limit) - .withWatermarkStrategy(watermarkStrategy) - .withDynamicPartitionFilteringFields(dynamicPartitionFilteringFields); + new FlinkSourceBuilder(table) + .sourceName(tableIdentifier.asSummaryString()) + .sourceBounded(!streaming) + .logSourceProvider(logSourceProvider) + .projection(projectFields) + .predicate(predicate) + .limit(limit) + .watermarkStrategy(watermarkStrategy) + .dynamicPartitionFilteringFields(dynamicPartitionFilteringFields); return new PaimonDataStreamScanProvider( !streaming, env -> sourceBuilder - .withParallelism(inferSourceParallelism(env)) - .withEnv(env) + .sourceParallelism(inferSourceParallelism(env)) + .env(env) .build()); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java index d8878f360c6f..6b542f71cce5 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java @@ -24,6 +24,7 @@ import org.apache.paimon.flink.FlinkConnectorOptions; import org.apache.paimon.flink.Projection; import org.apache.paimon.flink.log.LogSourceProvider; +import org.apache.paimon.flink.sink.FlinkSink; import org.apache.paimon.flink.source.align.AlignedContinuousFileStoreSource; import org.apache.paimon.flink.source.operator.MonitorFunction; import org.apache.paimon.flink.utils.TableScanUtils; @@ -35,6 +36,7 @@ import org.apache.paimon.table.source.ReadBuilder; import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.Source; @@ -45,33 +47,38 @@ import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.util.DataFormatConverters; +import org.apache.flink.table.runtime.typeutils.ExternalTypeInfo; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.Row; import javax.annotation.Nullable; import java.util.List; import java.util.Optional; +import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType; import static org.apache.paimon.CoreOptions.StreamingReadMode.FILE; import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType; import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.Preconditions.checkState; /** - * Source builder to build a Flink {@link StaticFileStoreSource} or {@link - * ContinuousFileStoreSource}. This is for normal read/write jobs. + * DataStream API for building Flink Source. + * + * @since 0.8 */ public class FlinkSourceBuilder { - private final ObjectIdentifier tableIdentifier; private final Table table; private final Options conf; - private boolean isContinuous = false; + private String sourceName; + private Boolean sourceBounded; private StreamExecutionEnvironment env; @Nullable private int[][] projectedFields; @Nullable private Predicate predicate; @@ -81,54 +88,61 @@ public class FlinkSourceBuilder { @Nullable private WatermarkStrategy watermarkStrategy; @Nullable private DynamicPartitionFilteringInfo dynamicPartitionFilteringInfo; - public FlinkSourceBuilder(ObjectIdentifier tableIdentifier, Table table) { - this.tableIdentifier = tableIdentifier; + public FlinkSourceBuilder(Table table) { this.table = table; + this.sourceName = table.name(); this.conf = Options.fromMap(table.options()); } - public FlinkSourceBuilder withContinuousMode(boolean isContinuous) { - this.isContinuous = isContinuous; + public FlinkSourceBuilder env(StreamExecutionEnvironment env) { + this.env = env; + if (sourceBounded == null) { + sourceBounded = !FlinkSink.isStreaming(env); + } return this; } - public FlinkSourceBuilder withEnv(StreamExecutionEnvironment env) { - this.env = env; + public FlinkSourceBuilder sourceName(String name) { + this.sourceName = name; return this; } - public FlinkSourceBuilder withProjection(int[][] projectedFields) { - this.projectedFields = projectedFields; + public FlinkSourceBuilder sourceBounded(boolean bounded) { + this.sourceBounded = bounded; return this; } - public FlinkSourceBuilder withPredicate(Predicate predicate) { - this.predicate = predicate; + public FlinkSourceBuilder projection(int[] projectedFields) { + return projection(Projection.of(projectedFields).toNestedIndexes()); + } + + public FlinkSourceBuilder projection(int[][] projectedFields) { + this.projectedFields = projectedFields; return this; } - public FlinkSourceBuilder withLimit(@Nullable Long limit) { - this.limit = limit; + public FlinkSourceBuilder predicate(Predicate predicate) { + this.predicate = predicate; return this; } - public FlinkSourceBuilder withLogSourceProvider(LogSourceProvider logSourceProvider) { - this.logSourceProvider = logSourceProvider; + public FlinkSourceBuilder limit(@Nullable Long limit) { + this.limit = limit; return this; } - public FlinkSourceBuilder withParallelism(@Nullable Integer parallelism) { + public FlinkSourceBuilder sourceParallelism(@Nullable Integer parallelism) { this.parallelism = parallelism; return this; } - public FlinkSourceBuilder withWatermarkStrategy( + public FlinkSourceBuilder watermarkStrategy( @Nullable WatermarkStrategy watermarkStrategy) { this.watermarkStrategy = watermarkStrategy; return this; } - public FlinkSourceBuilder withDynamicPartitionFilteringFields( + public FlinkSourceBuilder dynamicPartitionFilteringFields( List dynamicPartitionFilteringFields) { if (dynamicPartitionFilteringFields != null && !dynamicPartitionFilteringFields.isEmpty()) { checkState( @@ -144,6 +158,12 @@ public FlinkSourceBuilder withDynamicPartitionFilteringFields( return this; } + @Deprecated + FlinkSourceBuilder logSourceProvider(LogSourceProvider logSourceProvider) { + this.logSourceProvider = logSourceProvider; + return this; + } + private ReadBuilder createReadBuilder() { ReadBuilder readBuilder = table.newReadBuilder().withProjection(projectedFields).withFilter(predicate); @@ -194,7 +214,7 @@ private DataStream toDataStream(Source source) { watermarkStrategy == null ? WatermarkStrategy.noWatermarks() : watermarkStrategy, - tableIdentifier.asSummaryString(), + sourceName, produceTypeInfo()); if (parallelism != null) { dataStream.setParallelism(parallelism); @@ -212,44 +232,58 @@ private TypeInformation produceTypeInfo() { return InternalTypeInfo.of(produceType); } + /** Build source {@link DataStream} with {@link RowData}. */ + public DataStream buildForRow() { + DataType rowType = fromLogicalToDataType(toLogicalType(table.rowType())); + DataType[] fieldDataTypes = rowType.getChildren().toArray(new DataType[0]); + + DataFormatConverters.RowConverter converter = + new DataFormatConverters.RowConverter(fieldDataTypes); + DataStream source = build(); + return source.map((MapFunction) converter::toExternal) + .setParallelism(source.getParallelism()) + .returns(ExternalTypeInfo.of(rowType)); + } + + /** Build source {@link DataStream} with {@link RowData}. */ public DataStream build() { if (env == null) { throw new IllegalArgumentException("StreamExecutionEnvironment should not be null."); } - if (isContinuous) { - TableScanUtils.streamingReadingValidate(table); - - // TODO visit all options through CoreOptions - StartupMode startupMode = CoreOptions.startupMode(conf); - StreamingReadMode streamingReadMode = CoreOptions.streamReadType(conf); - - if (logSourceProvider != null && streamingReadMode != FILE) { - if (startupMode != StartupMode.LATEST_FULL) { - return toDataStream(logSourceProvider.createSource(null)); - } else { - return toDataStream( - HybridSource.builder( - LogHybridSourceFactory.buildHybridFirstSource( - table, projectedFields, predicate)) - .addSource( - new LogHybridSourceFactory(logSourceProvider), - Boundedness.CONTINUOUS_UNBOUNDED) - .build()); - } + if (sourceBounded) { + return buildStaticFileSource(); + } + + TableScanUtils.streamingReadingValidate(table); + + // TODO visit all options through CoreOptions + StartupMode startupMode = CoreOptions.startupMode(conf); + StreamingReadMode streamingReadMode = CoreOptions.streamReadType(conf); + + if (logSourceProvider != null && streamingReadMode != FILE) { + if (startupMode != StartupMode.LATEST_FULL) { + return toDataStream(logSourceProvider.createSource(null)); } else { - if (conf.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_ENABLED)) { - return buildAlignedContinuousFileSource(); - } else if (conf.contains(CoreOptions.CONSUMER_ID) - && conf.get(CoreOptions.CONSUMER_CONSISTENCY_MODE) - == CoreOptions.ConsumerMode.EXACTLY_ONCE) { - return buildContinuousStreamOperator(); - } else { - return buildContinuousFileSource(); - } + return toDataStream( + HybridSource.builder( + LogHybridSourceFactory.buildHybridFirstSource( + table, projectedFields, predicate)) + .addSource( + new LogHybridSourceFactory(logSourceProvider), + Boundedness.CONTINUOUS_UNBOUNDED) + .build()); } } else { - return buildStaticFileSource(); + if (conf.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_ENABLED)) { + return buildAlignedContinuousFileSource(); + } else if (conf.contains(CoreOptions.CONSUMER_ID) + && conf.get(CoreOptions.CONSUMER_CONSISTENCY_MODE) + == CoreOptions.ConsumerMode.EXACTLY_ONCE) { + return buildContinuousStreamOperator(); + } else { + return buildContinuousFileSource(); + } } } @@ -262,7 +296,7 @@ private DataStream buildContinuousStreamOperator() { dataStream = MonitorFunction.buildSource( env, - tableIdentifier.asSummaryString(), + sourceName, produceTypeInfo(), createReadBuilder(), conf.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis(), @@ -276,7 +310,7 @@ private DataStream buildContinuousStreamOperator() { return dataStream; } - public void assertStreamingConfigurationForAlignMode(StreamExecutionEnvironment env) { + private void assertStreamingConfigurationForAlignMode(StreamExecutionEnvironment env) { CheckpointConfig checkpointConfig = env.getCheckpointConfig(); checkArgument( checkpointConfig.isCheckpointingEnabled(), diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java index 6b68532a25b9..dff589e9295e 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java @@ -35,12 +35,14 @@ import org.apache.paimon.utils.BlockingIterator; import org.apache.paimon.utils.FailingFileIO; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.dag.Transformation; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.transformations.SourceTransformation; +import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.connector.Projection; import org.apache.flink.table.data.GenericRowData; @@ -50,6 +52,7 @@ import org.apache.flink.table.data.conversion.DataStructureConverters; import org.apache.flink.table.runtime.typeutils.InternalSerializers; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.IntType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.VarCharType; @@ -135,17 +138,54 @@ private static SerializableRowData wrap(RowData row) { return new SerializableRowData(row, InternalSerializers.create(TABLE_TYPE)); } + @TestTemplate + public void testRowSourceSink() throws Exception { + FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[] {1, 2}); + + // write + DataStreamSource source = buildTestSource(env, isBatch); + DataStream input = + source.map( + (MapFunction) + r -> + Row.of( + r.getInt(0), + r.getString(1).toString(), + r.getInt(2))) + .setParallelism(source.getParallelism()); + DataType inputType = + DataTypes.ROW( + DataTypes.FIELD("v", DataTypes.INT()), + DataTypes.FIELD("p", DataTypes.STRING()), + DataTypes.FIELD("_k", DataTypes.INT())); + new FlinkSinkBuilder(table).forRow(input, inputType).build(); + env.execute(); + + // read + List results = + executeAndCollectRow( + new FlinkSourceBuilder(table).env(env).sourceBounded(true).buildForRow()); + + // assert + Row[] expected = + new Row[] { + Row.of(5, "p2", 1), Row.of(3, "p2", 5), Row.of(5, "p1", 1), Row.of(0, "p1", 2) + }; + assertThat(results).containsExactlyInAnyOrder(expected); + } + @TestTemplate public void testPartitioned() throws Exception { FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[] {1, 2}); // write - new FlinkSinkBuilder(table).withInput(buildTestSource(env, isBatch)).build(); + new FlinkSinkBuilder(table).forRowData(buildTestSource(env, isBatch)).build(); env.execute(); // read List results = - executeAndCollect(new FlinkSourceBuilder(IDENTIFIER, table).withEnv(env).build()); + executeAndCollect( + new FlinkSourceBuilder(table).sourceBounded(true).env(env).build()); // assert Row[] expected = @@ -160,12 +200,13 @@ public void testNonPartitioned() throws Exception { FileStoreTable table = buildFileStoreTable(new int[0], new int[] {2}); // write - new FlinkSinkBuilder(table).withInput(buildTestSource(env, isBatch)).build(); + new FlinkSinkBuilder(table).forRowData(buildTestSource(env, isBatch)).build(); env.execute(); // read List results = - executeAndCollect(new FlinkSourceBuilder(IDENTIFIER, table).withEnv(env).build()); + executeAndCollect( + new FlinkSourceBuilder(table).sourceBounded(true).env(env).build()); // assert Row[] expected = new Row[] {Row.of(5, "p2", 1), Row.of(0, "p1", 2), Row.of(3, "p2", 5)}; @@ -179,7 +220,7 @@ public void testOverwrite() throws Exception { FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[] {1, 2}); // write - new FlinkSinkBuilder(table).withInput(buildTestSource(env, isBatch)).build(); + new FlinkSinkBuilder(table).forRowData(buildTestSource(env, isBatch)).build(); env.execute(); // overwrite p2 @@ -190,15 +231,13 @@ public void testOverwrite() throws Exception { InternalTypeInfo.of(TABLE_TYPE)); Map overwrite = new HashMap<>(); overwrite.put("p", "p2"); - new FlinkSinkBuilder(table) - .withInput(partialData) - .withOverwritePartition(overwrite) - .build(); + new FlinkSinkBuilder(table).forRowData(partialData).overwrite(overwrite).build(); env.execute(); // read List results = - executeAndCollect(new FlinkSourceBuilder(IDENTIFIER, table).withEnv(env).build()); + executeAndCollect( + new FlinkSourceBuilder(table).sourceBounded(true).env(env).build()); Row[] expected = new Row[] {Row.of(9, "p2", 5), Row.of(5, "p1", 1), Row.of(0, "p1", 2)}; assertThat(results).containsExactlyInAnyOrder(expected); @@ -209,14 +248,13 @@ public void testOverwrite() throws Exception { Collections.singletonList( wrap(GenericRowData.of(19, StringData.fromString("p2"), 6))), InternalTypeInfo.of(TABLE_TYPE)); - new FlinkSinkBuilder(table) - .withInput(partialData) - .withOverwritePartition(new HashMap<>()) - .build(); + new FlinkSinkBuilder(table).forRowData(partialData).overwrite().build(); env.execute(); // read - results = executeAndCollect(new FlinkSourceBuilder(IDENTIFIER, table).withEnv(env).build()); + results = + executeAndCollect( + new FlinkSourceBuilder(table).sourceBounded(true).env(env).build()); expected = new Row[] {Row.of(19, "p2", 6), Row.of(5, "p1", 1), Row.of(0, "p1", 2)}; assertThat(results).containsExactlyInAnyOrder(expected); @@ -230,13 +268,15 @@ public void testOverwrite() throws Exception { table.copy( Collections.singletonMap( CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key(), "false"))) - .withInput(partialData) - .withOverwritePartition(new HashMap<>()) + .forRowData(partialData) + .overwrite() .build(); env.execute(); // read - results = executeAndCollect(new FlinkSourceBuilder(IDENTIFIER, table).withEnv(env).build()); + results = + executeAndCollect( + new FlinkSourceBuilder(table).sourceBounded(true).env(env).build()); expected = new Row[] {Row.of(20, "p2", 3)}; assertThat(results).containsExactlyInAnyOrder(expected); } @@ -246,12 +286,13 @@ public void testPartitionedNonKey() throws Exception { FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[0]); // write - new FlinkSinkBuilder(table).withInput(buildTestSource(env, isBatch)).build(); + new FlinkSinkBuilder(table).forRowData(buildTestSource(env, isBatch)).build(); env.execute(); // read List results = - executeAndCollect(new FlinkSourceBuilder(IDENTIFIER, table).withEnv(env).build()); + executeAndCollect( + new FlinkSourceBuilder(table).sourceBounded(true).env(env).build()); // assert // in streaming mode, expect origin data X 2 (FiniteTestSource) @@ -275,7 +316,7 @@ public void testNonKeyedProjection() throws Exception { private void testProjection(FileStoreTable table) throws Exception { // write - new FlinkSinkBuilder(table).withInput(buildTestSource(env, isBatch)).build(); + new FlinkSinkBuilder(table).forRowData(buildTestSource(env, isBatch)).build(); env.execute(); // read @@ -288,9 +329,10 @@ private void testProjection(FileStoreTable table) throws Exception { projection.project(TABLE_TYPE))); List results = executeAndCollect( - new FlinkSourceBuilder(IDENTIFIER, table) - .withProjection(projection.toNestedIndexes()) - .withEnv(env) + new FlinkSourceBuilder(table) + .sourceBounded(true) + .projection(projection.toNestedIndexes()) + .env(env) .build(), converter); @@ -328,10 +370,7 @@ public void testContinuousBounded() throws Exception { table.copy( Collections.singletonMap(CoreOptions.SCAN_BOUNDED_WATERMARK.key(), "1024")); DataStream source = - new FlinkSourceBuilder(IDENTIFIER, table) - .withContinuousMode(true) - .withEnv(env) - .build(); + new FlinkSourceBuilder(table).sourceBounded(false).env(env).build(); Transformation transformation = source.getTransformation(); assertThat(transformation).isInstanceOf(SourceTransformation.class); assertThat(((SourceTransformation) transformation).getSource().getBoundedness()) @@ -343,9 +382,9 @@ private void innerTestContinuous(FileStoreTable table) throws Exception { BlockingIterator iterator = BlockingIterator.of( - new FlinkSourceBuilder(IDENTIFIER, table) - .withContinuousMode(true) - .withEnv(env) + new FlinkSourceBuilder(table) + .sourceBounded(false) + .env(env) .build() .executeAndCollect(), CONVERTER::toExternal); @@ -382,7 +421,7 @@ private void sinkAndValidate( } DataStreamSource source = env.addSource(new FiniteTestSource<>(src, true), InternalTypeInfo.of(TABLE_TYPE)); - new FlinkSinkBuilder(table).withInput(source).build(); + new FlinkSinkBuilder(table).forRowData(source).build(); env.execute(); assertThat(iterator.collect(expected.length)).containsExactlyInAnyOrder(expected); } @@ -468,4 +507,14 @@ public static List executeAndCollect( iterator.close(); return results; } + + public static List executeAndCollectRow(DataStream source) throws Exception { + CloseableIterator iterator = source.executeAndCollect(); + List results = new ArrayList<>(); + while (iterator.hasNext()) { + results.add(iterator.next()); + } + iterator.close(); + return results; + } }