From 8edf2680ed18105a0ef821b6a4830a345e8f9d72 Mon Sep 17 00:00:00 2001 From: Jingsong Date: Wed, 10 Apr 2024 16:43:46 +0800 Subject: [PATCH] [api] Publish DataStream Sink API --- docs/content/program-api/flink-api.md | 59 +++++++------ .../flink/action/SortCompactAction.java | 9 +- .../paimon/flink/action/TableActionBase.java | 2 +- .../apache/paimon/flink/sink/FlinkSink.java | 13 +-- .../paimon/flink/sink/FlinkSinkBuilder.java | 87 +++++++++++++------ .../paimon/flink/sink/FlinkTableSinkBase.java | 28 +++--- .../flink/sink/LogFlinkSinkBuilder.java | 36 ++++++++ .../flink/sink/SortCompactSinkBuilder.java | 34 ++++++++ .../apache/paimon/flink/FileStoreITCase.java | 63 ++++++++++---- 9 files changed, 237 insertions(+), 94 deletions(-) create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LogFlinkSinkBuilder.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SortCompactSinkBuilder.java diff --git a/docs/content/program-api/flink-api.md b/docs/content/program-api/flink-api.md index 942208e1aa1a5..0fe555eed84d3 100644 --- a/docs/content/program-api/flink-api.md +++ b/docs/content/program-api/flink-api.md @@ -61,58 +61,65 @@ Please choose your Flink version. Paimon relies on Hadoop environment, you should add hadoop classpath or bundled jar. -Paimon does not provide a DataStream API, but you can read or write to Paimon tables by the conversion between DataStream and Table in Flink. +Not only DataStream API, you can also read or write to Paimon tables by the conversion between DataStream and Table in Flink. See [DataStream API Integration](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/data_stream_api/). ## Write to Table ```java +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.sink.FlinkSinkBuilder; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.Table; + +import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.api.Schema; -import org.apache.flink.table.api.Table; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.types.DataType; import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; public class WriteToTable { - public static void writeTo() { + public static void writeTo() throws Exception { // create environments of both APIs StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // for CONTINUOUS_UNBOUNDED source, set checkpoint interval // env.enableCheckpointing(60_000); - StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // create a changelog DataStream - DataStream dataStream = + DataStream input = env.fromElements( - Row.ofKind(RowKind.INSERT, "Alice", 12), - Row.ofKind(RowKind.INSERT, "Bob", 5), - Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12), - Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100)) + Row.ofKind(RowKind.INSERT, "Alice", 12), + Row.ofKind(RowKind.INSERT, "Bob", 5), + Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12), + Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100)) .returns( Types.ROW_NAMED( - new String[] {"name", "age"}, - Types.STRING, Types.INT)); + new String[] {"name", "age"}, Types.STRING, Types.INT)); - // interpret the DataStream as a 
Table - Schema schema = Schema.newBuilder() - .column("name", DataTypes.STRING()) - .column("age", DataTypes.INT()) - .build(); - Table table = tableEnv.fromChangelogStream(dataStream, schema); + // get table from catalog + Options catalogOptions = new Options(); + catalogOptions.set("warehouse", "/path/to/warehouse"); + Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions); + Table table = catalog.getTable(Identifier.create("my_db", "T")); - // create paimon catalog - tableEnv.executeSql("CREATE CATALOG paimon WITH ('type' = 'paimon', 'warehouse'='...')"); - tableEnv.executeSql("USE CATALOG paimon"); + DataType inputType = + DataTypes.ROW( + DataTypes.FIELD("name", DataTypes.STRING()), + DataTypes.FIELD("age", DataTypes.INT())); + FlinkSinkBuilder builder = new FlinkSinkBuilder(table).forRow(input, inputType); - // register the table under a name and perform an aggregation - tableEnv.createTemporaryView("InputTable", table); + // set sink parallelism + // builder.parallelism(_your_parallelism) - // insert into paimon table from your data stream table - tableEnv.executeSql("INSERT INTO sink_paimon_table SELECT * FROM InputTable"); + // set overwrite mode + // builder.overwrite(...) + + builder.build(); + env.execute(); } } ``` diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java index f49be619a1399..44d8acdad2dc6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java @@ -20,7 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.flink.FlinkConnectorOptions; -import org.apache.paimon.flink.sink.FlinkSinkBuilder; +import org.apache.paimon.flink.sink.SortCompactSinkBuilder; import org.apache.paimon.flink.sorter.TableSorter; import org.apache.paimon.flink.source.FlinkSourceBuilder; import org.apache.paimon.predicate.Predicate; @@ -38,7 +38,6 @@ import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -110,10 +109,10 @@ public void build() { TableSorter sorter = TableSorter.getSorter(env, source, fileStoreTable, sortStrategy, orderColumns); - new FlinkSinkBuilder(fileStoreTable) - .withInput(sorter.sort()) + new SortCompactSinkBuilder(fileStoreTable) .forCompact(true) - .withOverwritePartition(new HashMap<>()) + .forRowData(sorter.sort()) + .overwrite() .build(); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java index f10f3d625dcde..6a9793f44f0cf 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java @@ -61,7 +61,7 @@ public TableResult batchSink(DataStream dataStream) { List> transformations = Collections.singletonList( new FlinkSinkBuilder((FileStoreTable) table) - .withInput(dataStream) + .forRowData(dataStream) .build() .getTransformation()); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index 97c426ee56854..c65ef6a354401 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java @@ -187,9 +187,7 @@ private boolean hasSinkMaterializer(DataStream input) { public DataStream doWrite( DataStream input, String commitUser, @Nullable Integer parallelism) { StreamExecutionEnvironment env = input.getExecutionEnvironment(); - boolean isStreaming = - env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) - == RuntimeExecutionMode.STREAMING; + boolean isStreaming = isStreaming(input); boolean writeOnly = table.coreOptions().writeOnly(); SingleOutputStreamOperator written = @@ -221,10 +219,8 @@ protected DataStreamSink doCommit(DataStream written, String com StreamExecutionEnvironment env = written.getExecutionEnvironment(); ReadableConfig conf = env.getConfiguration(); CheckpointConfig checkpointConfig = env.getCheckpointConfig(); - boolean isStreaming = - conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING; boolean streamingCheckpointEnabled = - isStreaming && checkpointConfig.isCheckpointingEnabled(); + isStreaming(written) && checkpointConfig.isCheckpointingEnabled(); if (streamingCheckpointEnabled) { assertStreamingConfiguration(env); } @@ -313,4 +309,9 @@ protected abstract Committer.Factory createCom boolean streamingCheckpointEnabled); protected abstract CommittableStateManager createCommittableStateManager(); + + public static boolean isStreaming(DataStream input) { + return input.getExecutionEnvironment().getConfiguration().get(ExecutionOptions.RUNTIME_MODE) + == RuntimeExecutionMode.STREAMING; + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java index b5de897b92f83..8a51cf0190ea1 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java @@ -18,77 +18,107 @@ package org.apache.paimon.flink.sink; +import org.apache.paimon.annotation.Public; import org.apache.paimon.data.InternalRow; import org.apache.paimon.flink.sink.index.GlobalDynamicBucketSink; import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.util.DataFormatConverters; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.Row; import javax.annotation.Nullable; +import java.util.HashMap; import java.util.Map; import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition; import static org.apache.paimon.utils.Preconditions.checkArgument; -/** Builder for {@link FlinkSink}. */ +/** + * DataStream API for building Flink Sink. 
+ * + * @since 0.8 + */ +@Public public class FlinkSinkBuilder { private final FileStoreTable table; private DataStream input; @Nullable private Map overwritePartition; - @Nullable private LogSinkFunction logSinkFunction; @Nullable private Integer parallelism; - private boolean boundedInput = false; - private boolean compactSink = false; + private Boolean boundedInput = null; - public FlinkSinkBuilder(FileStoreTable table) { - this.table = table; - } + // ============== for extension ============== - public FlinkSinkBuilder withInput(DataStream input) { - this.input = input; - return this; + protected boolean compactSink = false; + @Nullable protected LogSinkFunction logSinkFunction; + + public FlinkSinkBuilder(Table table) { + if (!(table instanceof FileStoreTable)) { + throw new UnsupportedOperationException("Unsupported table type: " + table); + } + this.table = (FileStoreTable) table; } /** - * Whether we need to overwrite partitions. - * - * @param overwritePartition If we pass null, it means not overwrite. If we pass an empty map, - * it means to overwrite every partition it received. If we pass a non-empty map, it means - * we only overwrite the partitions match the map. - * @return returns this. + * From {@link DataStream} with {@link Row}, need to provide a {@link DataType} for builder to + * convert those {@link Row}s to a {@link RowData} DataStream. */ - public FlinkSinkBuilder withOverwritePartition( - @Nullable Map overwritePartition) { - this.overwritePartition = overwritePartition; + public FlinkSinkBuilder forRow(DataStream input, DataType rowDataType) { + RowType rowType = (RowType) rowDataType.getLogicalType(); + DataType[] fieldDataTypes = rowDataType.getChildren().toArray(new DataType[0]); + + DataFormatConverters.RowConverter converter = + new DataFormatConverters.RowConverter(fieldDataTypes); + this.input = + input.map((MapFunction) converter::toInternal) + .returns(InternalTypeInfo.of(rowType)); return this; } - public FlinkSinkBuilder withLogSinkFunction(@Nullable LogSinkFunction logSinkFunction) { - this.logSinkFunction = logSinkFunction; + /** From {@link DataStream} with {@link RowData}. */ + public FlinkSinkBuilder forRowData(DataStream input) { + this.input = input; return this; } - public FlinkSinkBuilder withParallelism(@Nullable Integer parallelism) { - this.parallelism = parallelism; + /** INSERT OVERWRITE. */ + public FlinkSinkBuilder overwrite() { + return overwrite(new HashMap<>()); + } + + /** INSERT OVERWRITE PARTITION (...). */ + public FlinkSinkBuilder overwrite(Map overwritePartition) { + this.overwritePartition = overwritePartition; return this; } - public FlinkSinkBuilder withBoundedInputStream(boolean bounded) { - this.boundedInput = bounded; + /** Set sink parallelism. */ + public FlinkSinkBuilder parallelism(int parallelism) { + this.parallelism = parallelism; return this; } - public FlinkSinkBuilder forCompact(boolean compactSink) { - this.compactSink = compactSink; + /** + * Set input bounded, if it is bounded, append table sink does not generate a topology for + * merging small files. + */ + public FlinkSinkBuilder inputBounded(boolean bounded) { + this.boundedInput = bounded; return this; } + /** Build {@link DataStreamSink}. 
*/ public DataStreamSink build() { DataStream input = MapToInternalRow.map(this.input, table.rowType()); if (table.coreOptions().localMergeEnabled() && table.schema().primaryKeys().size() > 0) { @@ -143,6 +173,9 @@ private DataStreamSink buildUnawareBucketSink(DataStream input) checkArgument( table.primaryKeys().isEmpty(), "Unaware bucket mode only works with append-only table for now."); + if (boundedInput == null) { + boundedInput = !FlinkSink.isStreaming(input); + } return new RowUnawareBucketSink( table, overwritePartition, logSinkFunction, parallelism, boundedInput) .sinkFrom(input); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java index 30039188d148e..b6a9447034698 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java @@ -22,12 +22,10 @@ import org.apache.paimon.CoreOptions.ChangelogProducer; import org.apache.paimon.CoreOptions.LogChangelogMode; import org.apache.paimon.CoreOptions.MergeEngine; -import org.apache.paimon.flink.FlinkConnectorOptions; import org.apache.paimon.flink.PaimonDataStreamSinkProvider; import org.apache.paimon.flink.log.LogSinkProvider; import org.apache.paimon.flink.log.LogStoreTableFactory; import org.apache.paimon.options.Options; -import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import org.apache.flink.streaming.api.datastream.DataStream; @@ -47,6 +45,7 @@ import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER; import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE; import static org.apache.paimon.CoreOptions.MERGE_ENGINE; +import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_PARALLELISM; /** Table sink to create sink. */ public abstract class FlinkTableSinkBase @@ -125,17 +124,20 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { final LogSinkFunction logSinkFunction = overwrite ? null : (logSinkProvider == null ? null : logSinkProvider.createSink()); return new PaimonDataStreamSinkProvider( - (dataStream) -> - new FlinkSinkBuilder((FileStoreTable) table) - .withInput( - new DataStream<>( - dataStream.getExecutionEnvironment(), - dataStream.getTransformation())) - .withLogSinkFunction(logSinkFunction) - .withOverwritePartition(overwrite ? 
staticPartitions : null) - .withParallelism(conf.get(FlinkConnectorOptions.SINK_PARALLELISM)) - .withBoundedInputStream(context.isBounded()) - .build()); + (dataStream) -> { + LogFlinkSinkBuilder builder = new LogFlinkSinkBuilder(table); + builder.logSinkFunction(logSinkFunction) + .forRowData( + new DataStream<>( + dataStream.getExecutionEnvironment(), + dataStream.getTransformation())) + .inputBounded(context.isBounded()); + if (overwrite) { + builder.overwrite(staticPartitions); + } + conf.getOptional(SINK_PARALLELISM).ifPresent(builder::parallelism); + return builder.build(); + }); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LogFlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LogFlinkSinkBuilder.java new file mode 100644 index 0000000000000..aa64b3e35f47c --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LogFlinkSinkBuilder.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink; + +import org.apache.paimon.table.Table; + +import javax.annotation.Nullable; + +/** A special version {@link FlinkSinkBuilder} with log sink. */ +public class LogFlinkSinkBuilder extends FlinkSinkBuilder { + + public LogFlinkSinkBuilder(Table table) { + super(table); + } + + FlinkSinkBuilder logSinkFunction(@Nullable LogSinkFunction logSinkFunction) { + this.logSinkFunction = logSinkFunction; + return this; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SortCompactSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SortCompactSinkBuilder.java new file mode 100644 index 0000000000000..c30ebc824b850 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SortCompactSinkBuilder.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.sink; + +import org.apache.paimon.table.Table; + +/** A special version {@link FlinkSinkBuilder} for sort compact. */ +public class SortCompactSinkBuilder extends FlinkSinkBuilder { + + public SortCompactSinkBuilder(Table table) { + super(table); + } + + public FlinkSinkBuilder forCompact(boolean compactSink) { + this.compactSink = compactSink; + return this; + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java index 6b68532a25b9d..4fbae980877df 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java @@ -35,12 +35,14 @@ import org.apache.paimon.utils.BlockingIterator; import org.apache.paimon.utils.FailingFileIO; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.dag.Transformation; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.transformations.SourceTransformation; +import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.connector.Projection; import org.apache.flink.table.data.GenericRowData; @@ -50,6 +52,7 @@ import org.apache.flink.table.data.conversion.DataStructureConverters; import org.apache.flink.table.runtime.typeutils.InternalSerializers; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.IntType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.VarCharType; @@ -135,12 +138,46 @@ private static SerializableRowData wrap(RowData row) { return new SerializableRowData(row, InternalSerializers.create(TABLE_TYPE)); } + @TestTemplate + public void testRowSink() throws Exception { + FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[] {1, 2}); + + // write + DataStream input = + buildTestSource(env, isBatch) + .map( + (MapFunction) + r -> + Row.of( + r.getInt(0), + r.getString(1).toString(), + r.getInt(2))); + DataType inputType = + DataTypes.ROW( + DataTypes.FIELD("v", DataTypes.INT()), + DataTypes.FIELD("p", DataTypes.STRING()), + DataTypes.FIELD("_k", DataTypes.INT())); + new FlinkSinkBuilder(table).forRow(input, inputType).build(); + env.execute(); + + // read + List results = + executeAndCollect(new FlinkSourceBuilder(IDENTIFIER, table).withEnv(env).build()); + + // assert + Row[] expected = + new Row[] { + Row.of(5, "p2", 1), Row.of(3, "p2", 5), Row.of(5, "p1", 1), Row.of(0, "p1", 2) + }; + assertThat(results).containsExactlyInAnyOrder(expected); + } + @TestTemplate public void testPartitioned() throws Exception { FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[] {1, 2}); // write - new FlinkSinkBuilder(table).withInput(buildTestSource(env, isBatch)).build(); + new FlinkSinkBuilder(table).forRowData(buildTestSource(env, isBatch)).build(); env.execute(); // read @@ -160,7 +197,7 @@ public void testNonPartitioned() throws Exception { FileStoreTable table = buildFileStoreTable(new int[0], new int[] {2}); // 
write - new FlinkSinkBuilder(table).withInput(buildTestSource(env, isBatch)).build(); + new FlinkSinkBuilder(table).forRowData(buildTestSource(env, isBatch)).build(); env.execute(); // read @@ -179,7 +216,7 @@ public void testOverwrite() throws Exception { FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[] {1, 2}); // write - new FlinkSinkBuilder(table).withInput(buildTestSource(env, isBatch)).build(); + new FlinkSinkBuilder(table).forRowData(buildTestSource(env, isBatch)).build(); env.execute(); // overwrite p2 @@ -190,10 +227,7 @@ public void testOverwrite() throws Exception { InternalTypeInfo.of(TABLE_TYPE)); Map overwrite = new HashMap<>(); overwrite.put("p", "p2"); - new FlinkSinkBuilder(table) - .withInput(partialData) - .withOverwritePartition(overwrite) - .build(); + new FlinkSinkBuilder(table).forRowData(partialData).overwrite(overwrite).build(); env.execute(); // read @@ -209,10 +243,7 @@ public void testOverwrite() throws Exception { Collections.singletonList( wrap(GenericRowData.of(19, StringData.fromString("p2"), 6))), InternalTypeInfo.of(TABLE_TYPE)); - new FlinkSinkBuilder(table) - .withInput(partialData) - .withOverwritePartition(new HashMap<>()) - .build(); + new FlinkSinkBuilder(table).forRowData(partialData).overwrite().build(); env.execute(); // read @@ -230,8 +261,8 @@ public void testOverwrite() throws Exception { table.copy( Collections.singletonMap( CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key(), "false"))) - .withInput(partialData) - .withOverwritePartition(new HashMap<>()) + .forRowData(partialData) + .overwrite() .build(); env.execute(); @@ -246,7 +277,7 @@ public void testPartitionedNonKey() throws Exception { FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[0]); // write - new FlinkSinkBuilder(table).withInput(buildTestSource(env, isBatch)).build(); + new FlinkSinkBuilder(table).forRowData(buildTestSource(env, isBatch)).build(); env.execute(); // read @@ -275,7 +306,7 @@ public void testNonKeyedProjection() throws Exception { private void testProjection(FileStoreTable table) throws Exception { // write - new FlinkSinkBuilder(table).withInput(buildTestSource(env, isBatch)).build(); + new FlinkSinkBuilder(table).forRowData(buildTestSource(env, isBatch)).build(); env.execute(); // read @@ -382,7 +413,7 @@ private void sinkAndValidate( } DataStreamSource source = env.addSource(new FiniteTestSource<>(src, true), InternalTypeInfo.of(TABLE_TYPE)); - new FlinkSinkBuilder(table).withInput(source).build(); + new FlinkSinkBuilder(table).forRowData(source).build(); env.execute(); assertThat(iterator.collect(expected.length)).containsExactlyInAnyOrder(expected); }
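
The flink-api.md hunk above leaves the `overwrite` and `parallelism` options commented out. Below is a minimal, self-contained sketch of how they compose with the `FlinkSinkBuilder` published by this patch; it is not part of the patch itself. The warehouse path, the `my_db.T` table, and the `name`/`age`/`dt` schema are hypothetical placeholders, and `FlinkCatalogFactory` is assumed to be the `org.apache.paimon.flink.FlinkCatalogFactory` referenced in the docs example.

```java
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.Table;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.types.Row;

import java.util.Collections;

public class OverwritePartitionExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // batch mode: bounded input, INSERT OVERWRITE semantics, no checkpointing needed
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // look up the target table from a Paimon catalog (hypothetical warehouse path and table)
        Options catalogOptions = new Options();
        catalogOptions.set("warehouse", "/path/to/warehouse");
        Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
        Table table = catalog.getTable(Identifier.create("my_db", "T"));

        // a bounded Row stream matching the assumed table schema (name STRING, age INT, dt STRING)
        DataStream<Row> input =
                env.fromElements(
                                Row.of("Alice", 12, "2024-04-10"),
                                Row.of("Bob", 5, "2024-04-10"))
                        .returns(
                                Types.ROW_NAMED(
                                        new String[] {"name", "age", "dt"},
                                        Types.STRING, Types.INT, Types.STRING));

        new FlinkSinkBuilder(table)
                .forRow(
                        input,
                        DataTypes.ROW(
                                DataTypes.FIELD("name", DataTypes.STRING()),
                                DataTypes.FIELD("age", DataTypes.INT()),
                                DataTypes.FIELD("dt", DataTypes.STRING())))
                // INSERT OVERWRITE PARTITION (dt = '2024-04-10')
                .overwrite(Collections.singletonMap("dt", "2024-04-10"))
                // explicit sink parallelism
                .parallelism(2)
                .build();

        env.execute();
    }
}
```

Running in batch mode gives `overwrite(...)` its INSERT OVERWRITE semantics; for an unbounded streaming job a checkpoint interval would be required instead, as noted in the docs example in this patch.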