Skip to content

Commit

Permalink
expose source too
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Apr 12, 2024
1 parent 8438634 commit 73d68e6
Show file tree
Hide file tree
Showing 7 changed files with 223 additions and 138 deletions.
55 changes: 26 additions & 29 deletions docs/content/program-api/flink-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,26 +127,37 @@ public class WriteToTable {
## Read from Table

```java
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.source.FlinkSourceBuilder;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.Table;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class ReadFromTable {

public static void readFrom() throws Exception {
// create environments of both APIs
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// create paimon catalog
tableEnv.executeSql("CREATE CATALOG paimon WITH ('type' = 'paimon', 'warehouse'='...')");
tableEnv.executeSql("USE CATALOG paimon");
// get table from catalog
Options catalogOptions = new Options();
catalogOptions.set("warehouse", "/path/to/warehouse");
Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
Table table = catalog.getTable(Identifier.create("my_db", "T"));

FlinkSourceBuilder builder = new FlinkSourceBuilder(table).env(env);

// builder.sourceBounded(true);
// builder.projection(...);
// builder.predicate(...);
// builder.limit(...);
// builder.sourceParallelism(...);

// convert to DataStream
Table table = tableEnv.sqlQuery("SELECT * FROM my_paimon_table");
DataStream<Row> dataStream = tableEnv.toChangelogStream(table);
DataStream<Row> dataStream = builder.buildForRow();

// use this datastream
dataStream.executeAndCollect().forEachRemaining(System.out::println);
Expand Down Expand Up @@ -211,29 +222,15 @@ public class WriteCdcToTable {
catalogOptions.set("warehouse", "/path/to/warehouse");
Catalog.Loader catalogLoader =
() -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions);

new RichCdcSinkBuilder()
.withInput(dataStream)
.withTable(createTableIfNotExists(catalogLoader.load(), identifier))
.withIdentifier(identifier)
.withCatalogLoader(catalogLoader)
Table table = catalogLoader.load().getTable(identifier);

new RichCdcSinkBuilder(table)
.forRichCdcRecord(dataStream)
.identifier(identifier)
.catalogLoader(catalogLoader)
.build();

env.execute();
}

private static Table createTableIfNotExists(Catalog catalog, Identifier identifier) throws Exception {
Schema.Builder schemaBuilder = Schema.newBuilder();
schemaBuilder.primaryKey("order_id");
schemaBuilder.column("order_id", DataTypes.BIGINT());
schemaBuilder.column("price", DataTypes.DOUBLE());
Schema schema = schemaBuilder.build();
try {
catalog.createTable(identifier, schema, false);
} catch (Catalog.TableAlreadyExistException e) {
// do something
}
return catalog.getTable(identifier);
}
}
```
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.paimon.flink.sink.cdc;

import org.apache.paimon.annotation.Experimental;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.Table;
Expand All @@ -28,8 +28,12 @@

import javax.annotation.Nullable;

/** Builder for sink when syncing {@link RichCdcRecord} records into one Paimon table. */
@Experimental
/**
* DataStream API for building Flink Sink for {@link RichCdcRecord} to write with schema evolution.
*
* @since 0.8
*/
@Public
public class RichCdcSinkBuilder {

private DataStream<RichCdcRecord> input = null;
Expand All @@ -39,27 +43,26 @@ public class RichCdcSinkBuilder {

@Nullable private Integer parallelism;

public RichCdcSinkBuilder withInput(DataStream<RichCdcRecord> input) {
this.input = input;
return this;
public RichCdcSinkBuilder(Table table) {
this.table = table;
}

public RichCdcSinkBuilder withTable(Table table) {
this.table = table;
public RichCdcSinkBuilder forRichCdcRecord(DataStream<RichCdcRecord> input) {
this.input = input;
return this;
}

public RichCdcSinkBuilder withParallelism(@Nullable Integer parallelism) {
this.parallelism = parallelism;
public RichCdcSinkBuilder identifier(Identifier identifier) {
this.identifier = identifier;
return this;
}

public RichCdcSinkBuilder withIdentifier(Identifier identifier) {
this.identifier = identifier;
public RichCdcSinkBuilder parallelism(@Nullable Integer parallelism) {
this.parallelism = parallelism;
return this;
}

public RichCdcSinkBuilder withCatalogLoader(Catalog.Loader catalogLoader) {
public RichCdcSinkBuilder catalogLoader(Catalog.Loader catalogLoader) {
this.catalogLoader = catalogLoader;
return this;
}
Expand All @@ -74,4 +77,45 @@ public DataStreamSink<?> build() {
.withCatalogLoader(catalogLoader)
.build();
}

// ====================== Deprecated ============================

/** @deprecated Use constructor to pass table. */
@Deprecated
public RichCdcSinkBuilder() {}

/** @deprecated Use {@link #forRichCdcRecord}. */
@Deprecated
public RichCdcSinkBuilder withInput(DataStream<RichCdcRecord> input) {
this.input = input;
return this;
}

/** @deprecated Use constructor to pass Table. */
@Deprecated
public RichCdcSinkBuilder withTable(Table table) {
this.table = table;
return this;
}

/** @deprecated Use {@link #parallelism}. */
@Deprecated
public RichCdcSinkBuilder withParallelism(@Nullable Integer parallelism) {
this.parallelism = parallelism;
return this;
}

/** @deprecated Use {@link #identifier}. */
@Deprecated
public RichCdcSinkBuilder withIdentifier(Identifier identifier) {
this.identifier = identifier;
return this;
}

/** @deprecated Use {@link #catalogLoader}. */
@Deprecated
public RichCdcSinkBuilder withCatalogLoader(Catalog.Loader catalogLoader) {
this.catalogLoader = catalogLoader;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,28 +84,29 @@ public void build() {
}
Map<String, String> tableConfig = fileStoreTable.options();
FlinkSourceBuilder sourceBuilder =
new FlinkSourceBuilder(
ObjectIdentifier.of(
catalogName,
identifier.getDatabaseName(),
identifier.getObjectName()),
fileStoreTable);
new FlinkSourceBuilder(fileStoreTable)
.sourceName(
ObjectIdentifier.of(
catalogName,
identifier.getDatabaseName(),
identifier.getObjectName())
.asSummaryString());

if (getPartitions() != null) {
Predicate partitionPredicate =
PredicateBuilder.or(
getPartitions().stream()
.map(p -> PredicateBuilder.partition(p, table.rowType()))
.toArray(Predicate[]::new));
sourceBuilder.withPredicate(partitionPredicate);
sourceBuilder.predicate(partitionPredicate);
}

String scanParallelism = tableConfig.get(FlinkConnectorOptions.SCAN_PARALLELISM.key());
if (scanParallelism != null) {
sourceBuilder.withParallelism(Integer.parseInt(scanParallelism));
sourceBuilder.sourceParallelism(Integer.parseInt(scanParallelism));
}

DataStream<RowData> source = sourceBuilder.withEnv(env).withContinuousMode(false).build();
DataStream<RowData> source = sourceBuilder.env(env).sourceBounded(true).build();
TableSorter sorter =
TableSorter.getSorter(env, source, fileStoreTable, sortStrategy, orderColumns);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,11 @@ protected abstract Committer.Factory<Committable, ManifestCommittable> createCom
protected abstract CommittableStateManager<ManifestCommittable> createCommittableStateManager();

public static boolean isStreaming(DataStream<?> input) {
return input.getExecutionEnvironment().getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
return isStreaming(input.getExecutionEnvironment());
}

public static boolean isStreaming(StreamExecutionEnvironment env) {
return env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
== RuntimeExecutionMode.STREAMING;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,14 +201,15 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
}

FlinkSourceBuilder sourceBuilder =
new FlinkSourceBuilder(tableIdentifier, table)
.withContinuousMode(streaming)
.withLogSourceProvider(logSourceProvider)
.withProjection(projectFields)
.withPredicate(predicate)
.withLimit(limit)
.withWatermarkStrategy(watermarkStrategy)
.withDynamicPartitionFilteringFields(dynamicPartitionFilteringFields);
new FlinkSourceBuilder(table)
.sourceName(tableIdentifier.asSummaryString())
.sourceBounded(!streaming)
.logSourceProvider(logSourceProvider)
.projection(projectFields)
.predicate(predicate)
.limit(limit)
.watermarkStrategy(watermarkStrategy)
.dynamicPartitionFilteringFields(dynamicPartitionFilteringFields);

return new PaimonDataStreamScanProvider(
!streaming, env -> configureSource(sourceBuilder, env));
Expand Down Expand Up @@ -244,7 +245,7 @@ private DataStream<RowData> configureSource(
options.get(FlinkConnectorOptions.INFER_SCAN_MAX_PARALLELISM));
}

return sourceBuilder.withParallelism(parallelism).withEnv(env).build();
return sourceBuilder.sourceParallelism(parallelism).env(env).build();
}

private void scanSplitsForInference() {
Expand Down
Loading

0 comments on commit 73d68e6

Please sign in to comment.