From e26fb64b9dc95ec4bceacd94a1794d3c8bd7e3da Mon Sep 17 00:00:00 2001 From: Jingsong Lee Date: Thu, 18 Jul 2024 22:16:59 +0800 Subject: [PATCH] [core] Lookup wait should affect deletion vectors mode and first row engine (#3782) --- docs/content/primary-key-table/compaction.md | 5 +---- .../java/org/apache/paimon/CoreOptions.java | 18 +++++++-------- .../org/apache/paimon/CoreOptionsTest.java | 22 +++++++++++++++++++ .../apache/paimon/flink/sink/FlinkSink.java | 3 +-- .../sink/MultiTablesStoreCompactOperator.java | 3 +-- .../flink/PrimaryKeyFileStoreTableITCase.java | 6 ++--- 6 files changed, 36 insertions(+), 21 deletions(-) diff --git a/docs/content/primary-key-table/compaction.md b/docs/content/primary-key-table/compaction.md index c7c866581c96..e5040a03aadd 100644 --- a/docs/content/primary-key-table/compaction.md +++ b/docs/content/primary-key-table/compaction.md @@ -59,15 +59,12 @@ You can use the following strategies for your table: ```shell num-sorted-run.stop-trigger = 2147483647 sort-spill-threshold = 10 -changelog-producer.lookup-wait = false +lookup-wait = false ``` This configuration will generate more files during peak write periods and gradually merge into optimal read performance during low write periods. -In the case of `'changelog-producer' = 'lookup'`, by default, the lookup will be completed at checkpointing, which -will block the checkpoint. So if you want an asynchronous lookup, you should also set `'changelog-producer.lookup-wait' = 'false'`. - ## Dedicated compaction job In general, if you expect multiple jobs to be written to the same table, you need to separate the compaction. You can diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 5b2f8aa77526..b368b483a47f 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -1279,16 +1279,13 @@ public class CoreOptions implements Serializable { .noDefaultValue() .withDescription("Specifies the commit user prefix."); - public static final ConfigOption CHANGELOG_PRODUCER_LOOKUP_WAIT = - key("changelog-producer.lookup-wait") + public static final ConfigOption LOOKUP_WAIT = + key("lookup-wait") .booleanType() .defaultValue(true) + .withFallbackKeys("changelog-producer.lookup-wait") .withDescription( - "When " - + CoreOptions.CHANGELOG_PRODUCER.key() - + " is set to " - + ChangelogProducer.LOOKUP.name() - + ", commit will wait for changelog generation by lookup."); + "When need to lookup, commit will wait for compaction by lookup."); public static final ConfigOption METADATA_ICEBERG_COMPATIBLE = key("metadata.iceberg-compatible") @@ -2036,8 +2033,11 @@ public String recordLevelTimeField() { } public boolean prepareCommitWaitCompaction() { - return changelogProducer() == ChangelogProducer.LOOKUP - && options.get(CHANGELOG_PRODUCER_LOOKUP_WAIT); + if (!needLookup()) { + return false; + } + + return options.get(LOOKUP_WAIT); } public boolean metadataIcebergCompatible() { diff --git a/paimon-core/src/test/java/org/apache/paimon/CoreOptionsTest.java b/paimon-core/src/test/java/org/apache/paimon/CoreOptionsTest.java index b8d134916e9e..bf5445fc10c7 100644 --- a/paimon-core/src/test/java/org/apache/paimon/CoreOptionsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/CoreOptionsTest.java @@ -64,4 +64,26 @@ public void testDeprecatedStartupMode() { assertThat(new CoreOptions(conf).startupMode()) .isEqualTo(CoreOptions.StartupMode.LATEST_FULL); } + + @Test + public void testPrepareCommitWaitCompaction() { + Options conf = new Options(); + CoreOptions options = new CoreOptions(conf); + + assertThat(options.prepareCommitWaitCompaction()).isFalse(); + + conf.set(CoreOptions.DELETION_VECTORS_ENABLED, true); + assertThat(options.prepareCommitWaitCompaction()).isTrue(); + conf.remove(CoreOptions.DELETION_VECTORS_ENABLED.key()); + + conf.set(CoreOptions.MERGE_ENGINE, CoreOptions.MergeEngine.FIRST_ROW); + assertThat(options.prepareCommitWaitCompaction()).isTrue(); + conf.remove(CoreOptions.MERGE_ENGINE.key()); + + conf.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.LOOKUP); + assertThat(options.prepareCommitWaitCompaction()).isTrue(); + + conf.set(CoreOptions.LOOKUP_WAIT, false); + assertThat(options.prepareCommitWaitCompaction()).isFalse(); + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index f369ec31c3d5..865b2a939e2e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java @@ -134,8 +134,7 @@ private StoreSinkWrite.Provider createWriteProvider( } } - if (changelogProducer == ChangelogProducer.LOOKUP - && !coreOptions.prepareCommitWaitCompaction()) { + if (coreOptions.needLookup() && !coreOptions.prepareCommitWaitCompaction()) { return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> { assertNoSinkMaterializer.run(); return new AsyncLookupSinkWrite( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java index b25ec0dfccd0..f253a3bf8e79 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java @@ -275,8 +275,7 @@ private StoreSinkWrite.Provider createWriteProvider( } } - if (changelogProducer == CoreOptions.ChangelogProducer.LOOKUP - && !coreOptions.prepareCommitWaitCompaction()) { + if (coreOptions.needLookup() && !coreOptions.prepareCommitWaitCompaction()) { return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> new AsyncLookupSinkWrite( table, diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java index 0d6f37ce6602..1a8da3c5b7f9 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java @@ -487,8 +487,7 @@ private void testLookupChangelogProducerRandom( "'write-buffer-size' = '%s',", random.nextBoolean() ? "512kb" : "1mb") + "'changelog-producer' = 'lookup'," - + String.format( - "'changelog-producer.lookup-wait' = '%s',", random.nextBoolean()) + + String.format("'lookup-wait' = '%s',", random.nextBoolean()) + String.format( "'deletion-vectors.enabled' = '%s'", enableDeletionVectors)); @@ -551,8 +550,7 @@ private void testStandAloneLookupJobRandom( "'write-buffer-size' = '%s',", random.nextBoolean() ? "512kb" : "1mb") + "'changelog-producer' = 'lookup'," - + String.format( - "'changelog-producer.lookup-wait' = '%s',", random.nextBoolean()) + + String.format("'lookup-wait' = '%s',", random.nextBoolean()) + "'write-only' = 'true'"); // sleep for a random amount of time to check