Merge branch 'master' into tc-paimon-0.4.0-1.17-newstage
# Conflicts:
#	paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
wxplovecc committed Sep 12, 2023
2 parents 510ee1b + e4224a1 commit 1ff6358
Showing 10 changed files with 137 additions and 78 deletions.
13 changes: 12 additions & 1 deletion docs/content/maintenance/write-performance.md
@@ -226,7 +226,7 @@ In the initialization of write, the writer of the bucket needs to read all histo
here (For example, writing a large number of partitions simultaneously), you can use `write-manifest-cache` to cache
the read manifest data to accelerate initialization.
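
As a concrete illustration, the manifest cache can be enabled per table through a Flink SQL statement. This is only a minimal sketch: the catalog and table names are hypothetical, and the 512 MB value is an arbitrary example rather than a recommendation.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ManifestCacheSketch {
    public static void main(String[] args) {
        // Assumes a Paimon catalog named 'paimon' is already registered.
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.useCatalog("paimon");
        // Cache the manifest data read during writer initialization; size the cache
        // according to how many partitions are written simultaneously.
        tEnv.executeSql("ALTER TABLE my_db.my_table SET ('write-manifest-cache' = '512 mb')");
    }
}
```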

## Memory
## Write Memory

There are three main places in the Paimon writer that take up memory:

@@ -242,3 +242,14 @@ If your Flink job does not rely on state, please avoid using managed memory, whi
```shell
taskmanager.memory.managed.size=1m
```

## Commit Memory

The committer node may need a large amount of memory if the volume of data written to the table is particularly large; if its memory is too small, an OOM may occur. In this case, you need to increase the committer's heap memory, but you may not want to increase the memory of Flink's TaskManager uniformly, as that could waste memory.

You can use Flink's fine-grained resource management to increase only the committer's heap memory:
1. Configure the Flink option `cluster.fine-grained-resource-management.enabled: true`. (This is the default since Flink 1.18.)
2. Configure the Paimon table option `sink.committer-memory`, for example to 300 MB, depending on your `TaskManager` size. (`sink.committer-cpu` is also supported.) A configuration sketch follows below.
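
For illustration, the sketch below wires both steps together from a job's `main` method. The table name and the sizes are hypothetical, and in practice the cluster-level switch usually belongs in `flink-conf.yaml` rather than in job code; treat this as a sketch, not the canonical setup.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CommitterResourceSketch {
    public static void main(String[] args) {
        // Step 1: enable fine-grained resource management (the default since Flink 1.18).
        Configuration conf = new Configuration();
        conf.setString("cluster.fine-grained-resource-management.enabled", "true");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        StreamTableEnvironment tEnv =
                StreamTableEnvironment.create(env, EnvironmentSettings.inStreamingMode());

        // Step 2: give only the global committer more heap (and, optionally, CPU).
        // 300 mb is an example value; size it against your TaskManager.
        tEnv.executeSql(
                "ALTER TABLE my_db.my_table SET ("
                        + "'sink.committer-memory' = '300 mb', "
                        + "'sink.committer-cpu' = '1.0')");
    }
}
```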
@@ -128,6 +128,18 @@
<td>Duration</td>
<td>If no records flow in a partition of a stream for that amount of time, then that partition is considered "idle" and will not hold back the progress of watermarks in downstream operators.</td>
</tr>
<tr>
<td><h5>sink.committer-cpu</h5></td>
<td style="word-wrap: break-word;">1.0</td>
<td>Double</td>
<td>Sink committer cpu to control cpu cores of global committer.</td>
</tr>
<tr>
<td><h5>sink.committer-memory</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>MemorySize</td>
<td>Sink committer memory to control heap memory of global committer.</td>
</tr>
<tr>
<td><h5>sink.managed.writer-buffer-memory</h5></td>
<td style="word-wrap: break-word;">256 mb</td>
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.action.cdc.kafka;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
@@ -168,23 +167,16 @@ public void build(StreamExecutionEnvironment env) throws Exception {
new RichCdcMultiplexRecordEventParser(
schemaBuilder, includingPattern, excludingPattern);

FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord> sinkBuilder =
new FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord>()
.withInput(
env.fromSource(
source,
WatermarkStrategy.noWatermarks(),
"Kafka Source")
.flatMap(recordParser))
.withParserFactory(parserFactory)
.withCatalogLoader(catalogLoader())
.withDatabase(database)
.withMode(MultiTablesSinkMode.COMBINED);
String sinkParallelism = tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
if (sinkParallelism != null) {
sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
}
sinkBuilder.build();
new FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord>()
.withInput(
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
.flatMap(recordParser))
.withParserFactory(parserFactory)
.withCatalogLoader(catalogLoader())
.withDatabase(database)
.withMode(MultiTablesSinkMode.COMBINED)
.withTableOptions(tableConfig)
.build();
}

private void validateCaseInsensitive() {
@@ -20,7 +20,6 @@

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
@@ -127,27 +126,18 @@ public void build(StreamExecutionEnvironment env) throws Exception {
() ->
new RichCdcMultiplexRecordEventParser(
schemaBuilder, includingPattern, excludingPattern);
FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord> sinkBuilder =
new FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord>()
.withInput(
env.fromSource(
source,
WatermarkStrategy.noWatermarks(),
"MongoDB Source")
.flatMap(
new MongoDBRecordParser(
caseSensitive,
tableNameConverter,
mongodbConfig)))
.withParserFactory(parserFactory)
.withCatalogLoader(catalogLoader())
.withDatabase(database)
.withMode(MultiTablesSinkMode.COMBINED);
String sinkParallelism = tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
if (sinkParallelism != null) {
sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
}
sinkBuilder.build();
new FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord>()
.withInput(
env.fromSource(source, WatermarkStrategy.noWatermarks(), "MongoDB Source")
.flatMap(
new MongoDBRecordParser(
caseSensitive, tableNameConverter, mongodbConfig)))
.withParserFactory(parserFactory)
.withCatalogLoader(catalogLoader())
.withDatabase(database)
.withMode(MultiTablesSinkMode.COMBINED)
.withTableOptions(tableConfig)
.build();
}

private void validateCaseInsensitive() {
@@ -21,7 +21,6 @@
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
@@ -283,23 +282,15 @@ public void build(StreamExecutionEnvironment env) throws Exception {

String database = this.database;
MultiTablesSinkMode mode = this.mode;
FlinkCdcSyncDatabaseSinkBuilder<String> sinkBuilder =
new FlinkCdcSyncDatabaseSinkBuilder<String>()
.withInput(
env.fromSource(
source, WatermarkStrategy.noWatermarks(), "MySQL Source"))
.withParserFactory(parserFactory)
.withDatabase(database)
.withCatalogLoader(catalogLoader())
.withTables(fileStoreTables)
.withMode(mode);

String sinkParallelism = tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
if (sinkParallelism != null) {
sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
}

sinkBuilder.build();
new FlinkCdcSyncDatabaseSinkBuilder<String>()
.withInput(env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source"))
.withParserFactory(parserFactory)
.withDatabase(database)
.withCatalogLoader(catalogLoader())
.withTables(fileStoreTables)
.withMode(mode)
.withTableOptions(tableConfig)
.build();
}

private void validateCaseInsensitive() {
@@ -267,6 +267,20 @@ public class FlinkConnectorOptions {
.withDescription(
"If true, a tag will be automatically created for the snapshot created by flink savepoint.");

public static final ConfigOption<Double> SINK_COMMITTER_CPU =
ConfigOptions.key("sink.committer-cpu")
.doubleType()
.defaultValue(1.0)
.withDescription(
"Sink committer cpu to control cpu cores of global committer.");

public static final ConfigOption<MemorySize> SINK_COMMITTER_MEMORY =
ConfigOptions.key("sink.committer-memory")
.memoryType()
.noDefaultValue()
.withDescription(
"Sink committer memory to control heap memory of global committer.");

public static List<ConfigOption<?>> getOptions() {
final Field[] fields = FlinkConnectorOptions.class.getFields();
final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
@@ -27,6 +27,7 @@
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.operators.SlotSharingGroup;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
@@ -41,13 +42,17 @@
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.UUID;

import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
import static org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL;
import static org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_LOOKUP_WAIT;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_AUTO_TAG_FOR_SAVEPOINT;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_CPU;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_MEMORY;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_MANAGED_WRITER_BUFFER_MEMORY;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_USE_MANAGED_MEMORY;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -213,14 +218,32 @@ protected DataStreamSink<?> doCommit(DataStream<Committable> written, String com
}
SingleOutputStreamOperator<?> committed =
written.transform(
GLOBAL_COMMITTER_NAME + " : " + table.name(),
new CommittableTypeInfo(),
committerOperator)
.setParallelism(1)
.setMaxParallelism(1);
GLOBAL_COMMITTER_NAME + " : " + table.name(),
new CommittableTypeInfo(),
committerOperator);
Options options = Options.fromMap(table.options());
configureGlobalCommitter(
committed, options.get(SINK_COMMITTER_CPU), options.get(SINK_COMMITTER_MEMORY));
return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
}

public static void configureGlobalCommitter(
SingleOutputStreamOperator<?> committed,
double cpuCores,
@Nullable MemorySize heapMemory) {
committed.setParallelism(1).setMaxParallelism(1);
if (heapMemory != null) {
SlotSharingGroup slotSharingGroup =
SlotSharingGroup.newBuilder(committed.getName())
.setCpuCores(cpuCores)
.setTaskHeapMemory(
new org.apache.flink.configuration.MemorySize(
heapMemory.getBytes()))
.build();
committed.slotSharingGroup(slotSharingGroup);
}
}

public static void assertStreamingConfiguration(StreamExecutionEnvironment env) {
checkArgument(
!env.getCheckpointConfig().isUnalignedCheckpointsEnabled(),
@@ -33,6 +33,7 @@
import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
import org.apache.paimon.flink.sink.WrappedManifestCommittableSerializer;
import org.apache.paimon.manifest.WrappedManifestCommittable;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;

import org.apache.flink.streaming.api.datastream.DataStream;
@@ -42,10 +43,13 @@
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.UUID;

import static org.apache.paimon.flink.sink.FlinkSink.assertStreamingConfiguration;
import static org.apache.paimon.flink.sink.FlinkSink.configureGlobalCommitter;

/**
* A {@link FlinkSink} which accepts {@link CdcRecord} and waits for a schema change if necessary.
Expand All @@ -58,9 +62,16 @@ public class FlinkCdcMultiTableSink implements Serializable {

private final boolean isOverwrite = false;
private final Catalog.Loader catalogLoader;
private final double commitCpuCores;
@Nullable private final MemorySize commitHeapMemory;

public FlinkCdcMultiTableSink(Catalog.Loader catalogLoader) {
public FlinkCdcMultiTableSink(
Catalog.Loader catalogLoader,
double commitCpuCores,
@Nullable MemorySize commitHeapMemory) {
this.catalogLoader = catalogLoader;
this.commitCpuCores = commitCpuCores;
this.commitHeapMemory = commitHeapMemory;
}

private StoreSinkWrite.WithWriteBufferProvider createWriteProvider() {
@@ -103,15 +114,14 @@ public DataStreamSink<?> sinkFrom(

SingleOutputStreamOperator<?> committed =
written.transform(
GLOBAL_COMMITTER_NAME,
typeInfo,
new CommitterOperator<>(
true,
commitUser,
createCommitterFactory(),
createCommittableStateManager()))
.setParallelism(1)
.setMaxParallelism(1);
GLOBAL_COMMITTER_NAME,
typeInfo,
new CommitterOperator<>(
true,
commitUser,
createCommitterFactory(),
createCommittableStateManager()));
configureGlobalCommitter(committed, commitCpuCores, commitHeapMemory);
return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
}

@@ -20,9 +20,12 @@

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
@@ -36,6 +39,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED;
import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
@@ -60,6 +64,9 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
private List<FileStoreTable> tables = new ArrayList<>();

@Nullable private Integer parallelism;
private double committerCpu;
@Nullable private MemorySize committerMemory;

// Paimon catalog used to check and create tables. There will be two
// places where this catalog is used. 1) in processing function,
// it will check newly added tables and create the corresponding
@@ -86,8 +93,14 @@ public FlinkCdcSyncDatabaseSinkBuilder<T> withTables(List<FileStoreTable> tables
return this;
}

public FlinkCdcSyncDatabaseSinkBuilder<T> withParallelism(@Nullable Integer parallelism) {
this.parallelism = parallelism;
public FlinkCdcSyncDatabaseSinkBuilder<T> withTableOptions(Map<String, String> options) {
return withTableOptions(Options.fromMap(options));
}

public FlinkCdcSyncDatabaseSinkBuilder<T> withTableOptions(Options options) {
this.parallelism = options.get(FlinkConnectorOptions.SINK_PARALLELISM);
this.committerCpu = options.get(FlinkConnectorOptions.SINK_COMMITTER_CPU);
this.committerMemory = options.get(FlinkConnectorOptions.SINK_COMMITTER_MEMORY);
return this;
}

@@ -148,7 +161,8 @@ private void buildCombinedCdcSink() {
partitioned.setParallelism(parallelism);
}

FlinkCdcMultiTableSink sink = new FlinkCdcMultiTableSink(catalogLoader);
FlinkCdcMultiTableSink sink =
new FlinkCdcMultiTableSink(catalogLoader, committerCpu, committerMemory);
sink.sinkFrom(new DataStream<>(input.getExecutionEnvironment(), partitioned));
}

@@ -57,6 +57,8 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;

import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_PARALLELISM;

/** IT cases for {@link FlinkCdcSyncDatabaseSinkBuilder}. */
public class FlinkCdcSyncDatabaseSinkITCase extends AbstractTestBase {

@@ -160,7 +162,7 @@ private void innerTestRandomCdcEvents(Supplier<Integer> bucket) throws Exception
.withTables(fileStoreTables)
// because we have at most 3 tables and 8 slots in AbstractTestBase
// each table can only get 2 slots
.withParallelism(2)
.withTableOptions(Collections.singletonMap(SINK_PARALLELISM.key(), "2"))
.withDatabase(DATABASE_NAME)
.withCatalogLoader(catalogLoader)
.build();