[core] support custom commit.user-prefix (apache#3474)
wwj6591812 authored Jun 7, 2024
1 parent 3b1810b commit b62923d
Showing 18 changed files with 101 additions and 38 deletions.
8 changes: 7 additions & 1 deletion docs/layouts/shortcodes/generated/core_configuration.html
@@ -110,6 +110,12 @@
<td>Boolean</td>
<td>Whether to force create snapshot on commit.</td>
</tr>
<tr>
<td><h5>commit.user-prefix</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Specifies the commit user prefix.</td>
</tr>
<tr>
<td><h5>compaction.max-size-amplification-percent</h5></td>
<td style="word-wrap: break-word;">200</td>
@@ -228,7 +234,7 @@
<td><h5>fields.default-aggregate-function</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Default aggregate function of all fields for partial-update and aggregate merge function</td>
<td>Default aggregate function of all fields for partial-update and aggregate merge function.</td>
</tr>
<tr>
<td><h5>file-index.in-manifest-threshold</h5></td>
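To make the documented option concrete, here is a minimal usage sketch: `commit.user-prefix` is supplied like any other table option. The class name, option map, and prefix value are illustrative only and not part of this commit.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: "commit.user-prefix" is the key documented in the
// configuration row added above; the prefix value is made up.
class CommitUserPrefixOptionSketch {
    static Map<String, String> tableOptions() {
        Map<String, String> options = new HashMap<>();
        options.put("commit.user-prefix", "nightly-etl");
        // Writers of a table created with these options would record commit
        // users such as "nightly-etl_<random-uuid>" in its snapshots.
        return options;
    }
}
```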
16 changes: 15 additions & 1 deletion paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -51,6 +51,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

import static org.apache.paimon.options.ConfigOptions.key;
@@ -1183,7 +1184,13 @@ public class CoreOptions implements Serializable {
.stringType()
.noDefaultValue()
.withDescription(
"Default aggregate function of all fields for partial-update and aggregate merge function");
"Default aggregate function of all fields for partial-update and aggregate merge function.");

public static final ConfigOption<String> COMMIT_USER_PREFIX =
key("commit.user-prefix")
.stringType()
.noDefaultValue()
.withDescription("Specifies the commit user prefix.");

private final Options options;

@@ -1302,6 +1309,13 @@ public String fieldsDefaultFunc() {
return options.get(FIELDS_DEFAULT_AGG_FUNC);
}

public static String createCommitUser(Options options) {
String commitUserPrefix = options.get(COMMIT_USER_PREFIX);
return commitUserPrefix == null
? UUID.randomUUID().toString()
: commitUserPrefix + "_" + UUID.randomUUID();
}

public boolean definedAggFunc() {
if (options.contains(FIELDS_DEFAULT_AGG_FUNC)) {
return true;
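A short sketch of the new helper's behavior as defined in the hunk above. The prefix value and wrapper class are illustrative, and the `Options.set` call is assumed from Paimon's usual options API:

```java
import org.apache.paimon.CoreOptions;
import org.apache.paimon.options.Options;

class CreateCommitUserSketch {
    public static void main(String[] args) {
        Options options = new Options();

        // Without the option, the helper falls back to a bare random UUID,
        // matching the previous behavior.
        System.out.println(CoreOptions.createCommitUser(options));

        // With the option set, the UUID is prefixed for easier attribution,
        // e.g. "etl_3f2c9c61-...".
        options.set(CoreOptions.COMMIT_USER_PREFIX, "etl");
        System.out.println(CoreOptions.createCommitUser(options));
    }
}
```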
@@ -50,9 +50,9 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;

import static org.apache.paimon.CoreOptions.createCommitUser;
import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
@@ -170,7 +170,11 @@ public void dropPartition(Identifier identifier, Map<String, String> partitionSp
throws TableNotExistException {
Table table = getTable(identifier);
FileStoreTable fileStoreTable = (FileStoreTable) table;
FileStoreCommit commit = fileStoreTable.store().newCommit(UUID.randomUUID().toString());
FileStoreCommit commit =
fileStoreTable
.store()
.newCommit(
createCommitUser(fileStoreTable.coreOptions().toConfiguration()));
commit.dropPartitions(
Collections.singletonList(partitionSpec), BatchWriteBuilder.COMMIT_IDENTIFIER);
}
@@ -18,26 +18,29 @@

package org.apache.paimon.table.sink;

import org.apache.paimon.options.Options;
import org.apache.paimon.table.InnerTable;
import org.apache.paimon.types.RowType;

import javax.annotation.Nullable;

import java.util.Map;
import java.util.UUID;

import static org.apache.paimon.CoreOptions.createCommitUser;

/** Implementation for {@link WriteBuilder}. */
public class BatchWriteBuilderImpl implements BatchWriteBuilder {

private static final long serialVersionUID = 1L;

private final InnerTable table;
private final String commitUser = UUID.randomUUID().toString();
private final String commitUser;

private Map<String, String> staticPartition;

public BatchWriteBuilderImpl(InnerTable table) {
this.table = table;
this.commitUser = createCommitUser(new Options(table.options()));
}

@Override
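The downstream effect, sketched: batch writes created from a table configured this way should pick up the prefix automatically. The wrapper class here is hypothetical; only `Table.newBatchWriteBuilder` and the option key come from the source.

```java
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchWriteBuilder;

class BatchWriteCommitUserSketch {
    // Hypothetical sketch: after this change the builder derives its commit
    // user from the table's own options rather than a bare UUID.randomUUID().
    static BatchWriteBuilder newBuilder(Table table) {
        // For a table configured with 'commit.user-prefix' = 'etl', snapshots
        // committed through this builder carry a user like "etl_<uuid>".
        return table.newBatchWriteBuilder();
    }
}
```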
@@ -18,10 +18,11 @@

package org.apache.paimon.table.sink;

import org.apache.paimon.options.Options;
import org.apache.paimon.table.InnerTable;
import org.apache.paimon.types.RowType;

import java.util.UUID;
import static org.apache.paimon.CoreOptions.createCommitUser;

/** Implementation for {@link WriteBuilder}. */
public class StreamWriteBuilderImpl implements StreamWriteBuilder {
@@ -30,10 +31,11 @@ public class StreamWriteBuilderImpl implements StreamWriteBuilder {

private final InnerTable table;

private String commitUser = UUID.randomUUID().toString();
private String commitUser;

public StreamWriteBuilderImpl(InnerTable table) {
this.table = table;
this.commitUser = createCommitUser(new Options(table.options()));
}

@Override
@@ -46,7 +46,6 @@
import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.UUID;

import static org.apache.paimon.flink.sink.FlinkSink.assertStreamingConfiguration;
import static org.apache.paimon.flink.sink.FlinkSink.configureGlobalCommitter;
@@ -65,16 +64,19 @@ public class FlinkCdcMultiTableSink implements Serializable {
private final double commitCpuCores;
@Nullable private final MemorySize commitHeapMemory;
private final boolean commitChaining;
private final String commitUser;

public FlinkCdcMultiTableSink(
Catalog.Loader catalogLoader,
double commitCpuCores,
@Nullable MemorySize commitHeapMemory,
boolean commitChaining) {
boolean commitChaining,
String commitUser) {
this.catalogLoader = catalogLoader;
this.commitCpuCores = commitCpuCores;
this.commitHeapMemory = commitHeapMemory;
this.commitChaining = commitChaining;
this.commitUser = commitUser;
}

private StoreSinkWrite.WithWriteBufferProvider createWriteProvider() {
@@ -99,8 +101,7 @@ public DataStreamSink<?> sinkFrom(DataStream<CdcMultiplexRecord> input) {
// commit operators.
// When the job restarts, commitUser will be recovered from states and this value is
// ignored.
String initialCommitUser = UUID.randomUUID().toString();
return sinkFrom(input, initialCommitUser, createWriteProvider());
return sinkFrom(input, commitUser, createWriteProvider());
}

public DataStreamSink<?> sinkFrom(
@@ -40,6 +40,7 @@
import java.util.List;
import java.util.Map;

import static org.apache.paimon.CoreOptions.createCommitUser;
import static org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED;
import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;

@@ -76,6 +77,7 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
// database to sync, currently only support single database
private String database;
private MultiTablesSinkMode mode;
private String commitUser;

public FlinkCdcSyncDatabaseSinkBuilder<T> withInput(DataStream<T> input) {
this.input = input;
@@ -102,6 +104,7 @@ public FlinkCdcSyncDatabaseSinkBuilder<T> withTableOptions(Options options) {
this.committerCpu = options.get(FlinkConnectorOptions.SINK_COMMITTER_CPU);
this.committerMemory = options.get(FlinkConnectorOptions.SINK_COMMITTER_MEMORY);
this.commitChaining = options.get(FlinkConnectorOptions.SINK_COMMITTER_OPERATOR_CHAINING);
this.commitUser = createCommitUser(options);
return this;
}

@@ -163,7 +166,7 @@ private void buildCombinedCdcSink() {

FlinkCdcMultiTableSink sink =
new FlinkCdcMultiTableSink(
catalogLoader, committerCpu, committerMemory, commitChaining);
catalogLoader, committerCpu, committerMemory, commitChaining, commitUser);
sink.sinkFrom(partitioned);
}

@@ -31,6 +31,7 @@
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.junit.jupiter.api.Test;

import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;

import static org.assertj.core.api.Assertions.assertThat;
@@ -59,7 +60,8 @@ public void cancel() {}
() -> FlinkCatalogFactory.createPaimonCatalog(new Options()),
FlinkConnectorOptions.SINK_COMMITTER_CPU.defaultValue(),
null,
true);
true,
UUID.randomUUID().toString());
DataStreamSink<?> dataStreamSink = sink.sinkFrom(input);

// check the transformation graph
@@ -24,7 +24,8 @@

import java.util.List;
import java.util.Map;
import java.util.UUID;

import static org.apache.paimon.CoreOptions.createCommitUser;

/** Table drop partition action for Flink. */
public class DropPartitionAction extends TableActionBase {
@@ -49,7 +50,11 @@ public DropPartitionAction(
this.partitions = partitions;

FileStoreTable fileStoreTable = (FileStoreTable) table;
this.commit = fileStoreTable.store().newCommit(UUID.randomUUID().toString());
this.commit =
fileStoreTable
.store()
.newCommit(
createCommitUser(fileStoreTable.coreOptions().toConfiguration()));
}

@Override
@@ -27,8 +27,7 @@

import org.apache.flink.table.procedure.ProcedureContext;

import java.util.UUID;

import static org.apache.paimon.CoreOptions.createCommitUser;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/**
@@ -53,7 +52,11 @@ public String[] call(

FileStoreTable fileStoreTable =
(FileStoreTable) catalog.getTable(Identifier.fromString(tableId));
FileStoreCommit commit = fileStoreTable.store().newCommit(UUID.randomUUID().toString());
FileStoreCommit commit =
fileStoreTable
.store()
.newCommit(
createCommitUser(fileStoreTable.coreOptions().toConfiguration()));
commit.dropPartitions(
ParameterUtils.getPartitions(partitionStrings),
BatchWriteBuilder.COMMIT_IDENTIFIER);
@@ -38,8 +38,8 @@

import java.io.Serializable;
import java.util.Map;
import java.util.UUID;

import static org.apache.paimon.CoreOptions.createCommitUser;
import static org.apache.paimon.flink.FlinkConnectorOptions.END_INPUT_WATERMARK;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_OPERATOR_CHAINING;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_MANAGED_WRITER_BUFFER_MEMORY;
@@ -57,7 +57,6 @@ public class CombinedTableCompactorSink implements Serializable {

private final Catalog.Loader catalogLoader;
private final boolean ignorePreviousFiles;

private final Options options;

public CombinedTableCompactorSink(Catalog.Loader catalogLoader, Options options) {
@@ -74,8 +73,8 @@ public DataStreamSink<?> sinkFrom(
// commit operators.
// When the job restarts, commitUser will be recovered from states and this value is
// ignored.
String initialCommitUser = UUID.randomUUID().toString();
return sinkFrom(awareBucketTableSource, unawareBucketTableSource, initialCommitUser);
return sinkFrom(
awareBucketTableSource, unawareBucketTableSource, createCommitUser(options));
}

public DataStreamSink<?> sinkFrom(
@@ -30,7 +30,8 @@
import javax.annotation.Nullable;

import java.util.Map;
import java.util.UUID;

import static org.apache.paimon.CoreOptions.createCommitUser;

/** This class is only used for generate compact sink topology for dynamic bucket table. */
public class DynamicBucketCompactSink extends RowDynamicBucketSink {
@@ -42,7 +43,7 @@ public DynamicBucketCompactSink(

@Override
public DataStreamSink<?> build(DataStream<InternalRow> input, @Nullable Integer parallelism) {
String initialCommitUser = UUID.randomUUID().toString();
String initialCommitUser = createCommitUser(table.coreOptions().toConfiguration());

// This input is sorted and compacted. So there is no shuffle here, we just assign bucket
// for each record, and sink them to table.
@@ -33,8 +33,8 @@
import javax.annotation.Nullable;

import java.util.Map;
import java.util.UUID;

import static org.apache.paimon.CoreOptions.createCommitUser;
import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;

/** Sink for dynamic bucket table. */
@@ -55,7 +55,7 @@ public DynamicBucketSink(
extractorFunction();

public DataStreamSink<?> build(DataStream<T> input, @Nullable Integer parallelism) {
String initialCommitUser = UUID.randomUUID().toString();
String initialCommitUser = createCommitUser(table.coreOptions().toConfiguration());

// Topology:
// input -- shuffle by key hash --> bucket-assigner -- shuffle by partition & bucket -->
@@ -50,9 +50,9 @@
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;

import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
import static org.apache.paimon.CoreOptions.createCommitUser;
import static org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL;
import static org.apache.paimon.flink.FlinkConnectorOptions.END_INPUT_WATERMARK;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_AUTO_TAG_FOR_SAVEPOINT;
@@ -154,8 +154,7 @@ public DataStreamSink<?> sinkFrom(DataStream<T> input) {
// commit operators.
// When the job restarts, commitUser will be recovered from states and this value is
// ignored.
String initialCommitUser = UUID.randomUUID().toString();
return sinkFrom(input, initialCommitUser);
return sinkFrom(input, createCommitUser(table.coreOptions().toConfiguration()));
}

public DataStreamSink<?> sinkFrom(DataStream<T> input, String initialCommitUser) {
@@ -53,12 +53,12 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;
import static org.apache.paimon.CoreOptions.createCommitUser;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Flink table sink that supports row level update and delete. */
@@ -160,8 +160,12 @@ public boolean applyDeleteFilters(List<ResolvedExpression> list) {

@Override
public Optional<Long> executeDeletion() {
FileStoreTable fileStoreTable = (FileStoreTable) table;
FileStoreCommit commit =
((FileStoreTable) table).store().newCommit(UUID.randomUUID().toString());
fileStoreTable
.store()
.newCommit(
createCommitUser(fileStoreTable.coreOptions().toConfiguration()));
long identifier = BatchWriteBuilder.COMMIT_IDENTIFIER;
if (deletePredicate == null) {
commit.truncateTable(identifier);