Skip to content

Commit

Permalink
[core] Lookup wait should affect deletion vectors mode and first row …
Browse files Browse the repository at this point in the history
…engine (#3782)
  • Loading branch information
JingsongLi authored Jul 18, 2024
1 parent e1f6528 commit e26fb64
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 21 deletions.
5 changes: 1 addition & 4 deletions docs/content/primary-key-table/compaction.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,12 @@ You can use the following strategies for your table:
```shell
num-sorted-run.stop-trigger = 2147483647
sort-spill-threshold = 10
changelog-producer.lookup-wait = false
lookup-wait = false
```

This configuration will generate more files during peak write periods and gradually merge into optimal read
performance during low write periods.

In the case of `'changelog-producer' = 'lookup'`, by default, the lookup will be completed at checkpointing, which
will block the checkpoint. So if you want an asynchronous lookup, you should also set `'changelog-producer.lookup-wait' = 'false'`.

## Dedicated compaction job

In general, if you expect multiple jobs to be written to the same table, you need to separate the compaction. You can
Expand Down
18 changes: 9 additions & 9 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -1279,16 +1279,13 @@ public class CoreOptions implements Serializable {
.noDefaultValue()
.withDescription("Specifies the commit user prefix.");

public static final ConfigOption<Boolean> CHANGELOG_PRODUCER_LOOKUP_WAIT =
key("changelog-producer.lookup-wait")
public static final ConfigOption<Boolean> LOOKUP_WAIT =
key("lookup-wait")
.booleanType()
.defaultValue(true)
.withFallbackKeys("changelog-producer.lookup-wait")
.withDescription(
"When "
+ CoreOptions.CHANGELOG_PRODUCER.key()
+ " is set to "
+ ChangelogProducer.LOOKUP.name()
+ ", commit will wait for changelog generation by lookup.");
"When need to lookup, commit will wait for compaction by lookup.");

public static final ConfigOption<Boolean> METADATA_ICEBERG_COMPATIBLE =
key("metadata.iceberg-compatible")
Expand Down Expand Up @@ -2036,8 +2033,11 @@ public String recordLevelTimeField() {
}

public boolean prepareCommitWaitCompaction() {
return changelogProducer() == ChangelogProducer.LOOKUP
&& options.get(CHANGELOG_PRODUCER_LOOKUP_WAIT);
if (!needLookup()) {
return false;
}

return options.get(LOOKUP_WAIT);
}

public boolean metadataIcebergCompatible() {
Expand Down
22 changes: 22 additions & 0 deletions paimon-core/src/test/java/org/apache/paimon/CoreOptionsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,26 @@ public void testDeprecatedStartupMode() {
assertThat(new CoreOptions(conf).startupMode())
.isEqualTo(CoreOptions.StartupMode.LATEST_FULL);
}

@Test
public void testPrepareCommitWaitCompaction() {
Options conf = new Options();
CoreOptions options = new CoreOptions(conf);

assertThat(options.prepareCommitWaitCompaction()).isFalse();

conf.set(CoreOptions.DELETION_VECTORS_ENABLED, true);
assertThat(options.prepareCommitWaitCompaction()).isTrue();
conf.remove(CoreOptions.DELETION_VECTORS_ENABLED.key());

conf.set(CoreOptions.MERGE_ENGINE, CoreOptions.MergeEngine.FIRST_ROW);
assertThat(options.prepareCommitWaitCompaction()).isTrue();
conf.remove(CoreOptions.MERGE_ENGINE.key());

conf.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.LOOKUP);
assertThat(options.prepareCommitWaitCompaction()).isTrue();

conf.set(CoreOptions.LOOKUP_WAIT, false);
assertThat(options.prepareCommitWaitCompaction()).isFalse();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,7 @@ private StoreSinkWrite.Provider createWriteProvider(
}
}

if (changelogProducer == ChangelogProducer.LOOKUP
&& !coreOptions.prepareCommitWaitCompaction()) {
if (coreOptions.needLookup() && !coreOptions.prepareCommitWaitCompaction()) {
return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> {
assertNoSinkMaterializer.run();
return new AsyncLookupSinkWrite(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,7 @@ private StoreSinkWrite.Provider createWriteProvider(
}
}

if (changelogProducer == CoreOptions.ChangelogProducer.LOOKUP
&& !coreOptions.prepareCommitWaitCompaction()) {
if (coreOptions.needLookup() && !coreOptions.prepareCommitWaitCompaction()) {
return (table, commitUser, state, ioManager, memoryPool, metricGroup) ->
new AsyncLookupSinkWrite(
table,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,7 @@ private void testLookupChangelogProducerRandom(
"'write-buffer-size' = '%s',",
random.nextBoolean() ? "512kb" : "1mb")
+ "'changelog-producer' = 'lookup',"
+ String.format(
"'changelog-producer.lookup-wait' = '%s',", random.nextBoolean())
+ String.format("'lookup-wait' = '%s',", random.nextBoolean())
+ String.format(
"'deletion-vectors.enabled' = '%s'", enableDeletionVectors));

Expand Down Expand Up @@ -551,8 +550,7 @@ private void testStandAloneLookupJobRandom(
"'write-buffer-size' = '%s',",
random.nextBoolean() ? "512kb" : "1mb")
+ "'changelog-producer' = 'lookup',"
+ String.format(
"'changelog-producer.lookup-wait' = '%s',", random.nextBoolean())
+ String.format("'lookup-wait' = '%s',", random.nextBoolean())
+ "'write-only' = 'true'");

// sleep for a random amount of time to check
Expand Down

0 comments on commit e26fb64

Please sign in to comment.