Skip to content

Commit

Permalink
[flink] Introduce FlinkConnectorOptions.prepareCommitWaitCompaction t…
Browse files Browse the repository at this point in the history
…o add document
  • Loading branch information
JingsongLi committed Mar 13, 2024
1 parent 384c6e7 commit 8828b14
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,9 @@ protected UpgradeStrategy upgradeChangelog(int outputLevel, DataFileMeta file) {
return NO_CHANGELOG;
}

// In deletionVector mode, since drop delete is required, rewrite is always required.
// TODO In deletionVector mode, since drop delete is required, rewrite is always required.
// TODO wait https://github.com/apache/incubator-paimon/pull/2962
// TODO but should be careful to not be deleted by DeletionVectorsMaintainer!
if (dvMaintainer != null) {
return CHANGELOG_WITH_REWRITE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
package org.apache.paimon.flink;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.CoreOptions.StreamingReadMode;
import org.apache.paimon.annotation.Documentation.ExcludeFromDocumentation;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ConfigOptions;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.options.description.DescribedEnum;
import org.apache.paimon.options.description.Description;
import org.apache.paimon.options.description.InlineElement;
Expand All @@ -34,6 +36,7 @@
import java.util.ArrayList;
import java.util.List;

import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
import static org.apache.paimon.CoreOptions.STREAMING_READ_MODE;
import static org.apache.paimon.options.ConfigOptions.key;
import static org.apache.paimon.options.description.TextElement.text;
Expand Down Expand Up @@ -140,7 +143,7 @@ public class FlinkConnectorOptions {
"When "
+ CoreOptions.CHANGELOG_PRODUCER.key()
+ " is set to "
+ CoreOptions.ChangelogProducer.FULL_COMPACTION.name()
+ ChangelogProducer.FULL_COMPACTION.name()
+ ", full compaction will be constantly triggered after this interval.");

public static final ConfigOption<Boolean> CHANGELOG_PRODUCER_LOOKUP_WAIT =
Expand All @@ -151,7 +154,7 @@ public class FlinkConnectorOptions {
"When "
+ CoreOptions.CHANGELOG_PRODUCER.key()
+ " is set to "
+ CoreOptions.ChangelogProducer.LOOKUP.name()
+ ChangelogProducer.LOOKUP.name()
+ ", commit will wait for changelog generation by lookup.");

public static final ConfigOption<WatermarkEmitStrategy> SCAN_WATERMARK_EMIT_STRATEGY =
Expand Down Expand Up @@ -344,6 +347,20 @@ public static List<ConfigOption<?>> getOptions() {
return list;
}

public static boolean prepareCommitWaitCompaction(Options options) {
if (options.get(DELETION_VECTORS_ENABLED)) {
// DeletionVector (DV) is maintained in the compaction thread, but it needs to be
// read into a file during prepareCommit to submit it.
// We must set waitComparison to true so that there are no multiple threads
// operating DV simultaneously.
return true;
}

ChangelogProducer changelogProducer = options.get(CoreOptions.CHANGELOG_PRODUCER);
return changelogProducer == ChangelogProducer.LOOKUP
&& options.get(CHANGELOG_PRODUCER_LOOKUP_WAIT);
}

/** The mode of lookup cache. */
public enum LookupCacheMode {
/** Auto mode, try to use partial mode. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,14 @@
import java.util.UUID;

import static org.apache.flink.configuration.ClusterOptions.ENABLE_FINE_GRAINED_RESOURCE_MANAGEMENT;
import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
import static org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL;
import static org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_LOOKUP_WAIT;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_AUTO_TAG_FOR_SAVEPOINT;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_CPU;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_MEMORY;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_MANAGED_WRITER_BUFFER_MEMORY;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_USE_MANAGED_MEMORY;
import static org.apache.paimon.flink.FlinkConnectorOptions.prepareCommitWaitCompaction;
import static org.apache.paimon.flink.utils.ManagedMemoryUtils.declareManagedMemory;
import static org.apache.paimon.utils.Preconditions.checkArgument;

Expand Down Expand Up @@ -102,12 +101,7 @@ private StoreSinkWrite.Provider createWriteProvider(
} else {
Options options = table.coreOptions().toConfiguration();
ChangelogProducer changelogProducer = table.coreOptions().changelogProducer();
// todo: deletion vectors support lookup wait
waitCompaction =
(changelogProducer == ChangelogProducer.LOOKUP
&& options.get(CHANGELOG_PRODUCER_LOOKUP_WAIT))
|| options.get(DELETION_VECTORS_ENABLED);

waitCompaction = prepareCommitWaitCompaction(options);
int deltaCommits = -1;
if (options.contains(FULL_COMPACTION_DELTA_COMMITS)) {
deltaCommits = options.get(FULL_COMPACTION_DELTA_COMMITS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,9 @@
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
import static org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL;
import static org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_LOOKUP_WAIT;
import static org.apache.paimon.flink.FlinkConnectorOptions.prepareCommitWaitCompaction;
import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;

/**
Expand Down Expand Up @@ -241,11 +240,7 @@ private StoreSinkWrite.Provider createWriteProvider(
Options options = fileStoreTable.coreOptions().toConfiguration();
CoreOptions.ChangelogProducer changelogProducer =
fileStoreTable.coreOptions().changelogProducer();
// todo: deletion vectors support lookup wait
waitCompaction =
(changelogProducer == CoreOptions.ChangelogProducer.LOOKUP
&& options.get(CHANGELOG_PRODUCER_LOOKUP_WAIT))
|| options.get(DELETION_VECTORS_ENABLED);
waitCompaction = prepareCommitWaitCompaction(options);
int deltaCommits = -1;
if (options.contains(FULL_COMPACTION_DELTA_COMMITS)) {
deltaCommits = options.get(FULL_COMPACTION_DELTA_COMMITS);
Expand Down

0 comments on commit 8828b14

Please sign in to comment.