Skip to content

Commit

Permalink
[api] Publish DataStream Sink & Source API (#3192)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Apr 15, 2024
1 parent 8ff2537 commit ffb9032
Show file tree
Hide file tree
Showing 12 changed files with 469 additions and 226 deletions.
114 changes: 59 additions & 55 deletions docs/content/program-api/flink-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,85 +61,103 @@ Please choose your Flink version.

Paimon relies on the Hadoop environment, so you should add the Hadoop classpath or a bundled jar.

Paimon does not provide a DataStream API, but you can read or write to Paimon tables by the conversion between DataStream and Table in Flink.
Besides the DataStream API, you can also read from or write to Paimon tables by converting between DataStream and Table in Flink.
See [DataStream API Integration](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/data_stream_api/).

## Write to Table

```java
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.Table;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

public class WriteToTable {

public static void writeTo() {
public static void writeTo() throws Exception {
// create environments of both APIs
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// for CONTINUOUS_UNBOUNDED source, set checkpoint interval
// env.enableCheckpointing(60_000);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// create a changelog DataStream
DataStream<Row> dataStream =
DataStream<Row> input =
env.fromElements(
Row.ofKind(RowKind.INSERT, "Alice", 12),
Row.ofKind(RowKind.INSERT, "Bob", 5),
Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),
Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100))
Row.ofKind(RowKind.INSERT, "Alice", 12),
Row.ofKind(RowKind.INSERT, "Bob", 5),
Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),
Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100))
.returns(
Types.ROW_NAMED(
new String[] {"name", "age"},
Types.STRING, Types.INT));
new String[] {"name", "age"}, Types.STRING, Types.INT));

// interpret the DataStream as a Table
Schema schema = Schema.newBuilder()
.column("name", DataTypes.STRING())
.column("age", DataTypes.INT())
.build();
Table table = tableEnv.fromChangelogStream(dataStream, schema);
// get table from catalog
Options catalogOptions = new Options();
catalogOptions.set("warehouse", "/path/to/warehouse");
Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
Table table = catalog.getTable(Identifier.create("my_db", "T"));

// create paimon catalog
tableEnv.executeSql("CREATE CATALOG paimon WITH ('type' = 'paimon', 'warehouse'='...')");
tableEnv.executeSql("USE CATALOG paimon");
DataType inputType =
DataTypes.ROW(
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("age", DataTypes.INT()));
FlinkSinkBuilder builder = new FlinkSinkBuilder(table).forRow(input, inputType);

// register the table under a name and perform an aggregation
tableEnv.createTemporaryView("InputTable", table);
// set sink parallelism
// builder.parallelism(_your_parallelism)

// insert into paimon table from your data stream table
tableEnv.executeSql("INSERT INTO sink_paimon_table SELECT * FROM InputTable");
// set overwrite mode
// builder.overwrite(...)

builder.build();
env.execute();
}
}
```

## Read from Table

```java
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.source.FlinkSourceBuilder;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.Table;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class ReadFromTable {

public static void readFrom() throws Exception {
// create environments of both APIs
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// create paimon catalog
tableEnv.executeSql("CREATE CATALOG paimon WITH ('type' = 'paimon', 'warehouse'='...')");
tableEnv.executeSql("USE CATALOG paimon");
// get table from catalog
Options catalogOptions = new Options();
catalogOptions.set("warehouse", "/path/to/warehouse");
Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
Table table = catalog.getTable(Identifier.create("my_db", "T"));

FlinkSourceBuilder builder = new FlinkSourceBuilder(table).env(env);

// builder.sourceBounded(true);
// builder.projection(...);
// builder.predicate(...);
// builder.limit(...);
// builder.sourceParallelism(...);

// convert to DataStream
Table table = tableEnv.sqlQuery("SELECT * FROM my_paimon_table");
DataStream<Row> dataStream = tableEnv.toChangelogStream(table);
DataStream<Row> dataStream = builder.buildForRow();

// use this datastream
dataStream.executeAndCollect().forEachRemaining(System.out::println);
Expand Down Expand Up @@ -204,29 +222,15 @@ public class WriteCdcToTable {
catalogOptions.set("warehouse", "/path/to/warehouse");
Catalog.Loader catalogLoader =
() -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions);

new RichCdcSinkBuilder()
.withInput(dataStream)
.withTable(createTableIfNotExists(catalogLoader.load(), identifier))
.withIdentifier(identifier)
.withCatalogLoader(catalogLoader)
Table table = catalogLoader.load().getTable(identifier);

new RichCdcSinkBuilder(table)
.forRichCdcRecord(dataStream)
.identifier(identifier)
.catalogLoader(catalogLoader)
.build();

env.execute();
}

private static Table createTableIfNotExists(Catalog catalog, Identifier identifier) throws Exception {
Schema.Builder schemaBuilder = Schema.newBuilder();
schemaBuilder.primaryKey("order_id");
schemaBuilder.column("order_id", DataTypes.BIGINT());
schemaBuilder.column("price", DataTypes.DOUBLE());
Schema schema = schemaBuilder.build();
try {
catalog.createTable(identifier, schema, false);
} catch (Catalog.TableAlreadyExistException e) {
// do something
}
return catalog.getTable(identifier);
}
}
```
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.paimon.flink.sink.cdc;

import org.apache.paimon.annotation.Experimental;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.Table;
Expand All @@ -28,8 +28,12 @@

import javax.annotation.Nullable;

/** Builder for sink when syncing {@link RichCdcRecord} records into one Paimon table. */
@Experimental
/**
* DataStream API for building Flink Sink for {@link RichCdcRecord} to write with schema evolution.
*
* @since 0.8
*/
@Public
public class RichCdcSinkBuilder {

private DataStream<RichCdcRecord> input = null;
Expand All @@ -39,27 +43,26 @@ public class RichCdcSinkBuilder {

@Nullable private Integer parallelism;

public RichCdcSinkBuilder withInput(DataStream<RichCdcRecord> input) {
this.input = input;
return this;
public RichCdcSinkBuilder(Table table) {
this.table = table;
}

public RichCdcSinkBuilder withTable(Table table) {
this.table = table;
public RichCdcSinkBuilder forRichCdcRecord(DataStream<RichCdcRecord> input) {
this.input = input;
return this;
}

public RichCdcSinkBuilder withParallelism(@Nullable Integer parallelism) {
this.parallelism = parallelism;
public RichCdcSinkBuilder identifier(Identifier identifier) {
this.identifier = identifier;
return this;
}

public RichCdcSinkBuilder withIdentifier(Identifier identifier) {
this.identifier = identifier;
public RichCdcSinkBuilder parallelism(@Nullable Integer parallelism) {
this.parallelism = parallelism;
return this;
}

public RichCdcSinkBuilder withCatalogLoader(Catalog.Loader catalogLoader) {
public RichCdcSinkBuilder catalogLoader(Catalog.Loader catalogLoader) {
this.catalogLoader = catalogLoader;
return this;
}
Expand All @@ -74,4 +77,45 @@ public DataStreamSink<?> build() {
.withCatalogLoader(catalogLoader)
.build();
}

// ====================== Deprecated ============================

/**
 * Creates a builder without assigning a table.
 *
 * @deprecated Use {@link #RichCdcSinkBuilder(Table)} to pass the table.
 */
@Deprecated
public RichCdcSinkBuilder() {}

/**
 * Sets the input stream of {@link RichCdcRecord}s for this builder.
 *
 * @param input the stream stored on this builder
 * @return this builder
 * @deprecated Use {@link #forRichCdcRecord(DataStream)}.
 */
@Deprecated
public RichCdcSinkBuilder withInput(DataStream<RichCdcRecord> input) {
    // Delegate so the deprecated and the current entry point cannot drift apart.
    return forRichCdcRecord(input);
}

/**
 * Sets the table of this builder.
 *
 * @param table the table stored on this builder
 * @return this builder
 * @deprecated Pass the table to the constructor {@link #RichCdcSinkBuilder(Table)} instead.
 */
@Deprecated
public RichCdcSinkBuilder withTable(Table table) {
this.table = table;
return this;
}

/**
 * Sets the parallelism stored on this builder.
 *
 * @param parallelism the parallelism to store; may be {@code null}
 * @return this builder
 * @deprecated Use {@link #parallelism(Integer)}.
 */
@Deprecated
public RichCdcSinkBuilder withParallelism(@Nullable Integer parallelism) {
    // Delegate so the deprecated and the current entry point cannot drift apart.
    return parallelism(parallelism);
}

/**
 * Sets the table identifier stored on this builder.
 *
 * @param identifier the identifier to store
 * @return this builder
 * @deprecated Use {@link #identifier(Identifier)}.
 */
@Deprecated
public RichCdcSinkBuilder withIdentifier(Identifier identifier) {
    // Delegate so the deprecated and the current entry point cannot drift apart.
    return identifier(identifier);
}

/**
 * Sets the catalog loader stored on this builder.
 *
 * @param catalogLoader the loader to store
 * @return this builder
 * @deprecated Use {@link #catalogLoader(Catalog.Loader)}.
 */
@Deprecated
public RichCdcSinkBuilder withCatalogLoader(Catalog.Loader catalogLoader) {
    // Delegate so the deprecated and the current entry point cannot drift apart.
    return catalogLoader(catalogLoader);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.flink.sink.SortCompactSinkBuilder;
import org.apache.paimon.flink.sorter.TableSorter;
import org.apache.paimon.flink.source.FlinkSourceBuilder;
import org.apache.paimon.predicate.Predicate;
Expand All @@ -38,7 +38,6 @@

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -85,35 +84,36 @@ public void build() {
}
Map<String, String> tableConfig = fileStoreTable.options();
FlinkSourceBuilder sourceBuilder =
new FlinkSourceBuilder(
ObjectIdentifier.of(
catalogName,
identifier.getDatabaseName(),
identifier.getObjectName()),
fileStoreTable);
new FlinkSourceBuilder(fileStoreTable)
.sourceName(
ObjectIdentifier.of(
catalogName,
identifier.getDatabaseName(),
identifier.getObjectName())
.asSummaryString());

if (getPartitions() != null) {
Predicate partitionPredicate =
PredicateBuilder.or(
getPartitions().stream()
.map(p -> PredicateBuilder.partition(p, table.rowType()))
.toArray(Predicate[]::new));
sourceBuilder.withPredicate(partitionPredicate);
sourceBuilder.predicate(partitionPredicate);
}

String scanParallelism = tableConfig.get(FlinkConnectorOptions.SCAN_PARALLELISM.key());
if (scanParallelism != null) {
sourceBuilder.withParallelism(Integer.parseInt(scanParallelism));
sourceBuilder.sourceParallelism(Integer.parseInt(scanParallelism));
}

DataStream<RowData> source = sourceBuilder.withEnv(env).withContinuousMode(false).build();
DataStream<RowData> source = sourceBuilder.env(env).sourceBounded(true).build();
TableSorter sorter =
TableSorter.getSorter(env, source, fileStoreTable, sortStrategy, orderColumns);

new FlinkSinkBuilder(fileStoreTable)
.withInput(sorter.sort())
new SortCompactSinkBuilder(fileStoreTable)
.forCompact(true)
.withOverwritePartition(new HashMap<>())
.forRowData(sorter.sort())
.overwrite()
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public TableResult batchSink(DataStream<RowData> dataStream) {
List<Transformation<?>> transformations =
Collections.singletonList(
new FlinkSinkBuilder((FileStoreTable) table)
.withInput(dataStream)
.forRowData(dataStream)
.build()
.getTransformation());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,7 @@ private boolean hasSinkMaterializer(DataStream<T> input) {
public DataStream<Committable> doWrite(
DataStream<T> input, String commitUser, @Nullable Integer parallelism) {
StreamExecutionEnvironment env = input.getExecutionEnvironment();
boolean isStreaming =
env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
== RuntimeExecutionMode.STREAMING;
boolean isStreaming = isStreaming(input);

boolean writeOnly = table.coreOptions().writeOnly();
SingleOutputStreamOperator<Committable> written =
Expand Down Expand Up @@ -221,10 +219,8 @@ protected DataStreamSink<?> doCommit(DataStream<Committable> written, String com
StreamExecutionEnvironment env = written.getExecutionEnvironment();
ReadableConfig conf = env.getConfiguration();
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
boolean isStreaming =
conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
boolean streamingCheckpointEnabled =
isStreaming && checkpointConfig.isCheckpointingEnabled();
isStreaming(written) && checkpointConfig.isCheckpointingEnabled();
if (streamingCheckpointEnabled) {
assertStreamingConfiguration(env);
}
Expand Down Expand Up @@ -313,4 +309,13 @@ protected abstract Committer.Factory<Committable, ManifestCommittable> createCom
boolean streamingCheckpointEnabled);

protected abstract CommittableStateManager<ManifestCommittable> createCommittableStateManager();

/**
 * Tells whether the execution environment that {@code input} belongs to is configured for
 * streaming runtime mode.
 *
 * @param input the stream whose execution environment is inspected
 * @return {@code true} if the runtime mode is {@code STREAMING}
 */
public static boolean isStreaming(DataStream<?> input) {
    StreamExecutionEnvironment env = input.getExecutionEnvironment();
    return isStreaming(env);
}

/**
 * Tells whether {@code env} is configured for streaming runtime mode.
 *
 * @param env the execution environment whose configuration is read
 * @return {@code true} if {@code ExecutionOptions.RUNTIME_MODE} is {@code STREAMING}
 */
public static boolean isStreaming(StreamExecutionEnvironment env) {
    return RuntimeExecutionMode.STREAMING
            == env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE);
}
}
Loading

0 comments on commit ffb9032

Please sign in to comment.