[api] Publish DataStream Sink API
JingsongLi committed Apr 10, 2024
1 parent a5303ac commit 8edf268
Showing 9 changed files with 237 additions and 94 deletions.
59 changes: 33 additions & 26 deletions docs/content/program-api/flink-api.md
@@ -61,58 +61,65 @@ Please choose your Flink version.

Paimon relies on the Hadoop environment; you should add the Hadoop classpath or a bundled jar.

Paimon does not provide a DataStream API, but you can read or write to Paimon tables by the conversion between DataStream and Table in Flink.
Besides the DataStream API, you can also read from or write to Paimon tables through the conversion between DataStream and Table in Flink.
See [DataStream API Integration](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/data_stream_api/).

## Write to Table

```java
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.Table;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

public class WriteToTable {

public static void writeTo() {
public static void writeTo() throws Exception {
// create environments of both APIs
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// for CONTINUOUS_UNBOUNDED source, set checkpoint interval
// env.enableCheckpointing(60_000);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// create a changelog DataStream
DataStream<Row> dataStream =
DataStream<Row> input =
env.fromElements(
Row.ofKind(RowKind.INSERT, "Alice", 12),
Row.ofKind(RowKind.INSERT, "Bob", 5),
Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),
Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100))
Row.ofKind(RowKind.INSERT, "Alice", 12),
Row.ofKind(RowKind.INSERT, "Bob", 5),
Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),
Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100))
.returns(
Types.ROW_NAMED(
new String[] {"name", "age"},
Types.STRING, Types.INT));
new String[] {"name", "age"}, Types.STRING, Types.INT));

// interpret the DataStream as a Table
Schema schema = Schema.newBuilder()
.column("name", DataTypes.STRING())
.column("age", DataTypes.INT())
.build();
Table table = tableEnv.fromChangelogStream(dataStream, schema);
// get table from catalog
Options catalogOptions = new Options();
catalogOptions.set("warehouse", "/path/to/warehouse");
Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
Table table = catalog.getTable(Identifier.create("my_db", "T"));

// create paimon catalog
tableEnv.executeSql("CREATE CATALOG paimon WITH ('type' = 'paimon', 'warehouse'='...')");
tableEnv.executeSql("USE CATALOG paimon");
DataType inputType =
DataTypes.ROW(
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("age", DataTypes.INT()));
FlinkSinkBuilder builder = new FlinkSinkBuilder(table).forRow(input, inputType);

// register the table under a name and perform an aggregation
tableEnv.createTemporaryView("InputTable", table);
// set sink parallelism
// builder.parallelism(_your_parallelism)

// insert into paimon table from your data stream table
tableEnv.executeSql("INSERT INTO sink_paimon_table SELECT * FROM InputTable");
// set overwrite mode
// builder.overwrite(...)

builder.build();
env.execute();
}
}
```
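
If your pipeline already produces Flink `RowData` (for example from another connector or an existing operator chain), the same builder can be used via `forRowData`, skipping the `Row` conversion shown above. A minimal sketch, assuming an upstream `DataStream<RowData>` whose schema matches the table; the class and method names here are illustrative, not part of this commit:

```java
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.Table;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;

public class WriteRowDataToTable {

    public static void writeTo(DataStream<RowData> input) throws Exception {
        // Look up the target table from the catalog, as in the example above.
        Options catalogOptions = new Options();
        catalogOptions.set("warehouse", "/path/to/warehouse");
        Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
        Table table = catalog.getTable(Identifier.create("my_db", "T"));

        // forRowData accepts the RowData stream directly; no Row-to-RowData conversion is needed.
        FlinkSinkBuilder builder = new FlinkSinkBuilder(table).forRowData(input);

        // Optional tuning, mirroring the commented hints above:
        // builder.parallelism(2);
        // builder.overwrite();

        builder.build();
        input.getExecutionEnvironment().execute();
    }
}
```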
@@ -20,7 +20,7 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.flink.sink.SortCompactSinkBuilder;
import org.apache.paimon.flink.sorter.TableSorter;
import org.apache.paimon.flink.source.FlinkSourceBuilder;
import org.apache.paimon.predicate.Predicate;
@@ -38,7 +38,6 @@

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -110,10 +109,10 @@ public void build() {
TableSorter sorter =
TableSorter.getSorter(env, source, fileStoreTable, sortStrategy, orderColumns);

new FlinkSinkBuilder(fileStoreTable)
.withInput(sorter.sort())
new SortCompactSinkBuilder(fileStoreTable)
.forCompact(true)
.withOverwritePartition(new HashMap<>())
.forRowData(sorter.sort())
.overwrite()
.build();
}

@@ -61,7 +61,7 @@ public TableResult batchSink(DataStream<RowData> dataStream) {
List<Transformation<?>> transformations =
Collections.singletonList(
new FlinkSinkBuilder((FileStoreTable) table)
.withInput(dataStream)
.forRowData(dataStream)
.build()
.getTransformation());

@@ -187,9 +187,7 @@ private boolean hasSinkMaterializer(DataStream<T> input) {
public DataStream<Committable> doWrite(
DataStream<T> input, String commitUser, @Nullable Integer parallelism) {
StreamExecutionEnvironment env = input.getExecutionEnvironment();
boolean isStreaming =
env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
== RuntimeExecutionMode.STREAMING;
boolean isStreaming = isStreaming(input);

boolean writeOnly = table.coreOptions().writeOnly();
SingleOutputStreamOperator<Committable> written =
@@ -221,10 +219,8 @@ protected DataStreamSink<?> doCommit(DataStream<Committable> written, String com
StreamExecutionEnvironment env = written.getExecutionEnvironment();
ReadableConfig conf = env.getConfiguration();
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
boolean isStreaming =
conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
boolean streamingCheckpointEnabled =
isStreaming && checkpointConfig.isCheckpointingEnabled();
isStreaming(written) && checkpointConfig.isCheckpointingEnabled();
if (streamingCheckpointEnabled) {
assertStreamingConfiguration(env);
}
@@ -313,4 +309,9 @@ protected abstract Committer.Factory<Committable, ManifestCommittable> createCom
boolean streamingCheckpointEnabled);

protected abstract CommittableStateManager<ManifestCommittable> createCommittableStateManager();

public static boolean isStreaming(DataStream<?> input) {
return input.getExecutionEnvironment().getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
== RuntimeExecutionMode.STREAMING;
}
}
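
The new static `FlinkSink.isStreaming` helper centralizes the runtime-mode check previously duplicated in `doWrite` and `doCommit`. A minimal illustrative use outside the sink itself; the wrapper class and method name are hypothetical:

```java
import org.apache.paimon.flink.sink.FlinkSink;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;

public class RuntimeModeCheck {

    // True when the stream's execution environment is configured for STREAMING mode.
    public static boolean runsInStreamingMode(DataStream<RowData> stream) {
        return FlinkSink.isStreaming(stream);
    }
}
```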
@@ -18,77 +18,107 @@

package org.apache.paimon.flink.sink;

import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.sink.index.GlobalDynamicBucketSink;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;

import javax.annotation.Nullable;

import java.util.HashMap;
import java.util.Map;

import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Builder for {@link FlinkSink}. */
/**
* DataStream API for building Flink Sink.
*
* @since 0.8
*/
@Public
public class FlinkSinkBuilder {

private final FileStoreTable table;

private DataStream<RowData> input;
@Nullable private Map<String, String> overwritePartition;
@Nullable private LogSinkFunction logSinkFunction;
@Nullable private Integer parallelism;
private boolean boundedInput = false;
private boolean compactSink = false;
private Boolean boundedInput = null;

public FlinkSinkBuilder(FileStoreTable table) {
this.table = table;
}
// ============== for extension ==============

public FlinkSinkBuilder withInput(DataStream<RowData> input) {
this.input = input;
return this;
protected boolean compactSink = false;
@Nullable protected LogSinkFunction logSinkFunction;

public FlinkSinkBuilder(Table table) {
if (!(table instanceof FileStoreTable)) {
throw new UnsupportedOperationException("Unsupported table type: " + table);
}
this.table = (FileStoreTable) table;
}

/**
* Whether we need to overwrite partitions.
*
* @param overwritePartition If we pass null, it means not overwrite. If we pass an empty map,
* it means to overwrite every partition it received. If we pass a non-empty map, it means
* we only overwrite the partitions match the map.
* @return returns this.
* From {@link DataStream} with {@link Row}, need to provide a {@link DataType} for builder to
* convert those {@link Row}s to a {@link RowData} DataStream.
*/
public FlinkSinkBuilder withOverwritePartition(
@Nullable Map<String, String> overwritePartition) {
this.overwritePartition = overwritePartition;
public FlinkSinkBuilder forRow(DataStream<Row> input, DataType rowDataType) {
RowType rowType = (RowType) rowDataType.getLogicalType();
DataType[] fieldDataTypes = rowDataType.getChildren().toArray(new DataType[0]);

DataFormatConverters.RowConverter converter =
new DataFormatConverters.RowConverter(fieldDataTypes);
this.input =
input.map((MapFunction<Row, RowData>) converter::toInternal)
.returns(InternalTypeInfo.of(rowType));
return this;
}

public FlinkSinkBuilder withLogSinkFunction(@Nullable LogSinkFunction logSinkFunction) {
this.logSinkFunction = logSinkFunction;
/** From {@link DataStream} with {@link RowData}. */
public FlinkSinkBuilder forRowData(DataStream<RowData> input) {
this.input = input;
return this;
}

public FlinkSinkBuilder withParallelism(@Nullable Integer parallelism) {
this.parallelism = parallelism;
/** INSERT OVERWRITE. */
public FlinkSinkBuilder overwrite() {
return overwrite(new HashMap<>());
}

/** INSERT OVERWRITE PARTITION (...). */
public FlinkSinkBuilder overwrite(Map<String, String> overwritePartition) {
this.overwritePartition = overwritePartition;
return this;
}

public FlinkSinkBuilder withBoundedInputStream(boolean bounded) {
this.boundedInput = bounded;
/** Set sink parallelism. */
public FlinkSinkBuilder parallelism(int parallelism) {
this.parallelism = parallelism;
return this;
}

public FlinkSinkBuilder forCompact(boolean compactSink) {
this.compactSink = compactSink;
/**
* Set input bounded, if it is bounded, append table sink does not generate a topology for
* merging small files.
*/
public FlinkSinkBuilder inputBounded(boolean bounded) {
this.boundedInput = bounded;
return this;
}

/** Build {@link DataStreamSink}. */
public DataStreamSink<?> build() {
DataStream<InternalRow> input = MapToInternalRow.map(this.input, table.rowType());
if (table.coreOptions().localMergeEnabled() && table.schema().primaryKeys().size() > 0) {
@@ -143,6 +173,9 @@ private DataStreamSink<?> buildUnawareBucketSink(DataStream<InternalRow> input)
checkArgument(
table.primaryKeys().isEmpty(),
"Unaware bucket mode only works with append-only table for now.");
if (boundedInput == null) {
boundedInput = !FlinkSink.isStreaming(input);
}
return new RowUnawareBucketSink(
table, overwritePartition, logSinkFunction, parallelism, boundedInput)
.sinkFrom(input);
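
For callers migrating from the builder methods removed in this change, the old fluent names map onto the new ones as sketched below. This is a hedged before/after illustration; the table, stream, and partition map are placeholders supplied by the caller, and the parallelism value is arbitrary:

```java
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.table.Table;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;

import java.util.Map;

public class SinkBuilderMigration {

    public static void sink(Table table, DataStream<RowData> input, Map<String, String> partitions) {
        // Old chain (removed in this commit):
        //   new FlinkSinkBuilder(table)
        //           .withInput(input)
        //           .withOverwritePartition(partitions)
        //           .withParallelism(2)
        //           .withBoundedInputStream(true)
        //           .build();
        new FlinkSinkBuilder(table)
                .forRowData(input)      // was withInput(...)
                .overwrite(partitions)  // was withOverwritePartition(...)
                .parallelism(2)         // was withParallelism(...)
                .inputBounded(true)     // was withBoundedInputStream(...)
                .build();
    }
}
```

The log-sink hook (`withLogSinkFunction`) is no longer part of the public builder; it moves to the `LogFlinkSinkBuilder` subclass added as a new file below, which the table sink now uses internally.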
@@ -22,12 +22,10 @@
import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.CoreOptions.LogChangelogMode;
import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.PaimonDataStreamSinkProvider;
import org.apache.paimon.flink.log.LogSinkProvider;
import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;

import org.apache.flink.streaming.api.datastream.DataStream;
@@ -47,6 +45,7 @@
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_PARALLELISM;

/** Table sink to create sink. */
public abstract class FlinkTableSinkBase
@@ -125,17 +124,20 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
final LogSinkFunction logSinkFunction =
overwrite ? null : (logSinkProvider == null ? null : logSinkProvider.createSink());
return new PaimonDataStreamSinkProvider(
(dataStream) ->
new FlinkSinkBuilder((FileStoreTable) table)
.withInput(
new DataStream<>(
dataStream.getExecutionEnvironment(),
dataStream.getTransformation()))
.withLogSinkFunction(logSinkFunction)
.withOverwritePartition(overwrite ? staticPartitions : null)
.withParallelism(conf.get(FlinkConnectorOptions.SINK_PARALLELISM))
.withBoundedInputStream(context.isBounded())
.build());
(dataStream) -> {
LogFlinkSinkBuilder builder = new LogFlinkSinkBuilder(table);
builder.logSinkFunction(logSinkFunction)
.forRowData(
new DataStream<>(
dataStream.getExecutionEnvironment(),
dataStream.getTransformation()))
.inputBounded(context.isBounded());
if (overwrite) {
builder.overwrite(staticPartitions);
}
conf.getOptional(SINK_PARALLELISM).ifPresent(builder::parallelism);
return builder.build();
});
}

@Override
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.sink;

import org.apache.paimon.table.Table;

import javax.annotation.Nullable;

/** A special version of {@link FlinkSinkBuilder} with a log sink. */
public class LogFlinkSinkBuilder extends FlinkSinkBuilder {

public LogFlinkSinkBuilder(Table table) {
super(table);
}

FlinkSinkBuilder logSinkFunction(@Nullable LogSinkFunction logSinkFunction) {
this.logSinkFunction = logSinkFunction;
return this;
}
}